From 19f33268231193fb5a29d6f3deb73842d90af149 Mon Sep 17 00:00:00 2001 From: Ikram ulhaq Date: Wed, 31 May 2017 23:42:13 -0700 Subject: [PATCH 1/6] Clean shutdown when multilang deamen is terminated by giving one last chance to checkpoint. --- .../kinesis/multilang/MultiLangDaemon.java | 18 ++++++++++++- .../multilang/MultiLangRecordProcessor.java | 25 ++++++++++++++++++- .../StreamingRecordProcessorTest.java | 1 + 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index fdff4dc7..185cc070 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -140,11 +140,27 @@ public class MultiLangDaemon implements Callable { ExecutorService executorService = config.getExecutorService(); // Daemon - MultiLangDaemon daemon = new MultiLangDaemon( + final MultiLangDaemon daemon = new MultiLangDaemon( config.getKinesisClientLibConfiguration(), config.getRecordProcessorFactory(), executorService); + Runtime.getRuntime().addShutdownHook(new Thread() + { + @Override + public void run() + { + LOG.info("Process terminanted, will initiate shutdown."); + try { + Future fut = daemon.worker.requestShutdown(); + fut.get(); + LOG.info("Process shutdown is complete."); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Encountered an error during shutdown.", e); + } + } + }); + Future future = executorService.submit(daemon); try { System.exit(future.get()); diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 9d76af54..76db52c0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -20,6 +20,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,13 +33,14 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.fasterxml.jackson.databind.ObjectMapper; + /** * A record processor that manages creating a child process that implements the multi language protocol and connecting * that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on * that object when its corresponding {@link #initialize}, {@link #processRecords}, and {@link #shutdown} methods are * called. */ -public class MultiLangRecordProcessor implements IRecordProcessor { +public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class); private static final int EXIT_VALUE = 1; @@ -136,6 +141,24 @@ public class MultiLangRecordProcessor implements IRecordProcessor { } } + @Override + public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + LOG.info("Shutdown is requested."); + if (!initialized) { + LOG.info("Record processor was not initialized so no need to initiate a final checkpoint."); + return; + } + try { + LOG.info("Requesting a checkpoint on shutdown notification."); + checkpointer.checkpoint(); + } catch (InvalidStateException e) { + LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e.toString()); + } catch (ShutdownException e) { + LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e.toString()); + } + } + + /** * Used to tell whether the processor has been shutdown already. */ diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 2c02b5e9..b645af9b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -172,6 +172,7 @@ public class StreamingRecordProcessorTest { recordProcessor.initialize(new InitializationInput().withShardId(shardId)); recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer)); recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer)); + recordProcessor.shutdownRequested(Mockito.mock(IRecordProcessorCheckpointer.class)); recordProcessor.shutdown(new ShutdownInput().withCheckpointer(unimplementedCheckpointer).withShutdownReason(ShutdownReason.ZOMBIE)); } From 66cbcba33a685e7dad2d2910b39cfcaa18393467 Mon Sep 17 00:00:00 2001 From: Ikram ulhaq Date: Mon, 5 Jun 2017 10:24:52 -0700 Subject: [PATCH 2/6] Shutdown requested via dispatch. --- .../kinesis/multilang/MessageWriter.java | 14 ++++++++++++-- .../kinesis/multilang/MultiLangDaemon.java | 10 ++++++---- .../kinesis/multilang/MultiLangProtocol.java | 13 +++++++++++++ .../multilang/MultiLangRecordProcessor.java | 7 +++++-- .../messages/ShutdownRequestedMessage.java | 17 +++++++++++++++++ 5 files changed, 53 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java index b2ddbfe3..3310d248 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java @@ -26,13 +26,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; -import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; + import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; +import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; + +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.fasterxml.jackson.databind.ObjectMapper; /** @@ -145,6 +148,13 @@ class MessageWriter { return writeMessage(new ShutdownMessage(reason)); } + /** + * Writes a {@link ShutdownRequestedMessage} to the subprocess. + */ + Future writeShutdownRequestedMessage() { + return writeMessage(new ShutdownRequestedMessage()); + } + /** * Writes a {@link CheckpointMessage} to the subprocess. * diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index 185cc070..2d90680f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -16,10 +16,13 @@ package com.amazonaws.services.kinesis.multilang; import java.io.IOException; import java.io.PrintStream; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -145,17 +148,16 @@ public class MultiLangDaemon implements Callable { config.getRecordProcessorFactory(), executorService); - Runtime.getRuntime().addShutdownHook(new Thread() - { + Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { LOG.info("Process terminanted, will initiate shutdown."); try { Future fut = daemon.worker.requestShutdown(); - fut.get(); + fut.get(5000, TimeUnit.MILLISECONDS); LOG.info("Process shutdown is complete."); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.error("Encountered an error during shutdown.", e); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 64c7829f..67a97770 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -27,6 +27,7 @@ import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; +import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import lombok.extern.apachecommons.CommonsLog; @@ -99,6 +100,18 @@ class MultiLangProtocol { return waitForStatusMessage(ShutdownMessage.ACTION, checkpointer, writeFuture); } + /** + * Writes a {@link ShutdownRequestedMessage} to the child process's STDIN and waits for the child process to respond with a + * {@link StatusMessage} on its STDOUT. + * + * @param checkpointer A checkpointer. + * @return Whether or not this operation succeeded. + */ + boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + Future writeFuture = messageWriter.writeShutdownRequestedMessage(); + return waitForStatusMessage(ShutdownRequestedMessage.ACTION, checkpointer, writeFuture); + } + /** * Waits for a {@link StatusMessage} for a particular action. If a {@link CheckpointMessage} is received, then this * method will attempt to checkpoint with the provided {@link IRecordProcessorCheckpointer}. This method returns diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 76db52c0..e23508d1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -150,11 +150,14 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti } try { LOG.info("Requesting a checkpoint on shutdown notification."); +// ProcessRecordsInput emptyInput = new ProcessRecordsInput(); + + checkpointer.checkpoint(); } catch (InvalidStateException e) { - LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e.toString()); + LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e, e); } catch (ShutdownException e) { - LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e.toString()); + LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e, e); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java new file mode 100644 index 00000000..d6b52e16 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java @@ -0,0 +1,17 @@ +package com.amazonaws.services.kinesis.multilang.messages; + +/** + * A message to indicate to the client's process that shutdown is requested. + */ +public class ShutdownRequestedMessage extends Message { + /** + * The name used for the action field in {@link Message}. + */ + public static final String ACTION = "shutdownrequested"; + + /** + * Convenience constructor. + */ + public ShutdownRequestedMessage() { + } +} From 7be1059b9ab3998d8fa7165e669a1aef830041e7 Mon Sep 17 00:00:00 2001 From: Ikram ulhaq Date: Mon, 5 Jun 2017 12:56:00 -0700 Subject: [PATCH 3/6] minor formatting fix. --- .../amazonaws/services/kinesis/multilang/MultiLangDaemon.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index 2d90680f..885eeb4f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -150,8 +150,7 @@ public class MultiLangDaemon implements Callable { Runtime.getRuntime().addShutdownHook(new Thread() { @Override - public void run() - { + public void run() { LOG.info("Process terminanted, will initiate shutdown."); try { Future fut = daemon.worker.requestShutdown(); From b5946cf19b8b95c39afe30c56f52aa82e819c0ed Mon Sep 17 00:00:00 2001 From: Ikram ulhaq Date: Mon, 5 Jun 2017 13:10:10 -0700 Subject: [PATCH 4/6] Shutdown requested from plumbing in multi lang record processor. --- .../kinesis/multilang/MultiLangRecordProcessor.java | 13 +++---------- .../multilang/StreamingRecordProcessorTest.java | 1 - 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index e23508d1..dd141fa3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -148,16 +148,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti LOG.info("Record processor was not initialized so no need to initiate a final checkpoint."); return; } - try { - LOG.info("Requesting a checkpoint on shutdown notification."); -// ProcessRecordsInput emptyInput = new ProcessRecordsInput(); - - - checkpointer.checkpoint(); - } catch (InvalidStateException e) { - LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e, e); - } catch (ShutdownException e) { - LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e, e); + LOG.info("Requesting a checkpoint on shutdown notification."); + if (!protocol.shutdownRequested(checkpointer)) { + LOG.error("Child process failed to shutdown"); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index b645af9b..2c02b5e9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -172,7 +172,6 @@ public class StreamingRecordProcessorTest { recordProcessor.initialize(new InitializationInput().withShardId(shardId)); recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer)); recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer)); - recordProcessor.shutdownRequested(Mockito.mock(IRecordProcessorCheckpointer.class)); recordProcessor.shutdown(new ShutdownInput().withCheckpointer(unimplementedCheckpointer).withShutdownReason(ShutdownReason.ZOMBIE)); } From 7d014da700c7f31633759a8df68c54e381cd77a1 Mon Sep 17 00:00:00 2001 From: Ikram ulhaq Date: Mon, 5 Jun 2017 13:15:45 -0700 Subject: [PATCH 5/6] Fix log message. --- .../services/kinesis/multilang/MultiLangRecordProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index dd141fa3..2532771b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -150,7 +150,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti } LOG.info("Requesting a checkpoint on shutdown notification."); if (!protocol.shutdownRequested(checkpointer)) { - LOG.error("Child process failed to shutdown"); + LOG.error("Child process failed to complete shutdown notification."); } } From 0ed91f953a508d67c3071b1e58e42aa25bb91b8f Mon Sep 17 00:00:00 2001 From: Ikram ulhaq Date: Sat, 10 Jun 2017 23:23:32 -0700 Subject: [PATCH 6/6] adding some unit tests. --- .../multilang/messages/ShutdownRequestedMessage.java | 2 +- .../services/kinesis/multilang/MessageWriterTest.java | 10 ++++++++++ .../kinesis/multilang/MultiLangProtocolTest.java | 10 ++++++++++ .../kinesis/multilang/messages/MessageTest.java | 2 +- 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java index d6b52e16..6cf77964 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java @@ -7,7 +7,7 @@ public class ShutdownRequestedMessage extends Message { /** * The name used for the action field in {@link Message}. */ - public static final String ACTION = "shutdownrequested"; + public static final String ACTION = "shutdownRequested"; /** * Convenience constructor. diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java index 08f04c92..21771980 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java @@ -113,6 +113,16 @@ public class MessageWriterTest { Mockito.verify(this.stream, Mockito.atLeastOnce()).flush(); } + @Test + public void writeShutdownRequestedMessageTest() throws IOException, InterruptedException, ExecutionException { + Future future = this.messageWriter.writeShutdownRequestedMessage(); + future.get(); + + Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), + Mockito.anyInt()); + Mockito.verify(this.stream, Mockito.atLeastOnce()).flush(); + } + @Test public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException { Mockito.doThrow(IOException.class).when(stream).flush(); diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index f00bb48f..e1827799 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -114,6 +114,16 @@ public class MultiLangProtocolTest { assertThat(protocol.shutdown(null, ShutdownReason.ZOMBIE), equalTo(true)); } + @Test + public void shutdownRequestedTest() { + when(messageWriter.writeShutdownRequestedMessage()).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("shutdownRequested"), Message.class)); + Mockito.doReturn(buildFuture(true)).when(messageWriter) + .writeShutdownRequestedMessage(); + Mockito.doReturn(buildFuture(new StatusMessage("shutdownRequested"))).when(messageReader).getNextMessageFromSTDOUT(); + assertThat(protocol.shutdownRequested(null), equalTo(true)); + } + private Answer> buildMessageAnswers(List messages) { return new Answer>() { diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java index 2c76aa30..e9a976ae 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java @@ -44,7 +44,7 @@ public class MessageTest { }); } })), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"), - new InitializeMessage(), new ProcessRecordsMessage() }; + new InitializeMessage(), new ProcessRecordsMessage(), new ShutdownRequestedMessage() }; for (int i = 0; i < messages.length; i++) { Assert.assertTrue("Each message should contain the action field", messages[i].toString().contains("action"));