From 19f33268231193fb5a29d6f3deb73842d90af149 Mon Sep 17 00:00:00 2001 From: Ikram ulhaq Date: Wed, 31 May 2017 23:42:13 -0700 Subject: [PATCH 1/2] Clean shutdown when multilang deamen is terminated by giving one last chance to checkpoint. --- .../kinesis/multilang/MultiLangDaemon.java | 18 ++++++++++++- .../multilang/MultiLangRecordProcessor.java | 25 ++++++++++++++++++- .../StreamingRecordProcessorTest.java | 1 + 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index fdff4dc7..185cc070 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -140,11 +140,27 @@ public class MultiLangDaemon implements Callable { ExecutorService executorService = config.getExecutorService(); // Daemon - MultiLangDaemon daemon = new MultiLangDaemon( + final MultiLangDaemon daemon = new MultiLangDaemon( config.getKinesisClientLibConfiguration(), config.getRecordProcessorFactory(), executorService); + Runtime.getRuntime().addShutdownHook(new Thread() + { + @Override + public void run() + { + LOG.info("Process terminanted, will initiate shutdown."); + try { + Future fut = daemon.worker.requestShutdown(); + fut.get(); + LOG.info("Process shutdown is complete."); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Encountered an error during shutdown.", e); + } + } + }); + Future future = executorService.submit(daemon); try { System.exit(future.get()); diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 9d76af54..76db52c0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -20,6 +20,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,13 +33,14 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.fasterxml.jackson.databind.ObjectMapper; + /** * A record processor that manages creating a child process that implements the multi language protocol and connecting * that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on * that object when its corresponding {@link #initialize}, {@link #processRecords}, and {@link #shutdown} methods are * called. */ -public class MultiLangRecordProcessor implements IRecordProcessor { +public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class); private static final int EXIT_VALUE = 1; @@ -136,6 +141,24 @@ public class MultiLangRecordProcessor implements IRecordProcessor { } } + @Override + public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + LOG.info("Shutdown is requested."); + if (!initialized) { + LOG.info("Record processor was not initialized so no need to initiate a final checkpoint."); + return; + } + try { + LOG.info("Requesting a checkpoint on shutdown notification."); + checkpointer.checkpoint(); + } catch (InvalidStateException e) { + LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e.toString()); + } catch (ShutdownException e) { + LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e.toString()); + } + } + + /** * Used to tell whether the processor has been shutdown already. */ diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 2c02b5e9..b645af9b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -172,6 +172,7 @@ public class StreamingRecordProcessorTest { recordProcessor.initialize(new InitializationInput().withShardId(shardId)); recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer)); recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer)); + recordProcessor.shutdownRequested(Mockito.mock(IRecordProcessorCheckpointer.class)); recordProcessor.shutdown(new ShutdownInput().withCheckpointer(unimplementedCheckpointer).withShutdownReason(ShutdownReason.ZOMBIE)); } From d4e2a4b91fdfd6b4e0306d5e534c9822985c7049 Mon Sep 17 00:00:00 2001 From: Ikram ulhaq Date: Mon, 5 Jun 2017 10:24:52 -0700 Subject: [PATCH 2/2] Shutdown requested via dispatch. --- .../kinesis/multilang/MessageWriter.java | 14 ++++++++++++-- .../kinesis/multilang/MultiLangDaemon.java | 8 ++++---- .../kinesis/multilang/MultiLangProtocol.java | 13 +++++++++++++ .../multilang/MultiLangRecordProcessor.java | 7 +++++-- .../messages/ShutdownRequestedMessage.java | 17 +++++++++++++++++ 5 files changed, 51 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java index b2ddbfe3..3310d248 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java @@ -26,13 +26,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; -import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; + import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; +import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; + +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.fasterxml.jackson.databind.ObjectMapper; /** @@ -145,6 +148,13 @@ class MessageWriter { return writeMessage(new ShutdownMessage(reason)); } + /** + * Writes a {@link ShutdownRequestedMessage} to the subprocess. + */ + Future writeShutdownRequestedMessage() { + return writeMessage(new ShutdownRequestedMessage()); + } + /** * Writes a {@link CheckpointMessage} to the subprocess. * diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index 185cc070..cee7e404 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.multilang; import java.io.IOException; import java.io.PrintStream; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -145,17 +146,16 @@ public class MultiLangDaemon implements Callable { config.getRecordProcessorFactory(), executorService); - Runtime.getRuntime().addShutdownHook(new Thread() - { + Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { LOG.info("Process terminanted, will initiate shutdown."); try { Future fut = daemon.worker.requestShutdown(); - fut.get(); + fut.get(5000, TimeUnit.MILLISECONDS); LOG.info("Process shutdown is complete."); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.error("Encountered an error during shutdown.", e); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 64c7829f..67a97770 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -27,6 +27,7 @@ import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; +import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import lombok.extern.apachecommons.CommonsLog; @@ -99,6 +100,18 @@ class MultiLangProtocol { return waitForStatusMessage(ShutdownMessage.ACTION, checkpointer, writeFuture); } + /** + * Writes a {@link ShutdownRequestedMessage} to the child process's STDIN and waits for the child process to respond with a + * {@link StatusMessage} on its STDOUT. + * + * @param checkpointer A checkpointer. + * @return Whether or not this operation succeeded. + */ + boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + Future writeFuture = messageWriter.writeShutdownRequestedMessage(); + return waitForStatusMessage(ShutdownRequestedMessage.ACTION, checkpointer, writeFuture); + } + /** * Waits for a {@link StatusMessage} for a particular action. If a {@link CheckpointMessage} is received, then this * method will attempt to checkpoint with the provided {@link IRecordProcessorCheckpointer}. This method returns diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 76db52c0..e23508d1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -150,11 +150,14 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti } try { LOG.info("Requesting a checkpoint on shutdown notification."); +// ProcessRecordsInput emptyInput = new ProcessRecordsInput(); + + checkpointer.checkpoint(); } catch (InvalidStateException e) { - LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e.toString()); + LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e, e); } catch (ShutdownException e) { - LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e.toString()); + LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e, e); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java new file mode 100644 index 00000000..d6b52e16 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java @@ -0,0 +1,17 @@ +package com.amazonaws.services.kinesis.multilang.messages; + +/** + * A message to indicate to the client's process that shutdown is requested. + */ +public class ShutdownRequestedMessage extends Message { + /** + * The name used for the action field in {@link Message}. + */ + public static final String ACTION = "shutdownrequested"; + + /** + * Convenience constructor. + */ + public ShutdownRequestedMessage() { + } +}