From 66cbcba33a685e7dad2d2910b39cfcaa18393467 Mon Sep 17 00:00:00 2001 From: Ikram ulhaq Date: Mon, 5 Jun 2017 10:24:52 -0700 Subject: [PATCH] Shutdown requested via dispatch. --- .../kinesis/multilang/MessageWriter.java | 14 ++++++++++++-- .../kinesis/multilang/MultiLangDaemon.java | 10 ++++++---- .../kinesis/multilang/MultiLangProtocol.java | 13 +++++++++++++ .../multilang/MultiLangRecordProcessor.java | 7 +++++-- .../messages/ShutdownRequestedMessage.java | 17 +++++++++++++++++ 5 files changed, 53 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java index b2ddbfe3..3310d248 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java @@ -26,13 +26,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; -import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; + import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; +import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; + +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.fasterxml.jackson.databind.ObjectMapper; /** @@ -145,6 +148,13 @@ class MessageWriter { return writeMessage(new ShutdownMessage(reason)); } + /** + * Writes a {@link ShutdownRequestedMessage} to the subprocess. + */ + Future writeShutdownRequestedMessage() { + return writeMessage(new ShutdownRequestedMessage()); + } + /** * Writes a {@link CheckpointMessage} to the subprocess. * diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index 185cc070..2d90680f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -16,10 +16,13 @@ package com.amazonaws.services.kinesis.multilang; import java.io.IOException; import java.io.PrintStream; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -145,17 +148,16 @@ public class MultiLangDaemon implements Callable { config.getRecordProcessorFactory(), executorService); - Runtime.getRuntime().addShutdownHook(new Thread() - { + Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { LOG.info("Process terminanted, will initiate shutdown."); try { Future fut = daemon.worker.requestShutdown(); - fut.get(); + fut.get(5000, TimeUnit.MILLISECONDS); LOG.info("Process shutdown is complete."); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.error("Encountered an error during shutdown.", e); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 64c7829f..67a97770 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -27,6 +27,7 @@ import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; +import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import lombok.extern.apachecommons.CommonsLog; @@ -99,6 +100,18 @@ class MultiLangProtocol { return waitForStatusMessage(ShutdownMessage.ACTION, checkpointer, writeFuture); } + /** + * Writes a {@link ShutdownRequestedMessage} to the child process's STDIN and waits for the child process to respond with a + * {@link StatusMessage} on its STDOUT. + * + * @param checkpointer A checkpointer. + * @return Whether or not this operation succeeded. + */ + boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + Future writeFuture = messageWriter.writeShutdownRequestedMessage(); + return waitForStatusMessage(ShutdownRequestedMessage.ACTION, checkpointer, writeFuture); + } + /** * Waits for a {@link StatusMessage} for a particular action. If a {@link CheckpointMessage} is received, then this * method will attempt to checkpoint with the provided {@link IRecordProcessorCheckpointer}. This method returns diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 76db52c0..e23508d1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -150,11 +150,14 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti } try { LOG.info("Requesting a checkpoint on shutdown notification."); +// ProcessRecordsInput emptyInput = new ProcessRecordsInput(); + + checkpointer.checkpoint(); } catch (InvalidStateException e) { - LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e.toString()); + LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e, e); } catch (ShutdownException e) { - LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e.toString()); + LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e, e); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java new file mode 100644 index 00000000..d6b52e16 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java @@ -0,0 +1,17 @@ +package com.amazonaws.services.kinesis.multilang.messages; + +/** + * A message to indicate to the client's process that shutdown is requested. + */ +public class ShutdownRequestedMessage extends Message { + /** + * The name used for the action field in {@link Message}. + */ + public static final String ACTION = "shutdownrequested"; + + /** + * Convenience constructor. + */ + public ShutdownRequestedMessage() { + } +}