diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index dea3446e..986ff77d 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.7.4 +Bundle-Version: 1.7.6 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", diff --git a/README.md b/README.md index 14428a94..098adc8e 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,15 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.7.6 (June 21, 2017) +* Added support for graceful shutdown in MultiLang Clients + * [PR #174](https://github.com/awslabs/amazon-kinesis-client/pull/174) + * [PR #182](https://github.com/awslabs/amazon-kinesis-client/pull/182) +* Updated documentation for `v2.IRecordProcessor#shutdown`, and `KinesisClientLibConfiguration#idleTimeBetweenReadsMillis` + * [PR #170](https://github.com/awslabs/amazon-kinesis-client/pull/170) +* Updated to version 1.11.151 of the AWS Java SDK + * [PR #183](https://github.com/awslabs/amazon-kinesis-client/pull/183) + ### Release 1.7.5 (April 7, 2017) * Correctly handle throttling for DescribeStream, and save accumulated progress from individual calls. * [PR #152](https://github.com/awslabs/amazon-kinesis-client/pull/152) diff --git a/pom.xml b/pom.xml index 0772a842..47069ad7 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ - 1.11.115 + 1.11.151 1.0.392 libsqlite4java ${project.build.directory}/test-lib diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 02450cb8..b708a30a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -121,7 +121,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.4"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.6"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java index b2ddbfe3..3310d248 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java @@ -26,13 +26,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; -import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; + import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; +import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; + +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.fasterxml.jackson.databind.ObjectMapper; /** @@ -145,6 +148,13 @@ class MessageWriter { return writeMessage(new ShutdownMessage(reason)); } + /** + * Writes a {@link ShutdownRequestedMessage} to the subprocess. + */ + Future writeShutdownRequestedMessage() { + return writeMessage(new ShutdownRequestedMessage()); + } + /** * Writes a {@link CheckpointMessage} to the subprocess. * diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index fdff4dc7..885eeb4f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -16,10 +16,13 @@ package com.amazonaws.services.kinesis.multilang; import java.io.IOException; import java.io.PrintStream; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -140,11 +143,25 @@ public class MultiLangDaemon implements Callable { ExecutorService executorService = config.getExecutorService(); // Daemon - MultiLangDaemon daemon = new MultiLangDaemon( + final MultiLangDaemon daemon = new MultiLangDaemon( config.getKinesisClientLibConfiguration(), config.getRecordProcessorFactory(), executorService); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + LOG.info("Process terminanted, will initiate shutdown."); + try { + Future fut = daemon.worker.requestShutdown(); + fut.get(5000, TimeUnit.MILLISECONDS); + LOG.info("Process shutdown is complete."); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("Encountered an error during shutdown.", e); + } + } + }); + Future future = executorService.submit(daemon); try { System.exit(future.get()); diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 64c7829f..67a97770 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -27,6 +27,7 @@ import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; +import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import lombok.extern.apachecommons.CommonsLog; @@ -99,6 +100,18 @@ class MultiLangProtocol { return waitForStatusMessage(ShutdownMessage.ACTION, checkpointer, writeFuture); } + /** + * Writes a {@link ShutdownRequestedMessage} to the child process's STDIN and waits for the child process to respond with a + * {@link StatusMessage} on its STDOUT. + * + * @param checkpointer A checkpointer. + * @return Whether or not this operation succeeded. + */ + boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + Future writeFuture = messageWriter.writeShutdownRequestedMessage(); + return waitForStatusMessage(ShutdownRequestedMessage.ACTION, checkpointer, writeFuture); + } + /** * Waits for a {@link StatusMessage} for a particular action. If a {@link CheckpointMessage} is received, then this * method will attempt to checkpoint with the provided {@link IRecordProcessorCheckpointer}. This method returns diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index 9d76af54..2532771b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -20,6 +20,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,13 +33,14 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.fasterxml.jackson.databind.ObjectMapper; + /** * A record processor that manages creating a child process that implements the multi language protocol and connecting * that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on * that object when its corresponding {@link #initialize}, {@link #processRecords}, and {@link #shutdown} methods are * called. */ -public class MultiLangRecordProcessor implements IRecordProcessor { +public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class); private static final int EXIT_VALUE = 1; @@ -136,6 +141,20 @@ public class MultiLangRecordProcessor implements IRecordProcessor { } } + @Override + public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + LOG.info("Shutdown is requested."); + if (!initialized) { + LOG.info("Record processor was not initialized so no need to initiate a final checkpoint."); + return; + } + LOG.info("Requesting a checkpoint on shutdown notification."); + if (!protocol.shutdownRequested(checkpointer)) { + LOG.error("Child process failed to complete shutdown notification."); + } + } + + /** * Used to tell whether the processor has been shutdown already. */ diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/Message.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/Message.java index 766cdac0..48ad09d0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/Message.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/Message.java @@ -23,11 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; * Abstract class for all messages that are sent to the client's process. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "action") -@JsonSubTypes({ @Type(value = CheckpointMessage.class, name = CheckpointMessage.ACTION), +@JsonSubTypes({ + @Type(value = CheckpointMessage.class, name = CheckpointMessage.ACTION), @Type(value = InitializeMessage.class, name = InitializeMessage.ACTION), @Type(value = ProcessRecordsMessage.class, name = ProcessRecordsMessage.ACTION), @Type(value = ShutdownMessage.class, name = ShutdownMessage.ACTION), - @Type(value = StatusMessage.class, name = StatusMessage.ACTION), }) + @Type(value = StatusMessage.class, name = StatusMessage.ACTION), + @Type(value = ShutdownRequestedMessage.class, name = ShutdownRequestedMessage.ACTION), +}) public abstract class Message { private ObjectMapper mapper = new ObjectMapper();; diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java new file mode 100644 index 00000000..6cf77964 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownRequestedMessage.java @@ -0,0 +1,17 @@ +package com.amazonaws.services.kinesis.multilang.messages; + +/** + * A message to indicate to the client's process that shutdown is requested. + */ +public class ShutdownRequestedMessage extends Message { + /** + * The name used for the action field in {@link Message}. + */ + public static final String ACTION = "shutdownRequested"; + + /** + * Convenience constructor. + */ + public ShutdownRequestedMessage() { + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java index 08f04c92..21771980 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java @@ -113,6 +113,16 @@ public class MessageWriterTest { Mockito.verify(this.stream, Mockito.atLeastOnce()).flush(); } + @Test + public void writeShutdownRequestedMessageTest() throws IOException, InterruptedException, ExecutionException { + Future future = this.messageWriter.writeShutdownRequestedMessage(); + future.get(); + + Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), + Mockito.anyInt()); + Mockito.verify(this.stream, Mockito.atLeastOnce()).flush(); + } + @Test public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException { Mockito.doThrow(IOException.class).when(stream).flush(); diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index f00bb48f..e1827799 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -114,6 +114,16 @@ public class MultiLangProtocolTest { assertThat(protocol.shutdown(null, ShutdownReason.ZOMBIE), equalTo(true)); } + @Test + public void shutdownRequestedTest() { + when(messageWriter.writeShutdownRequestedMessage()).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("shutdownRequested"), Message.class)); + Mockito.doReturn(buildFuture(true)).when(messageWriter) + .writeShutdownRequestedMessage(); + Mockito.doReturn(buildFuture(new StatusMessage("shutdownRequested"))).when(messageReader).getNextMessageFromSTDOUT(); + assertThat(protocol.shutdownRequested(null), equalTo(true)); + } + private Answer> buildMessageAnswers(List messages) { return new Answer>() { diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java index 2c76aa30..e9a976ae 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java @@ -44,7 +44,7 @@ public class MessageTest { }); } })), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"), - new InitializeMessage(), new ProcessRecordsMessage() }; + new InitializeMessage(), new ProcessRecordsMessage(), new ShutdownRequestedMessage() }; for (int i = 0; i < messages.length; i++) { Assert.assertTrue("Each message should contain the action field", messages[i].toString().contains("action"));