Merge branch 'master' into java8

This commit is contained in:
Pfifer, Justin 2017-07-20 06:48:44 -07:00
commit dd4c451b3a
13 changed files with 118 additions and 10 deletions

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2 Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.7.4 Bundle-Version: 1.7.6
Bundle-Vendor: Amazon Technologies, Inc Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6", Require-Bundle: org.apache.commons.codec;bundle-version="1.6",

View file

@ -29,6 +29,15 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
## Release Notes ## Release Notes
### Release 1.7.6 (June 21, 2017)
* Added support for graceful shutdown in MultiLang Clients
* [PR #174](https://github.com/awslabs/amazon-kinesis-client/pull/174)
* [PR #182](https://github.com/awslabs/amazon-kinesis-client/pull/182)
* Updated documentation for `v2.IRecordProcessor#shutdown`, and `KinesisClientLibConfiguration#idleTimeBetweenReadsMillis`
* [PR #170](https://github.com/awslabs/amazon-kinesis-client/pull/170)
* Updated to version 1.11.151 of the AWS Java SDK
* [PR #183](https://github.com/awslabs/amazon-kinesis-client/pull/183)
### Release 1.7.5 (April 7, 2017) ### Release 1.7.5 (April 7, 2017)
* Correctly handle throttling for DescribeStream, and save accumulated progress from individual calls. * Correctly handle throttling for DescribeStream, and save accumulated progress from individual calls.
* [PR #152](https://github.com/awslabs/amazon-kinesis-client/pull/152) * [PR #152](https://github.com/awslabs/amazon-kinesis-client/pull/152)

View file

@ -25,7 +25,7 @@
</licenses> </licenses>
<properties> <properties>
<aws-java-sdk.version>1.11.115</aws-java-sdk.version> <aws-java-sdk.version>1.11.151</aws-java-sdk.version>
<sqlite4java.version>1.0.392</sqlite4java.version> <sqlite4java.version>1.0.392</sqlite4java.version>
<sqlite4java.native>libsqlite4java</sqlite4java.native> <sqlite4java.native>libsqlite4java</sqlite4java.native>
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath> <sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>

View file

@ -121,7 +121,7 @@ public class KinesisClientLibConfiguration {
/** /**
* User agent set when Amazon Kinesis Client Library makes AWS requests. * User agent set when Amazon Kinesis Client Library makes AWS requests.
*/ */
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.4"; public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.6";
/** /**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls

View file

@ -26,13 +26,16 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
/** /**
@ -145,6 +148,13 @@ class MessageWriter {
return writeMessage(new ShutdownMessage(reason)); return writeMessage(new ShutdownMessage(reason));
} }
/**
* Writes a {@link ShutdownRequestedMessage} to the subprocess.
*/
Future<Boolean> writeShutdownRequestedMessage() {
return writeMessage(new ShutdownRequestedMessage());
}
/** /**
* Writes a {@link CheckpointMessage} to the subprocess. * Writes a {@link CheckpointMessage} to the subprocess.
* *

View file

@ -16,10 +16,13 @@ package com.amazonaws.services.kinesis.multilang;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -140,11 +143,25 @@ public class MultiLangDaemon implements Callable<Integer> {
ExecutorService executorService = config.getExecutorService(); ExecutorService executorService = config.getExecutorService();
// Daemon // Daemon
MultiLangDaemon daemon = new MultiLangDaemon( final MultiLangDaemon daemon = new MultiLangDaemon(
config.getKinesisClientLibConfiguration(), config.getKinesisClientLibConfiguration(),
config.getRecordProcessorFactory(), config.getRecordProcessorFactory(),
executorService); executorService);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LOG.info("Process terminanted, will initiate shutdown.");
try {
Future<Void> fut = daemon.worker.requestShutdown();
fut.get(5000, TimeUnit.MILLISECONDS);
LOG.info("Process shutdown is complete.");
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Encountered an error during shutdown.", e);
}
}
});
Future<Integer> future = executorService.submit(daemon); Future<Integer> future = executorService.submit(daemon);
try { try {
System.exit(future.get()); System.exit(future.get());

View file

@ -27,6 +27,7 @@ import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import lombok.extern.apachecommons.CommonsLog; import lombok.extern.apachecommons.CommonsLog;
@ -99,6 +100,18 @@ class MultiLangProtocol {
return waitForStatusMessage(ShutdownMessage.ACTION, checkpointer, writeFuture); return waitForStatusMessage(ShutdownMessage.ACTION, checkpointer, writeFuture);
} }
/**
* Writes a {@link ShutdownRequestedMessage} to the child process's STDIN and waits for the child process to respond with a
* {@link StatusMessage} on its STDOUT.
*
* @param checkpointer A checkpointer.
* @return Whether or not this operation succeeded.
*/
boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
Future<Boolean> writeFuture = messageWriter.writeShutdownRequestedMessage();
return waitForStatusMessage(ShutdownRequestedMessage.ACTION, checkpointer, writeFuture);
}
/** /**
* Waits for a {@link StatusMessage} for a particular action. If a {@link CheckpointMessage} is received, then this * Waits for a {@link StatusMessage} for a particular action. If a {@link CheckpointMessage} is received, then this
* method will attempt to checkpoint with the provided {@link IRecordProcessorCheckpointer}. This method returns * method will attempt to checkpoint with the provided {@link IRecordProcessorCheckpointer}. This method returns

View file

@ -20,6 +20,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -29,13 +33,14 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
/** /**
* A record processor that manages creating a child process that implements the multi language protocol and connecting * A record processor that manages creating a child process that implements the multi language protocol and connecting
* that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on * that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on
* that object when its corresponding {@link #initialize}, {@link #processRecords}, and {@link #shutdown} methods are * that object when its corresponding {@link #initialize}, {@link #processRecords}, and {@link #shutdown} methods are
* called. * called.
*/ */
public class MultiLangRecordProcessor implements IRecordProcessor { public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class); private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class);
private static final int EXIT_VALUE = 1; private static final int EXIT_VALUE = 1;
@ -136,6 +141,20 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
} }
} }
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
LOG.info("Shutdown is requested.");
if (!initialized) {
LOG.info("Record processor was not initialized so no need to initiate a final checkpoint.");
return;
}
LOG.info("Requesting a checkpoint on shutdown notification.");
if (!protocol.shutdownRequested(checkpointer)) {
LOG.error("Child process failed to complete shutdown notification.");
}
}
/** /**
* Used to tell whether the processor has been shutdown already. * Used to tell whether the processor has been shutdown already.
*/ */

View file

@ -23,11 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
* Abstract class for all messages that are sent to the client's process. * Abstract class for all messages that are sent to the client's process.
*/ */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "action") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "action")
@JsonSubTypes({ @Type(value = CheckpointMessage.class, name = CheckpointMessage.ACTION), @JsonSubTypes({
@Type(value = CheckpointMessage.class, name = CheckpointMessage.ACTION),
@Type(value = InitializeMessage.class, name = InitializeMessage.ACTION), @Type(value = InitializeMessage.class, name = InitializeMessage.ACTION),
@Type(value = ProcessRecordsMessage.class, name = ProcessRecordsMessage.ACTION), @Type(value = ProcessRecordsMessage.class, name = ProcessRecordsMessage.ACTION),
@Type(value = ShutdownMessage.class, name = ShutdownMessage.ACTION), @Type(value = ShutdownMessage.class, name = ShutdownMessage.ACTION),
@Type(value = StatusMessage.class, name = StatusMessage.ACTION), }) @Type(value = StatusMessage.class, name = StatusMessage.ACTION),
@Type(value = ShutdownRequestedMessage.class, name = ShutdownRequestedMessage.ACTION),
})
public abstract class Message { public abstract class Message {
private ObjectMapper mapper = new ObjectMapper();; private ObjectMapper mapper = new ObjectMapper();;

View file

@ -0,0 +1,17 @@
package com.amazonaws.services.kinesis.multilang.messages;
/**
* A message to indicate to the client's process that shutdown is requested.
*/
public class ShutdownRequestedMessage extends Message {
/**
* The name used for the action field in {@link Message}.
*/
public static final String ACTION = "shutdownRequested";
/**
* Convenience constructor.
*/
public ShutdownRequestedMessage() {
}
}

View file

@ -113,6 +113,16 @@ public class MessageWriterTest {
Mockito.verify(this.stream, Mockito.atLeastOnce()).flush(); Mockito.verify(this.stream, Mockito.atLeastOnce()).flush();
} }
@Test
public void writeShutdownRequestedMessageTest() throws IOException, InterruptedException, ExecutionException {
Future<Boolean> future = this.messageWriter.writeShutdownRequestedMessage();
future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt());
Mockito.verify(this.stream, Mockito.atLeastOnce()).flush();
}
@Test @Test
public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException { public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException {
Mockito.doThrow(IOException.class).when(stream).flush(); Mockito.doThrow(IOException.class).when(stream).flush();

View file

@ -114,6 +114,16 @@ public class MultiLangProtocolTest {
assertThat(protocol.shutdown(null, ShutdownReason.ZOMBIE), equalTo(true)); assertThat(protocol.shutdown(null, ShutdownReason.ZOMBIE), equalTo(true));
} }
@Test
public void shutdownRequestedTest() {
when(messageWriter.writeShutdownRequestedMessage()).thenReturn(buildFuture(true));
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("shutdownRequested"), Message.class));
Mockito.doReturn(buildFuture(true)).when(messageWriter)
.writeShutdownRequestedMessage();
Mockito.doReturn(buildFuture(new StatusMessage("shutdownRequested"))).when(messageReader).getNextMessageFromSTDOUT();
assertThat(protocol.shutdownRequested(null), equalTo(true));
}
private Answer<Future<Message>> buildMessageAnswers(List<Message> messages) { private Answer<Future<Message>> buildMessageAnswers(List<Message> messages) {
return new Answer<Future<Message>>() { return new Answer<Future<Message>>() {

View file

@ -44,7 +44,7 @@ public class MessageTest {
}); });
} }
})), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"), })), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"),
new InitializeMessage(), new ProcessRecordsMessage() }; new InitializeMessage(), new ProcessRecordsMessage(), new ShutdownRequestedMessage() };
for (int i = 0; i < messages.length; i++) { for (int i = 0; i < messages.length; i++) {
Assert.assertTrue("Each message should contain the action field", messages[i].toString().contains("action")); Assert.assertTrue("Each message should contain the action field", messages[i].toString().contains("action"));