Changes made as per code review.

This commit is contained in:
Sahil Palvia 2017-08-01 10:54:49 -07:00
parent 412eb2f846
commit 0a3ea9016f

View file

@ -163,40 +163,36 @@ class MultiLangProtocol {
* @return Whether or not this operation succeeded.
*/
boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
StatusMessage statusMessage = null;
while (statusMessage == null) {
Optional<StatusMessage> statusMessage = Optional.empty();
while (!statusMessage.isPresent()) {
Future<Message> future = this.messageReader.getNextMessageFromSTDOUT();
Optional<Message> message = configuration.getTimeoutInSeconds().map(second ->
futureMethod(() -> future.get(second, TimeUnit.SECONDS), action)).orElse(futureMethod(future::get, action));
Optional<Message> message = configuration.getTimeoutInSeconds()
.map(second -> futureMethod(() -> future.get(second, TimeUnit.SECONDS), action))
.orElse(futureMethod(future::get, action));
if (!message.isPresent()) {
return false;
}
Optional<Boolean> booleanStatusMessage = message.flatMap(m -> {
if (m instanceof CheckpointMessage) {
return Optional.of(futureMethod(() -> checkpoint((CheckpointMessage) m, checkpointer).get()));
}
return Optional.empty();
});
Optional<Boolean> checkpointFailed = message.filter(m -> m instanceof CheckpointMessage )
.map(m -> (CheckpointMessage) m)
.flatMap(m -> futureMethod(() -> checkpoint(m, checkpointer).get(), "Checkpoint"))
.map(checkpointSuccess -> !checkpointSuccess);
Message m = message.get();
if (booleanStatusMessage.isPresent() && !booleanStatusMessage.get()) {
if (checkpointFailed.orElse(false)) {
return false;
} else if (!booleanStatusMessage.isPresent() && m instanceof StatusMessage) {
statusMessage = (StatusMessage) m;
}
// Note that instanceof doubles as a check against a value being null
statusMessage = message.filter(m -> m instanceof StatusMessage).map(m -> (StatusMessage) m );
}
return this.validateStatusMessage(statusMessage, action);
return this.validateStatusMessage(statusMessage.get(), action);
}
private interface FutureMethod {
Message get() throws InterruptedException, TimeoutException, ExecutionException;
private interface FutureMethod<T> {
T get() throws InterruptedException, TimeoutException, ExecutionException;
}
private Optional<Message> futureMethod(FutureMethod fm, String action) {
private <T> Optional<T> futureMethod(FutureMethod<T> fm, String action) {
try {
return Optional.of(fm.get());
} catch (InterruptedException e) {
@ -215,23 +211,6 @@ class MultiLangProtocol {
return Optional.empty();
}
private interface CheckpointFutureMethod {
Boolean get() throws InterruptedException, ExecutionException;
}
private Boolean futureMethod(CheckpointFutureMethod cfm) {
try {
return cfm.get();
} catch (InterruptedException e) {
log.error(String.format("Interrupted while waiting for Checkpointing message for shard %s",
initializationInput.getShardId()), e);
} catch (ExecutionException e) {
log.error(String.format("Failed to get status message for Checkpointing action for shard %s",
initializationInput.getShardId()), e);
}
return false;
}
/**
* This method is used to halt the JVM. Use this method with utmost caution, since this method will kill the JVM
* without calling the Shutdown hooks.