From 198f8afce79edf146168bb93c772a3cda3040093 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Wed, 21 Jun 2017 07:36:39 -0700 Subject: [PATCH 1/5] Advance Version to 1.7.6 for Next Release --- META-INF/MANIFEST.MF | 2 +- pom.xml | 2 +- .../clientlibrary/lib/worker/KinesisClientLibConfiguration.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index dea3446e..986ff77d 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.7.4 +Bundle-Version: 1.7.6 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", diff --git a/pom.xml b/pom.xml index 26304866..cc4ad932 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.7.5 + 1.7.6-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 02450cb8..b708a30a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -121,7 +121,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.4"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.6"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls From 5a8bac23c613a307f22611c2f1a289347e893067 Mon Sep 17 00:00:00 2001 From: Muhammad Ikramul Haq Date: Wed, 21 Jun 2017 07:52:30 -0700 Subject: [PATCH 2/5] Trigger Graceful Shutdown for MultiLang Daemon Clients on SIGTERM (#174) When the parent Java process receives a SIGTERM it will now trigger a graceful shutdown of the worker, which dispatches a ShutdownRequestedMessage to all of the record processors. This will allow record processors a final chance to checkpoint before the lease is terminated. This changes is incompatible with current versions of the MultiLang Clients. A future change will allow older versions MultiLang clients to use newer versions of the Java KCL. --- .../kinesis/multilang/MessageWriter.java | 14 +++++++++++-- .../kinesis/multilang/MultiLangDaemon.java | 19 ++++++++++++++++- .../kinesis/multilang/MultiLangProtocol.java | 13 ++++++++++++ .../multilang/MultiLangRecordProcessor.java | 21 ++++++++++++++++++- .../messages/ShutdownRequestedMessage.java | 17 +++++++++++++++ .../kinesis/multilang/MessageWriterTest.java | 10 +++++++++ .../multilang/MultiLangProtocolTest.java | 10 +++++++++ .../multilang/messages/MessageTest.java | 2 +- 8 files changed, 101 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java index b2ddbfe3..3310d248 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java @@ -26,13 +26,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; -import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; + import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; +import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; + +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.fasterxml.jackson.databind.ObjectMapper; /** @@ -145,6 +148,13 @@ class MessageWriter { return writeMessage(new ShutdownMessage(reason)); } + /** + * Writes a {@link ShutdownRequestedMessage} to the subprocess. + */ + Future writeShutdownRequestedMessage() { + return writeMessage(new ShutdownRequestedMessage()); + } + /** * Writes a {@link CheckpointMessage} to the subprocess. * diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index fdff4dc7..885eeb4f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -16,10 +16,13 @@ package com.amazonaws.services.kinesis.multilang; import java.io.IOException; import java.io.PrintStream; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -140,11 +143,25 @@ public class MultiLangDaemon implements Callable { ExecutorService executorService = config.getExecutorService(); // Daemon - MultiLangDaemon daemon = new MultiLangDaemon( + final MultiLangDaemon daemon = new MultiLangDaemon( config.getKinesisClientLibConfiguration(), config.getRecordProcessorFactory(), executorService); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + LOG.info("Process terminanted, will initiate shutdown."); + try { + Future fut = daemon.worker.requestShutdown(); + fut.get(5000, TimeUnit.MILLISECONDS); + LOG.info("Process shutdown is complete."); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("Encountered an error during shutdown.", e); + } + } + }); + Future future = executorService.submit(daemon); try { System.exit(future.get()); diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 64c7829f..67a97770 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -27,6 +27,7 @@ import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; +import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import lombok.extern.apachecommons.CommonsLog; @@ -99,6 +100,18 @@ class MultiLangProtocol { return waitForStatusMessage(ShutdownMessage.ACTION, checkpointer, writeFuture); } + /** + * Writes a {@link ShutdownRequestedMessage} to the child process's STDIN and waits for the child process to respond with a + * {@link StatusMessage} on its STDOUT. + * + * @param checkpointer A checkpointer. + * @return Whether or not this operation succeeded. + */ + boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + Future writeFuture = messageWriter.writeShutdownRequestedMessage(); + return waitForStatusMessage(ShutdownRequestedMessage.ACTION, checkpointer, writeFuture); + } + /** * Waits for a {@link StatusMessage} for a particular action. If a {@link CheckpointMessage} is received, then this * method will attempt to checkpoint with the provided {@link IRecordProcessorCheckpointer}. This method returns diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 9d76af54..2532771b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -20,6 +20,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,13 +33,14 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.fasterxml.jackson.databind.ObjectMapper; + /** * A record processor that manages creating a child process that implements the multi language protocol and connecting * that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on * that object when its corresponding {@link #initialize}, {@link #processRecords}, and {@link #shutdown} methods are * called. */ -public class MultiLangRecordProcessor implements IRecordProcessor { +public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class); private static final int EXIT_VALUE = 1; @@ -136,6 +141,20 @@ public class MultiLangRecordProcessor implements IRecordProcessor { } } + @Override + public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + LOG.info("Shutdown is requested."); + if (!initialized) { + LOG.info("Record processor was not initialized so no need to initiate a final checkpoint."); + return; + } + LOG.info("Requesting a checkpoint on shutdown notification."); + if (!protocol.shutdownRequested(checkpointer)) { + LOG.error("Child process failed to complete shutdown notification."); + } + } + + /** * Used to tell whether the processor has been shutdown already. */ diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java new file mode 100644 index 00000000..6cf77964 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java @@ -0,0 +1,17 @@ +package com.amazonaws.services.kinesis.multilang.messages; + +/** + * A message to indicate to the client's process that shutdown is requested. + */ +public class ShutdownRequestedMessage extends Message { + /** + * The name used for the action field in {@link Message}. + */ + public static final String ACTION = "shutdownRequested"; + + /** + * Convenience constructor. + */ + public ShutdownRequestedMessage() { + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java index 08f04c92..21771980 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java @@ -113,6 +113,16 @@ public class MessageWriterTest { Mockito.verify(this.stream, Mockito.atLeastOnce()).flush(); } + @Test + public void writeShutdownRequestedMessageTest() throws IOException, InterruptedException, ExecutionException { + Future future = this.messageWriter.writeShutdownRequestedMessage(); + future.get(); + + Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), + Mockito.anyInt()); + Mockito.verify(this.stream, Mockito.atLeastOnce()).flush(); + } + @Test public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException { Mockito.doThrow(IOException.class).when(stream).flush(); diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index f00bb48f..e1827799 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -114,6 +114,16 @@ public class MultiLangProtocolTest { assertThat(protocol.shutdown(null, ShutdownReason.ZOMBIE), equalTo(true)); } + @Test + public void shutdownRequestedTest() { + when(messageWriter.writeShutdownRequestedMessage()).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("shutdownRequested"), Message.class)); + Mockito.doReturn(buildFuture(true)).when(messageWriter) + .writeShutdownRequestedMessage(); + Mockito.doReturn(buildFuture(new StatusMessage("shutdownRequested"))).when(messageReader).getNextMessageFromSTDOUT(); + assertThat(protocol.shutdownRequested(null), equalTo(true)); + } + private Answer> buildMessageAnswers(List messages) { return new Answer>() { diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java index 2c76aa30..e9a976ae 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java @@ -44,7 +44,7 @@ public class MessageTest { }); } })), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"), - new InitializeMessage(), new ProcessRecordsMessage() }; + new InitializeMessage(), new ProcessRecordsMessage(), new ShutdownRequestedMessage() }; for (int i = 0; i < messages.length; i++) { Assert.assertTrue("Each message should contain the action field", messages[i].toString().contains("action")); From d7ed56d4d0844bdcb2da76c8ddb58577764f5b95 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Wed, 21 Jun 2017 08:36:41 -0700 Subject: [PATCH 3/5] Added ShutdownRequestedMessage to the subtypes for Message (#182) Added the ShutdownRequestedMessage to the subtypes for Message. This ensures that the action value of the message will be correctly set. --- .../services/kinesis/multilang/messages/Message.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/Message.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/Message.java index 766cdac0..48ad09d0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/Message.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/Message.java @@ -23,11 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; * Abstract class for all messages that are sent to the client's process. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "action") -@JsonSubTypes({ @Type(value = CheckpointMessage.class, name = CheckpointMessage.ACTION), +@JsonSubTypes({ + @Type(value = CheckpointMessage.class, name = CheckpointMessage.ACTION), @Type(value = InitializeMessage.class, name = InitializeMessage.ACTION), @Type(value = ProcessRecordsMessage.class, name = ProcessRecordsMessage.ACTION), @Type(value = ShutdownMessage.class, name = ShutdownMessage.ACTION), - @Type(value = StatusMessage.class, name = StatusMessage.ACTION), }) + @Type(value = StatusMessage.class, name = StatusMessage.ACTION), + @Type(value = ShutdownRequestedMessage.class, name = ShutdownRequestedMessage.ACTION), +}) public abstract class Message { private ObjectMapper mapper = new ObjectMapper();; From dabdc2982242a77fca13237aaccbcf83adf41256 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Wed, 21 Jun 2017 08:54:18 -0700 Subject: [PATCH 4/5] Update to the Newest Version of the AWS Java SDK (#183) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index cc4ad932..3c2aacd3 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ - 1.11.115 + 1.11.151 1.0.392 libsqlite4java ${project.build.directory}/test-lib From 4c839a9d43cc54669dcaa952c675b2d08b7e62d8 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Thu, 22 Jun 2017 06:56:42 -0700 Subject: [PATCH 5/5] Release 1.7.6 of the Amazon Kinesis Client (#184) * Added support for graceful shutdown in MultiLang Clients * PR #174 * PR #182 * Updated documentation for `v2.IRecordProcessor#shutdown`, and `KinesisClientLibConfiguration#idleTimeBetweenReadsMillis` * PR #170 * Updated to version 1.11.151 of the AWS Java SDK * PR #183 --- README.md | 9 +++++++++ pom.xml | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 14428a94..098adc8e 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,15 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.7.6 (June 21, 2017) +* Added support for graceful shutdown in MultiLang Clients + * [PR #174](https://github.com/awslabs/amazon-kinesis-client/pull/174) + * [PR #182](https://github.com/awslabs/amazon-kinesis-client/pull/182) +* Updated documentation for `v2.IRecordProcessor#shutdown`, and `KinesisClientLibConfiguration#idleTimeBetweenReadsMillis` + * [PR #170](https://github.com/awslabs/amazon-kinesis-client/pull/170) +* Updated to version 1.11.151 of the AWS Java SDK + * [PR #183](https://github.com/awslabs/amazon-kinesis-client/pull/183) + ### Release 1.7.5 (April 7, 2017) * Correctly handle throttling for DescribeStream, and save accumulated progress from individual calls. * [PR #152](https://github.com/awslabs/amazon-kinesis-client/pull/152) diff --git a/pom.xml b/pom.xml index 3c2aacd3..b4a1d10e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.7.6-SNAPSHOT + 1.7.6 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.