diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index dfac215f..0376242a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -163,40 +163,36 @@ class MultiLangProtocol { * @return Whether or not this operation succeeded. */ boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) { - StatusMessage statusMessage = null; - while (statusMessage == null) { + Optional statusMessage = Optional.empty(); + while (!statusMessage.isPresent()) { Future future = this.messageReader.getNextMessageFromSTDOUT(); - Optional message = configuration.getTimeoutInSeconds().map(second -> - futureMethod(() -> future.get(second, TimeUnit.SECONDS), action)).orElse(futureMethod(future::get, action)); + Optional message = configuration.getTimeoutInSeconds() + .map(second -> futureMethod(() -> future.get(second, TimeUnit.SECONDS), action)) + .orElse(futureMethod(future::get, action)); if (!message.isPresent()) { return false; } - Optional booleanStatusMessage = message.flatMap(m -> { - if (m instanceof CheckpointMessage) { - return Optional.of(futureMethod(() -> checkpoint((CheckpointMessage) m, checkpointer).get())); - } - return Optional.empty(); - }); + Optional checkpointFailed = message.filter(m -> m instanceof CheckpointMessage ) + .map(m -> (CheckpointMessage) m) + .flatMap(m -> futureMethod(() -> checkpoint(m, checkpointer).get(), "Checkpoint")) + .map(checkpointSuccess -> !checkpointSuccess); - Message m = message.get(); - - if (booleanStatusMessage.isPresent() && !booleanStatusMessage.get()) { + if (checkpointFailed.orElse(false)) { return false; - } else if (!booleanStatusMessage.isPresent() && m instanceof StatusMessage) { - statusMessage = (StatusMessage) m; } - // Note that instanceof doubles as a check against a value being null + + statusMessage = message.filter(m -> m instanceof StatusMessage).map(m -> (StatusMessage) m ); } - return this.validateStatusMessage(statusMessage, action); + return this.validateStatusMessage(statusMessage.get(), action); } - private interface FutureMethod { - Message get() throws InterruptedException, TimeoutException, ExecutionException; + private interface FutureMethod { + T get() throws InterruptedException, TimeoutException, ExecutionException; } - private Optional futureMethod(FutureMethod fm, String action) { + private Optional futureMethod(FutureMethod fm, String action) { try { return Optional.of(fm.get()); } catch (InterruptedException e) { @@ -215,23 +211,6 @@ class MultiLangProtocol { return Optional.empty(); } - private interface CheckpointFutureMethod { - Boolean get() throws InterruptedException, ExecutionException; - } - - private Boolean futureMethod(CheckpointFutureMethod cfm) { - try { - return cfm.get(); - } catch (InterruptedException e) { - log.error(String.format("Interrupted while waiting for Checkpointing message for shard %s", - initializationInput.getShardId()), e); - } catch (ExecutionException e) { - log.error(String.format("Failed to get status message for Checkpointing action for shard %s", - initializationInput.getShardId()), e); - } - return false; - } - /** * This method is used to halt the JVM. Use this method with utmost caution, since this method will kill the JVM * without calling the Shutdown hooks.