diff --git a/pom.xml b/pom.xml
index 880b7253..0ba040be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,13 @@
2.6
+
+ org.projectlombok
+ lombok
+ 1.16.10
+ provided
+
+
junit
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
index 174fb65e..53a213a7 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
@@ -120,7 +120,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
- public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.1-SNAPSHOT";
+ public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.2";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java
index 2cda0632..b2ddbfe3 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java
@@ -18,7 +18,6 @@ import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
-import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -27,7 +26,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
-import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
@@ -119,19 +119,21 @@ class MessageWriter {
/**
* Writes an {@link InitializeMessage} to the subprocess.
*
- * @param shardIdToWrite The shard id.
+ * @param initializationInput
+ * contains information about the shard being initialized
*/
- Future writeInitializeMessage(String shardIdToWrite) {
- return writeMessage(new InitializeMessage(shardIdToWrite));
+ Future writeInitializeMessage(InitializationInput initializationInput) {
+ return writeMessage(new InitializeMessage(initializationInput));
}
/**
* Writes a {@link ProcessRecordsMessage} message to the subprocess.
*
- * @param records The records to be processed.
+ * @param processRecordsInput
+ * the records, and associated metadata to be processed.
*/
- Future writeProcessRecordsMessage(List records) {
- return writeMessage(new ProcessRecordsMessage(records));
+ Future writeProcessRecordsMessage(ProcessRecordsInput processRecordsInput) {
+ return writeMessage(new ProcessRecordsMessage(processRecordsInput));
}
/**
@@ -146,11 +148,16 @@ class MessageWriter {
/**
* Writes a {@link CheckpointMessage} to the subprocess.
*
- * @param sequenceNumber The sequence number that was checkpointed.
- * @param throwable The exception that was thrown by a checkpoint attempt. Null if one didn't occur.
+ * @param sequenceNumber
+ * The sequence number that was checkpointed.
+ * @param subSequenceNumber
+ * the sub sequence number to checkpoint at.
+ * @param throwable
+ * The exception that was thrown by a checkpoint attempt. Null if one didn't occur.
*/
- Future writeCheckpointMessageWithError(String sequenceNumber, Throwable throwable) {
- return writeMessage(new CheckpointMessage(sequenceNumber, throwable));
+ Future writeCheckpointMessageWithError(String sequenceNumber, Long subSequenceNumber,
+ Throwable throwable) {
+ return writeMessage(new CheckpointMessage(sequenceNumber, subSequenceNumber, throwable));
}
/**
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java
index 8b74cabc..fdff4dc7 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
@@ -71,7 +72,13 @@ public class MultiLangDaemon implements Callable {
public MultiLangDaemon(KinesisClientLibConfiguration configuration,
MultiLangRecordProcessorFactory recordProcessorFactory,
ExecutorService workerThreadPool) {
- this(new Worker(recordProcessorFactory, configuration, workerThreadPool));
+ this(buildWorker(recordProcessorFactory, configuration, workerThreadPool));
+ }
+
+ private static Worker buildWorker(IRecordProcessorFactory recordProcessorFactory,
+ KinesisClientLibConfiguration configuration, ExecutorService workerThreadPool) {
+ return new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(configuration)
+ .execService(workerThreadPool).build();
}
/**
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java
index 7793f12b..f191eedc 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java
@@ -14,17 +14,24 @@
*/
package com.amazonaws.services.kinesis.multilang;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class captures the configuration needed to run the MultiLangDaemon.
@@ -131,10 +138,29 @@ public class MultiLangDaemonConfig {
private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
Properties properties = new Properties();
- try (InputStream propertiesStream = classLoader.getResourceAsStream(propertiesFileName)) {
- properties.load(propertiesStream);
+ InputStream propertyStream = null;
+ try {
+ propertyStream = classLoader.getResourceAsStream(propertiesFileName);
+ if (propertyStream == null) {
+ File propertyFile = new File(propertiesFileName);
+ if (propertyFile.exists()) {
+ propertyStream = new FileInputStream(propertyFile);
+ }
+ }
+
+ if (propertyStream == null) {
+ throw new FileNotFoundException(
+ "Unable to find property file in classpath, or file system: '" + propertiesFileName + "'");
+ }
+
+ properties.load(propertyStream);
return properties;
+ } finally {
+ if (propertyStream != null) {
+ propertyStream.close();
+ }
}
+
}
private static boolean validateProperties(Properties properties) {
@@ -147,13 +173,16 @@ public class MultiLangDaemonConfig {
private static ExecutorService buildExecutorService(Properties properties) {
int maxActiveThreads = getMaxActiveThreads(properties);
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder().setNameFormat("multi-lang-daemon-%04d");
LOG.debug(String.format("Value for %s property is %d", PROP_MAX_ACTIVE_THREADS, maxActiveThreads));
if (maxActiveThreads <= 0) {
LOG.info("Using a cached thread pool.");
- return Executors.newCachedThreadPool();
+ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(),
+ builder.build());
} else {
LOG.info(String.format("Using a fixed thread pool with %d max active threads.", maxActiveThreads));
- return Executors.newFixedThreadPool(maxActiveThreads);
+ return new ThreadPoolExecutor(maxActiveThreads, maxActiveThreads, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue(), builder.build());
}
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java
index bce1793c..64c7829f 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java
@@ -14,17 +14,14 @@
*/
package com.amazonaws.services.kinesis.multilang;
-import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
-import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
@@ -32,28 +29,33 @@ import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
+import lombok.extern.apachecommons.CommonsLog;
+
/**
* An implementation of the multi language protocol.
*/
+@CommonsLog
class MultiLangProtocol {
- private static final Log LOG = LogFactory.getLog(MultiLangProtocol.class);
-
private MessageReader messageReader;
private MessageWriter messageWriter;
- private String shardId;
+ private final InitializationInput initializationInput;
/**
* Constructor.
*
- * @param messageReader A message reader.
- * @param messageWriter A message writer.
- * @param shardId The shard id this processor is associated with.
+ * @param messageReader
+ * A message reader.
+ * @param messageWriter
+ * A message writer.
+ * @param initializationInput
+ * information about the shard this processor is starting to process
*/
- MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, String shardId) {
+ MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter,
+ InitializationInput initializationInput) {
this.messageReader = messageReader;
this.messageWriter = messageWriter;
- this.shardId = shardId;
+ this.initializationInput = initializationInput;
}
/**
@@ -66,7 +68,7 @@ class MultiLangProtocol {
/*
* Call and response to child process.
*/
- Future writeFuture = messageWriter.writeInitializeMessage(shardId);
+ Future writeFuture = messageWriter.writeInitializeMessage(initializationInput);
return waitForStatusMessage(InitializeMessage.ACTION, null, writeFuture);
}
@@ -75,13 +77,13 @@ class MultiLangProtocol {
* Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond
* with a {@link StatusMessage} on its STDOUT.
*
- * @param records The records to process.
- * @param checkpointer A checkpointer.
+ * @param processRecordsInput
+ * The records, and associated metadata, to process.
* @return Whether or not this operation succeeded.
*/
- boolean processRecords(List records, IRecordProcessorCheckpointer checkpointer) {
- Future writeFuture = messageWriter.writeProcessRecordsMessage(records);
- return waitForStatusMessage(ProcessRecordsMessage.ACTION, checkpointer, writeFuture);
+ boolean processRecords(ProcessRecordsInput processRecordsInput) {
+ Future writeFuture = messageWriter.writeProcessRecordsMessage(processRecordsInput);
+ return waitForStatusMessage(ProcessRecordsMessage.ACTION, processRecordsInput.getCheckpointer(), writeFuture);
}
/**
@@ -105,32 +107,41 @@ class MultiLangProtocol {
* checkpointing itself was successful is not the concern of this method. This method simply cares whether it was
* able to successfully communicate the results of its attempts to checkpoint.
*
- * @param action What action is being waited on.
- * @param checkpointer A checkpointer.
- * @param writeFuture The writing task.
+ * @param action
+ * What action is being waited on.
+ * @param checkpointer
+ * the checkpointer from the process records, or shutdown request
+ * @param writeFuture
+ * The writing task.
* @return Whether or not this operation succeeded.
*/
- private boolean waitForStatusMessage(String action,
- IRecordProcessorCheckpointer checkpointer,
- Future writeFuture) {
+ private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer,
+ Future writeFuture) {
boolean statusWasCorrect = waitForStatusMessage(action, checkpointer);
// Examine whether or not we failed somewhere along the line.
try {
- boolean writerIsStillOpen = Boolean.valueOf(writeFuture.get());
+ boolean writerIsStillOpen = writeFuture.get();
return statusWasCorrect && writerIsStillOpen;
} catch (InterruptedException e) {
- LOG.error(String.format("Interrupted while writing %s message for shard %s", action, shardId));
+ log.error(String.format("Interrupted while writing %s message for shard %s", action,
+ initializationInput.getShardId()));
return false;
} catch (ExecutionException e) {
- LOG.error(String.format("Failed to write %s message for shard %s", action, shardId), e);
+ log.error(
+ String.format("Failed to write %s message for shard %s", action, initializationInput.getShardId()),
+ e);
return false;
}
}
/**
- * @param action What action is being waited on.
- * @param checkpointer A checkpointer.
+ * Waits for status message and verifies it against the expectation
+ *
+ * @param action
+ * What action is being waited on.
+ * @param checkpointer
+ * the original process records request
* @return Whether or not this operation succeeded.
*/
private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
@@ -141,8 +152,7 @@ class MultiLangProtocol {
Message message = future.get();
// Note that instanceof doubles as a check against a value being null
if (message instanceof CheckpointMessage) {
- boolean checkpointWriteSucceeded =
- Boolean.valueOf(checkpoint((CheckpointMessage) message, checkpointer).get());
+ boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get();
if (!checkpointWriteSucceeded) {
return false;
}
@@ -150,10 +160,12 @@ class MultiLangProtocol {
statusMessage = (StatusMessage) message;
}
} catch (InterruptedException e) {
- LOG.error(String.format("Interrupted while waiting for %s message for shard %s", action, shardId));
+ log.error(String.format("Interrupted while waiting for %s message for shard %s", action,
+ initializationInput.getShardId()));
return false;
} catch (ExecutionException e) {
- LOG.error(String.format("Failed to get status message for %s action for shard %s", action, shardId), e);
+ log.error(String.format("Failed to get status message for %s action for shard %s", action,
+ initializationInput.getShardId()), e);
return false;
}
}
@@ -168,8 +180,8 @@ class MultiLangProtocol {
* @return Whether or not this operation succeeded.
*/
private boolean validateStatusMessage(StatusMessage statusMessage, String action) {
- LOG.info("Received response " + statusMessage + " from subprocess while waiting for " + action
- + " while processing shard " + shardId);
+ log.info("Received response " + statusMessage + " from subprocess while waiting for " + action
+ + " while processing shard " + initializationInput.getShardId());
return !(statusMessage == null || statusMessage.getResponseFor() == null || !statusMessage.getResponseFor()
.equals(action));
@@ -186,28 +198,38 @@ class MultiLangProtocol {
* @return Whether or not this operation succeeded.
*/
private Future checkpoint(CheckpointMessage checkpointMessage, IRecordProcessorCheckpointer checkpointer) {
- String sequenceNumber = checkpointMessage.getCheckpoint();
+ String sequenceNumber = checkpointMessage.getSequenceNumber();
+ Long subSequenceNumber = checkpointMessage.getSubSequenceNumber();
try {
if (checkpointer != null) {
- if (sequenceNumber == null) {
- LOG.info(String.format("Attempting to checkpoint for shard %s", shardId));
- checkpointer.checkpoint();
+ log.debug(logCheckpointMessage(sequenceNumber, subSequenceNumber));
+ if (sequenceNumber != null) {
+ if (subSequenceNumber != null) {
+ checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
+ } else {
+ checkpointer.checkpoint(sequenceNumber);
+ }
} else {
- LOG.info(String.format("Attempting to checkpoint at sequence number %s for shard %s",
- sequenceNumber, shardId));
- checkpointer.checkpoint(sequenceNumber);
+ checkpointer.checkpoint();
}
- return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, null);
+ return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, null);
} else {
String message =
String.format("Was asked to checkpoint at %s but no checkpointer was provided for shard %s",
- sequenceNumber, shardId);
- LOG.error(message);
- return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, new InvalidStateException(
+ sequenceNumber, initializationInput.getShardId());
+ log.error(message);
+ return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber,
+ new InvalidStateException(
message));
}
} catch (Throwable t) {
- return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, t);
+ return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, t);
}
}
+
+ private String logCheckpointMessage(String sequenceNumber, Long subSequenceNumber) {
+ return String.format("Attempting to checkpoint shard %s @ sequence number %s, and sub sequence number %s",
+ initializationInput.getShardId(), sequenceNumber, subSequenceNumber);
+ }
+
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java
index babf6ac2..9d76af54 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java
@@ -16,7 +16,6 @@ package com.amazonaws.services.kinesis.multilang;
import java.io.IOException;
import java.io.InputStream;
-import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -24,10 +23,10 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
-import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
@@ -61,56 +60,10 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
private MultiLangProtocol protocol;
- /**
- * Used to tell whether the processor has been shutdown already.
- */
- private enum ProcessState {
- ACTIVE, SHUTDOWN
- }
-
- /**
- * Constructor.
- *
- * @param processBuilder Provides process builder functionality.
- * @param executorService An executor
- * @param objectMapper An obejct mapper.
- */
- MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
- ObjectMapper objectMapper) {
- this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
- new DrainChildSTDERRTask());
- }
-
- /**
- * Note: This constructor has package level access solely for testing purposes.
- *
- * @param processBuilder Provides the child process for this record processor
- * @param executorService The executor service which is provided by the {@link MultiLangRecordProcessorFactory}
- * @param objectMapper Object mapper
- * @param messageWriter Message write to write to child process's stdin
- * @param messageReader Message reader to read from child process's stdout
- * @param readSTDERRTask Error reader to read from child process's stderr
- */
- MultiLangRecordProcessor(ProcessBuilder processBuilder,
- ExecutorService executorService,
- ObjectMapper objectMapper,
- MessageWriter messageWriter,
- MessageReader messageReader,
- DrainChildSTDERRTask readSTDERRTask) {
- this.executorService = executorService;
- this.processBuilder = processBuilder;
- this.objectMapper = objectMapper;
- this.messageWriter = messageWriter;
- this.messageReader = messageReader;
- this.readSTDERRTask = readSTDERRTask;
-
- this.state = ProcessState.ACTIVE;
- }
-
@Override
- public void initialize(String shardIdToProcess) {
+ public void initialize(InitializationInput initializationInput) {
try {
- this.shardId = shardIdToProcess;
+ this.shardId = initializationInput.getShardId();
try {
this.process = startProcess();
} catch (IOException e) {
@@ -129,7 +82,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
// Submit the error reader for execution
stderrReadTask = executorService.submit(readSTDERRTask);
- protocol = new MultiLangProtocol(messageReader, messageWriter, shardId);
+ protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput);
if (!protocol.initialize()) {
throw new RuntimeException("Failed to initialize child process");
}
@@ -142,9 +95,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
}
@Override
- public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) {
+ public void processRecords(ProcessRecordsInput processRecordsInput) {
try {
- if (!protocol.processRecords(records, checkpointer)) {
+ if (!protocol.processRecords(processRecordsInput)) {
throw new RuntimeException("Child process failed to process records");
}
} catch (Throwable t) {
@@ -153,7 +106,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
}
@Override
- public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
+ public void shutdown(ShutdownInput shutdownInput) {
// In cases where KCL loses lease for the shard after creating record processor instance but before
// record processor initialize() is called, then shutdown() may be called directly before initialize().
if (!initialized) {
@@ -165,7 +118,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
try {
if (ProcessState.ACTIVE.equals(this.state)) {
- if (!protocol.shutdown(checkpointer, reason)) {
+ if (!protocol.shutdown(shutdownInput.getCheckpointer(), shutdownInput.getShutdownReason())) {
throw new RuntimeException("Child process failed to shutdown");
}
@@ -181,7 +134,57 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
+ " but it appears the processor has already been shutdown", t);
}
}
+ }
+ /**
+ * Used to tell whether the processor has been shutdown already.
+ */
+ private enum ProcessState {
+ ACTIVE, SHUTDOWN
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param processBuilder
+ * Provides process builder functionality.
+ * @param executorService
+ * An executor
+ * @param objectMapper
+ * An obejct mapper.
+ */
+ MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
+ ObjectMapper objectMapper) {
+ this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
+ new DrainChildSTDERRTask());
+ }
+
+ /**
+ * Note: This constructor has package level access solely for testing purposes.
+ *
+ * @param processBuilder
+ * Provides the child process for this record processor
+ * @param executorService
+ * The executor service which is provided by the {@link MultiLangRecordProcessorFactory}
+ * @param objectMapper
+ * Object mapper
+ * @param messageWriter
+ * Message write to write to child process's stdin
+ * @param messageReader
+ * Message reader to read from child process's stdout
+ * @param readSTDERRTask
+ * Error reader to read from child process's stderr
+ */
+ MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper,
+ MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) {
+ this.executorService = executorService;
+ this.processBuilder = processBuilder;
+ this.objectMapper = objectMapper;
+ this.messageWriter = messageWriter;
+ this.messageReader = messageReader;
+ this.readSTDERRTask = readSTDERRTask;
+
+ this.state = ProcessState.ACTIVE;
}
/**
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java
index e8ed7eb7..e55217a6 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java
@@ -19,8 +19,8 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/CheckpointMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/CheckpointMessage.java
index 8d4b0ead..5cdc02bd 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/CheckpointMessage.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/CheckpointMessage.java
@@ -14,11 +14,16 @@
*/
package com.amazonaws.services.kinesis.multilang.messages;
+import lombok.Getter;
+import lombok.Setter;
+
/**
* A checkpoint message is sent by the client's subprocess to indicate to the kcl processor that it should attempt to
* checkpoint. The processor sends back a checkpoint message as an acknowledgement that it attempted to checkpoint along
* with an error message which corresponds to the names of exceptions that a checkpointer can throw.
*/
+@Getter
+@Setter
public class CheckpointMessage extends Message {
/**
* The name used for the action field in {@link Message}.
@@ -28,7 +33,8 @@ public class CheckpointMessage extends Message {
/**
* The checkpoint this message is about.
*/
- private String checkpoint;
+ private String sequenceNumber;
+ private Long subSequenceNumber;
/**
* The name of an exception that occurred while attempting to checkpoint.
@@ -44,43 +50,20 @@ public class CheckpointMessage extends Message {
/**
* Convenience constructor.
*
- * @param sequenceNumber The sequence number that this message is about.
- * @param throwable When responding to a client's process, the record processor will add the name of the exception
- * that occurred while attempting to checkpoint if one did occur.
+ * @param sequenceNumber
+ * The sequence number that this message is about.
+ * @param subSequenceNumber
+ * the sub sequence number for the checkpoint. This can be null.
+ * @param throwable
+ * When responding to a client's process, the record processor will add the name of the exception that
+ * occurred while attempting to checkpoint if one did occur.
*/
- public CheckpointMessage(String sequenceNumber, Throwable throwable) {
- this.setCheckpoint(sequenceNumber);
+ public CheckpointMessage(String sequenceNumber, Long subSequenceNumber, Throwable throwable) {
+ this.setSequenceNumber(sequenceNumber);
+ this.subSequenceNumber = subSequenceNumber;
if (throwable != null) {
this.setError(throwable.getClass().getSimpleName());
}
}
- /**
- * @return The checkpoint.
- */
- public String getCheckpoint() {
- return checkpoint;
- }
-
- /**
- * @return The error.
- */
- public String getError() {
- return error;
- }
-
- /**
- * @param checkpoint The checkpoint.
- */
- public void setCheckpoint(String checkpoint) {
- this.checkpoint = checkpoint;
- }
-
- /**
- * @param error The error.
- */
- public void setError(String error) {
- this.error = error;
- }
-
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java
index de30cde9..3795e57e 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java
@@ -14,9 +14,15 @@
*/
package com.amazonaws.services.kinesis.multilang.messages;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import lombok.Getter;
+import lombok.Setter;
+
/**
* An initialize message is sent to the client's subprocess to indicate that it should perform its initialization steps.
*/
+@Getter
+@Setter
public class InitializeMessage extends Message {
/**
* The name used for the action field in {@link Message}.
@@ -27,6 +33,8 @@ public class InitializeMessage extends Message {
* The shard id that this processor is getting initialized for.
*/
private String shardId;
+ private String sequenceNumber;
+ private Long subSequenceNumber;
/**
* Default constructor.
@@ -39,21 +47,16 @@ public class InitializeMessage extends Message {
*
* @param shardId The shard id.
*/
- public InitializeMessage(String shardId) {
- this.setShardId(shardId);
+ public InitializeMessage(InitializationInput initializationInput) {
+ this.shardId = initializationInput.getShardId();
+ if (initializationInput.getExtendedSequenceNumber() != null) {
+ this.sequenceNumber = initializationInput.getExtendedSequenceNumber().getSequenceNumber();
+ this.subSequenceNumber = initializationInput.getExtendedSequenceNumber().getSubSequenceNumber();
+ } else {
+ this.sequenceNumber = null;
+ this.subSequenceNumber = null;
+ }
+
}
- /**
- * @return The shard id.
- */
- public String getShardId() {
- return shardId;
- }
-
- /**
- * @param shardId The shard id.
- */
- public void setShardId(String shardId) {
- this.shardId = shardId;
- }
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/JsonFriendlyRecord.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/JsonFriendlyRecord.java
index b86df688..600489fe 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/JsonFriendlyRecord.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/JsonFriendlyRecord.java
@@ -14,19 +14,29 @@
*/
package com.amazonaws.services.kinesis.multilang.messages;
-import com.amazonaws.services.kinesis.model.Record;
-
import java.util.Date;
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.Record;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Getter;
+import lombok.Setter;
+
/**
* Class for encoding Record objects to json. Needed because Records have byte buffers for their data field which causes
* problems for the json library we're using.
*/
+@Getter
+@Setter
public class JsonFriendlyRecord {
private byte[] data;
private String partitionKey;
private String sequenceNumber;
private Date approximateArrivalTimestamp;
+ private Long subSequenceNumber;
+
+ public static String ACTION = "record";
/**
* Default Constructor.
@@ -40,109 +50,20 @@ public class JsonFriendlyRecord {
* @param record The record that this message will represent.
*/
public JsonFriendlyRecord(Record record) {
- this.withData(record.getData() == null ? null : record.getData().array())
- .withPartitionKey(record.getPartitionKey())
- .withSequenceNumber(record.getSequenceNumber())
- .withApproximateArrivalTimestamp(record.getApproximateArrivalTimestamp());
+ this.data = record.getData() == null ? null : record.getData().array();
+ this.partitionKey = record.getPartitionKey();
+ this.sequenceNumber = record.getSequenceNumber();
+ this.approximateArrivalTimestamp = record.getApproximateArrivalTimestamp();
+ if (record instanceof UserRecord) {
+ this.subSequenceNumber = ((UserRecord) record).getSubSequenceNumber();
+ } else {
+ this.subSequenceNumber = null;
+ }
}
- /**
- * @return The data.
- */
- public byte[] getData() {
- return data;
+ @JsonProperty
+ public String getAction() {
+ return ACTION;
}
- /**
- * @return The partition key.
- */
- public String getPartitionKey() {
- return partitionKey;
- }
-
- /**
- * @return The sequence number.
- */
- public String getSequenceNumber() {
- return sequenceNumber;
- }
-
- /**
- * @return The approximate arrival timestamp.
- */
- public Date getApproximateArrivalTimestamp() {
- return approximateArrivalTimestamp;
- }
-
- /**
- * @param data The data.
- */
- public void setData(byte[] data) {
- this.data = data;
- }
-
- /**
- * @param partitionKey The partition key.
- */
- public void setPartitionKey(String partitionKey) {
- this.partitionKey = partitionKey;
- }
-
- /**
- * @param sequenceNumber The sequence number.
- */
- public void setSequenceNumber(String sequenceNumber) {
- this.sequenceNumber = sequenceNumber;
- }
-
- /**
- * @param approximateArrivalTimestamp The approximate arrival timestamp.
- */
- public void setApproximateArrivalTimestamp(Date approximateArrivalTimestamp) {
- this.approximateArrivalTimestamp = approximateArrivalTimestamp;
- }
-
- /**
- * @param data The data.
- *
- * @return this
- */
- public JsonFriendlyRecord withData(byte[] data) {
- this.setData(data);
- return this;
- }
-
- /**
- * @param partitionKey The partition key.
- *
- * @return this
- */
- public JsonFriendlyRecord withPartitionKey(String partitionKey) {
- this.setPartitionKey(partitionKey);
- return this;
- }
-
- /**
- * @param sequenceNumber The sequence number.
- *
- * @return this
- */
- public JsonFriendlyRecord withSequenceNumber(String sequenceNumber) {
- this.setSequenceNumber(sequenceNumber);
- return this;
- }
-
- /**
- * @param approximateArrivalTimestamp The approximate arrival timestamp.
- *
- * @return this
- */
- public JsonFriendlyRecord withApproximateArrivalTimestamp(Date approximateArrivalTimestamp){
- this.setApproximateArrivalTimestamp(approximateArrivalTimestamp);
- return this;
- }
-
-
-
-
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ProcessRecordsMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ProcessRecordsMessage.java
index 1d0a6667..9e382b93 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ProcessRecordsMessage.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ProcessRecordsMessage.java
@@ -17,11 +17,16 @@ package com.amazonaws.services.kinesis.multilang.messages;
import java.util.ArrayList;
import java.util.List;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.Record;
+import lombok.Getter;
+import lombok.Setter;
/**
* A message to indicate to the client's process that it should process a list of records.
*/
+@Getter
+@Setter
public class ProcessRecordsMessage extends Message {
/**
* The name used for the action field in {@link Message}.
@@ -32,6 +37,7 @@ public class ProcessRecordsMessage extends Message {
* The records that the client's process needs to handle.
*/
private List records;
+ private Long millisBehindLatest;
/**
* Default constructor.
@@ -42,27 +48,15 @@ public class ProcessRecordsMessage extends Message {
/**
* Convenience constructor.
*
- * @param records The records.
+ * @param processRecordsInput
+ * the process records input to be sent to the child
*/
- public ProcessRecordsMessage(List records) {
+ public ProcessRecordsMessage(ProcessRecordsInput processRecordsInput) {
+ this.millisBehindLatest = processRecordsInput.getMillisBehindLatest();
List recordMessages = new ArrayList();
- for (Record record : records) {
+ for (Record record : processRecordsInput.getRecords()) {
recordMessages.add(new JsonFriendlyRecord(record));
}
this.setRecords(recordMessages);
}
-
- /**
- * @return The records.
- */
- public List getRecords() {
- return records;
- }
-
- /**
- * @param records The records.
- */
- public void setRecords(List records) {
- this.records = records;
- }
}
diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java b/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java
new file mode 100644
index 00000000..b84d61a0
--- /dev/null
+++ b/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java
@@ -0,0 +1,93 @@
+package com.amazonaws.services.kinesis.multilang;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+
+public class Matchers {
+
+ public static Matcher withInit(InitializationInput initializationInput) {
+ return new InitializationInputMatcher(initializationInput);
+ }
+
+ public static class InitializationInputMatcher extends TypeSafeDiagnosingMatcher {
+
+ private final Matcher shardIdMatcher;
+ private final Matcher sequenceNumberMatcher;
+
+ public InitializationInputMatcher(InitializationInput input) {
+ shardIdMatcher = equalTo(input.getShardId());
+ sequenceNumberMatcher = withSequence(input.getExtendedSequenceNumber());
+ }
+
+ @Override
+ protected boolean matchesSafely(final InitializationInput item, Description mismatchDescription) {
+
+ boolean matches = true;
+ if (!shardIdMatcher.matches(item.getShardId())) {
+ matches = false;
+ shardIdMatcher.describeMismatch(item.getShardId(), mismatchDescription);
+ }
+ if (!sequenceNumberMatcher.matches(item.getExtendedSequenceNumber())) {
+ matches = false;
+ sequenceNumberMatcher.describeMismatch(item, mismatchDescription);
+ }
+
+ return matches;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("An InitializationInput matching: { shardId: ").appendDescriptionOf(shardIdMatcher)
+ .appendText(", sequenceNumber: ").appendDescriptionOf(sequenceNumberMatcher).appendText(" }");
+ }
+ }
+
+ public static Matcher withSequence(ExtendedSequenceNumber extendedSequenceNumber) {
+ if (extendedSequenceNumber == null) {
+ return nullValue(ExtendedSequenceNumber.class);
+ }
+ return new ExtendedSequenceNumberMatcher(extendedSequenceNumber);
+ }
+
+ public static class ExtendedSequenceNumberMatcher extends TypeSafeDiagnosingMatcher {
+
+ private final Matcher sequenceNumberMatcher;
+ private final Matcher subSequenceNumberMatcher;
+
+ public ExtendedSequenceNumberMatcher(ExtendedSequenceNumber extendedSequenceNumber) {
+ sequenceNumberMatcher = equalTo(extendedSequenceNumber.getSequenceNumber());
+ subSequenceNumberMatcher = equalTo(extendedSequenceNumber.getSubSequenceNumber());
+ }
+
+ @Override
+ protected boolean matchesSafely(ExtendedSequenceNumber item, Description mismatchDescription) {
+
+ boolean matches = true;
+ if (!sequenceNumberMatcher.matches(item.getSequenceNumber())) {
+ matches = false;
+ mismatchDescription.appendDescriptionOf(sequenceNumberMatcher);
+ }
+ if (!subSequenceNumberMatcher.matches(item.getSubSequenceNumber())) {
+ matches = false;
+ mismatchDescription.appendDescriptionOf(subSequenceNumberMatcher);
+ }
+
+ return matches;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("An ExtendedSequenceNumber matching: { sequenceNumber: ")
+ .appendDescriptionOf(sequenceNumberMatcher).appendText(", subSequenceNumber: ")
+ .appendDescriptionOf(subSequenceNumberMatcher);
+ }
+ }
+
+}
diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java
index 749a1dc8..08f04c92 100644
--- a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java
@@ -23,6 +23,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -54,7 +56,7 @@ public class MessageWriterTest {
*/
@Test
public void writeCheckpointMessageNoErrorTest() throws IOException, InterruptedException, ExecutionException {
- Future future = this.messageWriter.writeCheckpointMessageWithError("1234", null);
+ Future future = this.messageWriter.writeCheckpointMessageWithError("1234", 0L, null);
future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt());
@@ -63,7 +65,7 @@ public class MessageWriterTest {
@Test
public void writeCheckpointMessageWithErrorTest() throws IOException, InterruptedException, ExecutionException {
- Future future = this.messageWriter.writeCheckpointMessageWithError("1234", new Throwable());
+ Future future = this.messageWriter.writeCheckpointMessageWithError("1234", 0L, new Throwable());
future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt());
@@ -72,7 +74,7 @@ public class MessageWriterTest {
@Test
public void writeInitializeMessageTest() throws IOException, InterruptedException, ExecutionException {
- Future future = this.messageWriter.writeInitializeMessage(shardId);
+ Future future = this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId));
future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt());
@@ -93,7 +95,7 @@ public class MessageWriterTest {
this.add(new Record());
}
};
- Future future = this.messageWriter.writeProcessRecordsMessage(records);
+ Future future = this.messageWriter.writeProcessRecordsMessage(new ProcessRecordsInput().withRecords(records));
future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
@@ -114,7 +116,7 @@ public class MessageWriterTest {
@Test
public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException {
Mockito.doThrow(IOException.class).when(stream).flush();
- Future initializeTask = this.messageWriter.writeInitializeMessage(shardId);
+ Future initializeTask = this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId));
Boolean result = initializeTask.get();
Assert.assertNotNull(result);
Assert.assertFalse(result);
@@ -144,7 +146,7 @@ public class MessageWriterTest {
Assert.assertFalse(this.messageWriter.isOpen());
try {
// Any message should fail
- this.messageWriter.writeInitializeMessage(shardId);
+ this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId));
Assert.fail("MessageWriter should be closed and unable to write.");
} catch (IllegalStateException e) {
// This should happen.
diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java
index 580b8ad4..f00bb48f 100644
--- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java
@@ -14,14 +14,23 @@
*/
package com.amazonaws.services.kinesis.multilang;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import org.junit.Assert;
-
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -33,62 +42,76 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
+import com.google.common.util.concurrent.SettableFuture;
public class MultiLangProtocolTest {
+ private static final List EMPTY_RECORD_LIST = Collections.emptyList();
private MultiLangProtocol protocol;
private MessageWriter messageWriter;
private MessageReader messageReader;
private String shardId;
private IRecordProcessorCheckpointer checkpointer;
+
+
@Before
public void setup() {
this.shardId = "shard-id-123";
messageWriter = Mockito.mock(MessageWriter.class);
messageReader = Mockito.mock(MessageReader.class);
- protocol = new MultiLangProtocol(messageReader, messageWriter, shardId);
+ protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId));
checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
}
- private Future buildBooleanFuture(boolean val) throws InterruptedException, ExecutionException {
- Future successFuture = Mockito.mock(Future.class);
- Mockito.doReturn(val).when(successFuture).get();
- return successFuture;
+ private Future buildFuture(T value) {
+ SettableFuture future = SettableFuture.create();
+ future.set(value);
+ return future;
}
- private Future buildMessageFuture(Message message) throws InterruptedException, ExecutionException {
- Future messageFuture = Mockito.mock(Future.class);
- Mockito.doReturn(message).when(messageFuture).get();
- return messageFuture;
+ private Future buildFuture(T value, Class clazz) {
+ SettableFuture future = SettableFuture.create();
+ future.set(value);
+ return future;
}
@Test
public void initializeTest() throws InterruptedException, ExecutionException {
- Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeInitializeMessage(shardId);
- Mockito.doReturn(buildMessageFuture(new StatusMessage("initialize"))).when(messageReader).getNextMessageFromSTDOUT();
- Assert.assertTrue(protocol.initialize());
+ when(messageWriter
+ .writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput().withShardId(shardId)))))
+ .thenReturn(buildFuture(true));
+ when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("initialize"), Message.class));
+ assertThat(protocol.initialize(), equalTo(true));
}
@Test
public void processRecordsTest() throws InterruptedException, ExecutionException {
- Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList());
- Mockito.doReturn(buildMessageFuture(new StatusMessage("processRecords"))).when(messageReader).getNextMessageFromSTDOUT();
- Assert.assertTrue(protocol.processRecords(new ArrayList(), null));
+ when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
+ when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class));
+
+ assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)), equalTo(true));
}
@Test
public void shutdownTest() throws InterruptedException, ExecutionException {
- Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter)
- .writeShutdownMessage(Mockito.any(ShutdownReason.class));
- Mockito.doReturn(buildMessageFuture(new StatusMessage("shutdown"))).when(messageReader).getNextMessageFromSTDOUT();
- Assert.assertTrue(protocol.shutdown(null, ShutdownReason.ZOMBIE));
+ when(messageWriter.writeShutdownMessage(any(ShutdownReason.class))).thenReturn(buildFuture(true));
+ when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("shutdown"), Message.class));
+
+ Mockito.doReturn(buildFuture(true)).when(messageWriter)
+ .writeShutdownMessage(any(ShutdownReason.class));
+ Mockito.doReturn(buildFuture(new StatusMessage("shutdown"))).when(messageReader).getNextMessageFromSTDOUT();
+ assertThat(protocol.shutdown(null, ShutdownReason.ZOMBIE), equalTo(true));
}
private Answer> buildMessageAnswers(List messages) {
@@ -107,7 +130,7 @@ public class MultiLangProtocolTest {
if (this.messageIterator.hasNext()) {
message = this.messageIterator.next();
}
- return buildMessageFuture(message);
+ return buildFuture(message);
}
}.init(messages);
@@ -117,13 +140,12 @@ public class MultiLangProtocolTest {
public void processRecordsWithCheckpointsTest() throws InterruptedException, ExecutionException,
KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
- Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList());
- Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter)
- .writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class));
- Mockito.doAnswer(buildMessageAnswers(new ArrayList() {
+ when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
+ when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(true));
+ when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList() {
{
- this.add(new CheckpointMessage("123", null));
- this.add(new CheckpointMessage(null, null));
+ this.add(new CheckpointMessage("123", 0L, null));
+ this.add(new CheckpointMessage(null, 0L, null));
/*
* This procesRecords message will be ignored by the read loop which only cares about status and
* checkpoint messages. All other lines and message types are ignored. By inserting it here, we check
@@ -132,24 +154,23 @@ public class MultiLangProtocolTest {
this.add(new ProcessRecordsMessage());
this.add(new StatusMessage("processRecords"));
}
- })).when(messageReader).getNextMessageFromSTDOUT();
- Assert.assertTrue(protocol.processRecords(new ArrayList(), checkpointer));
+ }));
+ assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(true));
- Mockito.verify(checkpointer, Mockito.timeout(1)).checkpoint();
- Mockito.verify(checkpointer, Mockito.timeout(1)).checkpoint("123");
+ verify(checkpointer, timeout(1)).checkpoint();
+ verify(checkpointer, timeout(1)).checkpoint("123", 0L);
}
@Test
public void processRecordsWithABadCheckpointTest() throws InterruptedException, ExecutionException {
- Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList());
- Mockito.doReturn(buildBooleanFuture(false)).when(messageWriter)
- .writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class));
- Mockito.doAnswer(buildMessageAnswers(new ArrayList() {
+ when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
+ when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(false));
+ when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList() {
{
- this.add(new CheckpointMessage("456", null));
+ this.add(new CheckpointMessage("456", 0L, null));
this.add(new StatusMessage("processRecords"));
}
- })).when(messageReader).getNextMessageFromSTDOUT();
- Assert.assertFalse(protocol.processRecords(new ArrayList(), checkpointer));
+ }));
+ assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false));
}
}
diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java
index 3cdc488c..a8f5885b 100644
--- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java
@@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.multilang;
import org.junit.Assert;
import org.junit.Test;
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
public class StreamingRecordProcessorFactoryTest {
diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java
index a03b164d..2c02b5e9 100644
--- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java
@@ -14,6 +14,15 @@
*/
package com.amazonaws.services.kinesis.multilang;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -24,11 +33,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
@@ -36,20 +49,27 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
+import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
+@RunWith(MockitoJUnitRunner.class)
public class StreamingRecordProcessorTest {
private static final String shardId = "shard-123";
private int systemExitCount = 0;
+ @Mock
+ private Future messageFuture;
+
private IRecordProcessorCheckpointer unimplementedCheckpointer = new IRecordProcessorCheckpointer() {
@Override
@@ -129,11 +149,10 @@ public class StreamingRecordProcessorTest {
Future trueFuture = Mockito.mock(Future.class);
Mockito.doReturn(true).when(trueFuture).get();
- Mockito.doReturn(trueFuture).when(messageWriter).writeInitializeMessage(Mockito.anyString());
- Mockito.doReturn(trueFuture).when(messageWriter)
- .writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class));
- Mockito.doReturn(trueFuture).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList());
- Mockito.doReturn(trueFuture).when(messageWriter).writeShutdownMessage(Mockito.any(ShutdownReason.class));
+ when(messageWriter.writeInitializeMessage(any(InitializationInput.class))).thenReturn(trueFuture);
+ when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(trueFuture);
+ when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(trueFuture);
+ when(messageWriter.writeShutdownMessage(any(ShutdownReason.class))).thenReturn(trueFuture);
}
private void phases(Answer answer) throws InterruptedException, ExecutionException {
@@ -145,16 +164,15 @@ public class StreamingRecordProcessorTest {
* processRecords
* shutdown
*/
- Future future = Mockito.mock(Future.class);
- Mockito.doAnswer(answer).when(future).get();
- Mockito.doReturn(future).when(messageReader).getNextMessageFromSTDOUT();
+ when(messageFuture.get()).thenAnswer(answer);
+ when(messageReader.getNextMessageFromSTDOUT()).thenReturn(messageFuture);
List testRecords = new ArrayList();
- recordProcessor.initialize(shardId);
- recordProcessor.processRecords(testRecords, unimplementedCheckpointer);
- recordProcessor.processRecords(testRecords, unimplementedCheckpointer);
- recordProcessor.shutdown(unimplementedCheckpointer, ShutdownReason.ZOMBIE);
+ recordProcessor.initialize(new InitializationInput().withShardId(shardId));
+ recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer));
+ recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer));
+ recordProcessor.shutdown(new ShutdownInput().withCheckpointer(unimplementedCheckpointer).withShutdownReason(ShutdownReason.ZOMBIE));
}
@Test
@@ -180,9 +198,10 @@ public class StreamingRecordProcessorTest {
phases(answer);
- Mockito.verify(messageWriter, Mockito.times(1)).writeInitializeMessage(shardId);
- Mockito.verify(messageWriter, Mockito.times(2)).writeProcessRecordsMessage(Mockito.anyList());
- Mockito.verify(messageWriter, Mockito.times(1)).writeShutdownMessage(ShutdownReason.ZOMBIE);
+ verify(messageWriter)
+ .writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput().withShardId(shardId))));
+ verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
+ verify(messageWriter).writeShutdownMessage(ShutdownReason.ZOMBIE);
}
@Test
@@ -211,9 +230,10 @@ public class StreamingRecordProcessorTest {
phases(answer);
- Mockito.verify(messageWriter, Mockito.times(1)).writeInitializeMessage(shardId);
- Mockito.verify(messageWriter, Mockito.times(2)).writeProcessRecordsMessage(Mockito.anyList());
- Mockito.verify(messageWriter, Mockito.times(0)).writeShutdownMessage(ShutdownReason.ZOMBIE);
+ verify(messageWriter).writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput()
+ .withShardId(shardId))));
+ verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
+ verify(messageWriter, never()).writeShutdownMessage(ShutdownReason.ZOMBIE);
Assert.assertEquals(1, systemExitCount);
}
}
diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java
index 9b90fe60..2c76aa30 100644
--- a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java
@@ -17,6 +17,8 @@ package com.amazonaws.services.kinesis.multilang.messages;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import org.junit.Assert;
import org.junit.Test;
@@ -30,8 +32,8 @@ public class MessageTest {
@Test
public void toStringTest() {
Message[] messages =
- new Message[] { new CheckpointMessage("1234567890", null), new InitializeMessage("shard-123"),
- new ProcessRecordsMessage(new ArrayList() {
+ new Message[] { new CheckpointMessage("1234567890", 0L, null), new InitializeMessage(new InitializationInput().withShardId("shard-123")),
+ new ProcessRecordsMessage(new ProcessRecordsInput().withRecords(new ArrayList() {
{
this.add(new Record() {
{
@@ -41,7 +43,7 @@ public class MessageTest {
}
});
}
- }), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"),
+ })), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"),
new InitializeMessage(), new ProcessRecordsMessage() };
for (int i = 0; i < messages.length; i++) {