Update MultiLangDaemon to Support New Features (#118)

Updated the MultiLangDaemon to use the v2 record processor interfaces, and added features to messages passed to MultiLangDaemon clients.

These changes will require updates to the various MultiLangDaemon clients. The changes for the Python version are complete, and other versions will be updated later.
This commit is contained in:
Justin Pfifer 2016-11-07 11:38:04 -08:00 committed by GitHub
parent ed7d069e50
commit 5d045521ce
18 changed files with 478 additions and 364 deletions

View file

@ -63,6 +63,13 @@
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
<scope>provided</scope>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>

View file

@ -120,7 +120,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.1-SNAPSHOT";
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.2";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls

View file

@ -18,7 +18,6 @@ import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -27,7 +26,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
@ -119,19 +119,21 @@ class MessageWriter {
/**
* Writes an {@link InitializeMessage} to the subprocess.
*
* @param shardIdToWrite The shard id.
* @param initializationInput
* contains information about the shard being initialized
*/
Future<Boolean> writeInitializeMessage(String shardIdToWrite) {
return writeMessage(new InitializeMessage(shardIdToWrite));
Future<Boolean> writeInitializeMessage(InitializationInput initializationInput) {
return writeMessage(new InitializeMessage(initializationInput));
}
/**
* Writes a {@link ProcessRecordsMessage} message to the subprocess.
*
* @param records The records to be processed.
* @param processRecordsInput
* the records, and associated metadata to be processed.
*/
Future<Boolean> writeProcessRecordsMessage(List<Record> records) {
return writeMessage(new ProcessRecordsMessage(records));
Future<Boolean> writeProcessRecordsMessage(ProcessRecordsInput processRecordsInput) {
return writeMessage(new ProcessRecordsMessage(processRecordsInput));
}
/**
@ -146,11 +148,16 @@ class MessageWriter {
/**
* Writes a {@link CheckpointMessage} to the subprocess.
*
* @param sequenceNumber The sequence number that was checkpointed.
* @param throwable The exception that was thrown by a checkpoint attempt. Null if one didn't occur.
* @param sequenceNumber
* The sequence number that was checkpointed.
* @param subSequenceNumber
* the sub sequence number to checkpoint at.
* @param throwable
* The exception that was thrown by a checkpoint attempt. Null if one didn't occur.
*/
Future<Boolean> writeCheckpointMessageWithError(String sequenceNumber, Throwable throwable) {
return writeMessage(new CheckpointMessage(sequenceNumber, throwable));
Future<Boolean> writeCheckpointMessageWithError(String sequenceNumber, Long subSequenceNumber,
Throwable throwable) {
return writeMessage(new CheckpointMessage(sequenceNumber, subSequenceNumber, throwable));
}
/**

View file

@ -24,6 +24,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
@ -71,7 +72,13 @@ public class MultiLangDaemon implements Callable<Integer> {
public MultiLangDaemon(KinesisClientLibConfiguration configuration,
MultiLangRecordProcessorFactory recordProcessorFactory,
ExecutorService workerThreadPool) {
this(new Worker(recordProcessorFactory, configuration, workerThreadPool));
this(buildWorker(recordProcessorFactory, configuration, workerThreadPool));
}
private static Worker buildWorker(IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration configuration, ExecutorService workerThreadPool) {
return new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(configuration)
.execService(workerThreadPool).build();
}
/**

View file

@ -14,17 +14,24 @@
*/
package com.amazonaws.services.kinesis.multilang;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class captures the configuration needed to run the MultiLangDaemon.
@ -131,10 +138,29 @@ public class MultiLangDaemonConfig {
private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
Properties properties = new Properties();
try (InputStream propertiesStream = classLoader.getResourceAsStream(propertiesFileName)) {
properties.load(propertiesStream);
InputStream propertyStream = null;
try {
propertyStream = classLoader.getResourceAsStream(propertiesFileName);
if (propertyStream == null) {
File propertyFile = new File(propertiesFileName);
if (propertyFile.exists()) {
propertyStream = new FileInputStream(propertyFile);
}
}
if (propertyStream == null) {
throw new FileNotFoundException(
"Unable to find property file in classpath, or file system: '" + propertiesFileName + "'");
}
properties.load(propertyStream);
return properties;
} finally {
if (propertyStream != null) {
propertyStream.close();
}
}
}
private static boolean validateProperties(Properties properties) {
@ -147,13 +173,16 @@ public class MultiLangDaemonConfig {
private static ExecutorService buildExecutorService(Properties properties) {
int maxActiveThreads = getMaxActiveThreads(properties);
ThreadFactoryBuilder builder = new ThreadFactoryBuilder().setNameFormat("multi-lang-daemon-%04d");
LOG.debug(String.format("Value for %s property is %d", PROP_MAX_ACTIVE_THREADS, maxActiveThreads));
if (maxActiveThreads <= 0) {
LOG.info("Using a cached thread pool.");
return Executors.newCachedThreadPool();
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
builder.build());
} else {
LOG.info(String.format("Using a fixed thread pool with %d max active threads.", maxActiveThreads));
return Executors.newFixedThreadPool(maxActiveThreads);
return new ThreadPoolExecutor(maxActiveThreads, maxActiveThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), builder.build());
}
}

View file

@ -14,17 +14,14 @@
*/
package com.amazonaws.services.kinesis.multilang;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
@ -32,28 +29,33 @@ import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import lombok.extern.apachecommons.CommonsLog;
/**
* An implementation of the multi language protocol.
*/
@CommonsLog
class MultiLangProtocol {
private static final Log LOG = LogFactory.getLog(MultiLangProtocol.class);
private MessageReader messageReader;
private MessageWriter messageWriter;
private String shardId;
private final InitializationInput initializationInput;
/**
* Constructor.
*
* @param messageReader A message reader.
* @param messageWriter A message writer.
* @param shardId The shard id this processor is associated with.
* @param messageReader
* A message reader.
* @param messageWriter
* A message writer.
* @param initializationInput
* information about the shard this processor is starting to process
*/
MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, String shardId) {
MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter,
InitializationInput initializationInput) {
this.messageReader = messageReader;
this.messageWriter = messageWriter;
this.shardId = shardId;
this.initializationInput = initializationInput;
}
/**
@ -66,7 +68,7 @@ class MultiLangProtocol {
/*
* Call and response to child process.
*/
Future<Boolean> writeFuture = messageWriter.writeInitializeMessage(shardId);
Future<Boolean> writeFuture = messageWriter.writeInitializeMessage(initializationInput);
return waitForStatusMessage(InitializeMessage.ACTION, null, writeFuture);
}
@ -75,13 +77,13 @@ class MultiLangProtocol {
* Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond
* with a {@link StatusMessage} on its STDOUT.
*
* @param records The records to process.
* @param checkpointer A checkpointer.
* @param processRecordsInput
* The records, and associated metadata, to process.
* @return Whether or not this operation succeeded.
*/
boolean processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
Future<Boolean> writeFuture = messageWriter.writeProcessRecordsMessage(records);
return waitForStatusMessage(ProcessRecordsMessage.ACTION, checkpointer, writeFuture);
boolean processRecords(ProcessRecordsInput processRecordsInput) {
Future<Boolean> writeFuture = messageWriter.writeProcessRecordsMessage(processRecordsInput);
return waitForStatusMessage(ProcessRecordsMessage.ACTION, processRecordsInput.getCheckpointer(), writeFuture);
}
/**
@ -105,32 +107,41 @@ class MultiLangProtocol {
* checkpointing itself was successful is not the concern of this method. This method simply cares whether it was
* able to successfully communicate the results of its attempts to checkpoint.
*
* @param action What action is being waited on.
* @param checkpointer A checkpointer.
* @param writeFuture The writing task.
* @param action
* What action is being waited on.
* @param checkpointer
* the checkpointer from the process records, or shutdown request
* @param writeFuture
* The writing task.
* @return Whether or not this operation succeeded.
*/
private boolean waitForStatusMessage(String action,
IRecordProcessorCheckpointer checkpointer,
Future<Boolean> writeFuture) {
private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer,
Future<Boolean> writeFuture) {
boolean statusWasCorrect = waitForStatusMessage(action, checkpointer);
// Examine whether or not we failed somewhere along the line.
try {
boolean writerIsStillOpen = Boolean.valueOf(writeFuture.get());
boolean writerIsStillOpen = writeFuture.get();
return statusWasCorrect && writerIsStillOpen;
} catch (InterruptedException e) {
LOG.error(String.format("Interrupted while writing %s message for shard %s", action, shardId));
log.error(String.format("Interrupted while writing %s message for shard %s", action,
initializationInput.getShardId()));
return false;
} catch (ExecutionException e) {
LOG.error(String.format("Failed to write %s message for shard %s", action, shardId), e);
log.error(
String.format("Failed to write %s message for shard %s", action, initializationInput.getShardId()),
e);
return false;
}
}
/**
* @param action What action is being waited on.
* @param checkpointer A checkpointer.
* Waits for status message and verifies it against the expectation
*
* @param action
* What action is being waited on.
* @param checkpointer
* the original process records request
* @return Whether or not this operation succeeded.
*/
private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
@ -141,8 +152,7 @@ class MultiLangProtocol {
Message message = future.get();
// Note that instanceof doubles as a check against a value being null
if (message instanceof CheckpointMessage) {
boolean checkpointWriteSucceeded =
Boolean.valueOf(checkpoint((CheckpointMessage) message, checkpointer).get());
boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get();
if (!checkpointWriteSucceeded) {
return false;
}
@ -150,10 +160,12 @@ class MultiLangProtocol {
statusMessage = (StatusMessage) message;
}
} catch (InterruptedException e) {
LOG.error(String.format("Interrupted while waiting for %s message for shard %s", action, shardId));
log.error(String.format("Interrupted while waiting for %s message for shard %s", action,
initializationInput.getShardId()));
return false;
} catch (ExecutionException e) {
LOG.error(String.format("Failed to get status message for %s action for shard %s", action, shardId), e);
log.error(String.format("Failed to get status message for %s action for shard %s", action,
initializationInput.getShardId()), e);
return false;
}
}
@ -168,8 +180,8 @@ class MultiLangProtocol {
* @return Whether or not this operation succeeded.
*/
private boolean validateStatusMessage(StatusMessage statusMessage, String action) {
LOG.info("Received response " + statusMessage + " from subprocess while waiting for " + action
+ " while processing shard " + shardId);
log.info("Received response " + statusMessage + " from subprocess while waiting for " + action
+ " while processing shard " + initializationInput.getShardId());
return !(statusMessage == null || statusMessage.getResponseFor() == null || !statusMessage.getResponseFor()
.equals(action));
@ -186,28 +198,38 @@ class MultiLangProtocol {
* @return Whether or not this operation succeeded.
*/
private Future<Boolean> checkpoint(CheckpointMessage checkpointMessage, IRecordProcessorCheckpointer checkpointer) {
String sequenceNumber = checkpointMessage.getCheckpoint();
String sequenceNumber = checkpointMessage.getSequenceNumber();
Long subSequenceNumber = checkpointMessage.getSubSequenceNumber();
try {
if (checkpointer != null) {
if (sequenceNumber == null) {
LOG.info(String.format("Attempting to checkpoint for shard %s", shardId));
checkpointer.checkpoint();
log.debug(logCheckpointMessage(sequenceNumber, subSequenceNumber));
if (sequenceNumber != null) {
if (subSequenceNumber != null) {
checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
} else {
checkpointer.checkpoint(sequenceNumber);
}
} else {
LOG.info(String.format("Attempting to checkpoint at sequence number %s for shard %s",
sequenceNumber, shardId));
checkpointer.checkpoint(sequenceNumber);
checkpointer.checkpoint();
}
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, null);
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, null);
} else {
String message =
String.format("Was asked to checkpoint at %s but no checkpointer was provided for shard %s",
sequenceNumber, shardId);
LOG.error(message);
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, new InvalidStateException(
sequenceNumber, initializationInput.getShardId());
log.error(message);
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber,
new InvalidStateException(
message));
}
} catch (Throwable t) {
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, t);
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, t);
}
}
private String logCheckpointMessage(String sequenceNumber, Long subSequenceNumber) {
return String.format("Attempting to checkpoint shard %s @ sequence number %s, and sub sequence number %s",
initializationInput.getShardId(), sequenceNumber, subSequenceNumber);
}
}

View file

@ -16,7 +16,6 @@ package com.amazonaws.services.kinesis.multilang;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -24,10 +23,10 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
@ -61,56 +60,10 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
private MultiLangProtocol protocol;
/**
* Used to tell whether the processor has been shutdown already.
*/
private enum ProcessState {
ACTIVE, SHUTDOWN
}
/**
* Constructor.
*
* @param processBuilder Provides process builder functionality.
* @param executorService An executor
* @param objectMapper An obejct mapper.
*/
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
ObjectMapper objectMapper) {
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
new DrainChildSTDERRTask());
}
/**
* Note: This constructor has package level access solely for testing purposes.
*
* @param processBuilder Provides the child process for this record processor
* @param executorService The executor service which is provided by the {@link MultiLangRecordProcessorFactory}
* @param objectMapper Object mapper
* @param messageWriter Message write to write to child process's stdin
* @param messageReader Message reader to read from child process's stdout
* @param readSTDERRTask Error reader to read from child process's stderr
*/
MultiLangRecordProcessor(ProcessBuilder processBuilder,
ExecutorService executorService,
ObjectMapper objectMapper,
MessageWriter messageWriter,
MessageReader messageReader,
DrainChildSTDERRTask readSTDERRTask) {
this.executorService = executorService;
this.processBuilder = processBuilder;
this.objectMapper = objectMapper;
this.messageWriter = messageWriter;
this.messageReader = messageReader;
this.readSTDERRTask = readSTDERRTask;
this.state = ProcessState.ACTIVE;
}
@Override
public void initialize(String shardIdToProcess) {
public void initialize(InitializationInput initializationInput) {
try {
this.shardId = shardIdToProcess;
this.shardId = initializationInput.getShardId();
try {
this.process = startProcess();
} catch (IOException e) {
@ -129,7 +82,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
// Submit the error reader for execution
stderrReadTask = executorService.submit(readSTDERRTask);
protocol = new MultiLangProtocol(messageReader, messageWriter, shardId);
protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput);
if (!protocol.initialize()) {
throw new RuntimeException("Failed to initialize child process");
}
@ -142,9 +95,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
public void processRecords(ProcessRecordsInput processRecordsInput) {
try {
if (!protocol.processRecords(records, checkpointer)) {
if (!protocol.processRecords(processRecordsInput)) {
throw new RuntimeException("Child process failed to process records");
}
} catch (Throwable t) {
@ -153,7 +106,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
}
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
public void shutdown(ShutdownInput shutdownInput) {
// In cases where KCL loses lease for the shard after creating record processor instance but before
// record processor initialize() is called, then shutdown() may be called directly before initialize().
if (!initialized) {
@ -165,7 +118,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
try {
if (ProcessState.ACTIVE.equals(this.state)) {
if (!protocol.shutdown(checkpointer, reason)) {
if (!protocol.shutdown(shutdownInput.getCheckpointer(), shutdownInput.getShutdownReason())) {
throw new RuntimeException("Child process failed to shutdown");
}
@ -181,7 +134,57 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
+ " but it appears the processor has already been shutdown", t);
}
}
}
/**
* Used to tell whether the processor has been shutdown already.
*/
private enum ProcessState {
ACTIVE, SHUTDOWN
}
/**
* Constructor.
*
* @param processBuilder
* Provides process builder functionality.
* @param executorService
* An executor
* @param objectMapper
* An obejct mapper.
*/
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
ObjectMapper objectMapper) {
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
new DrainChildSTDERRTask());
}
/**
* Note: This constructor has package level access solely for testing purposes.
*
* @param processBuilder
* Provides the child process for this record processor
* @param executorService
* The executor service which is provided by the {@link MultiLangRecordProcessorFactory}
* @param objectMapper
* Object mapper
* @param messageWriter
* Message write to write to child process's stdin
* @param messageReader
* Message reader to read from child process's stdout
* @param readSTDERRTask
* Error reader to read from child process's stderr
*/
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper,
MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) {
this.executorService = executorService;
this.processBuilder = processBuilder;
this.objectMapper = objectMapper;
this.messageWriter = messageWriter;
this.messageReader = messageReader;
this.readSTDERRTask = readSTDERRTask;
this.state = ProcessState.ACTIVE;
}
/**

View file

@ -19,8 +19,8 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
/**

View file

@ -14,11 +14,16 @@
*/
package com.amazonaws.services.kinesis.multilang.messages;
import lombok.Getter;
import lombok.Setter;
/**
* A checkpoint message is sent by the client's subprocess to indicate to the kcl processor that it should attempt to
* checkpoint. The processor sends back a checkpoint message as an acknowledgement that it attempted to checkpoint along
* with an error message which corresponds to the names of exceptions that a checkpointer can throw.
*/
@Getter
@Setter
public class CheckpointMessage extends Message {
/**
* The name used for the action field in {@link Message}.
@ -28,7 +33,8 @@ public class CheckpointMessage extends Message {
/**
* The checkpoint this message is about.
*/
private String checkpoint;
private String sequenceNumber;
private Long subSequenceNumber;
/**
* The name of an exception that occurred while attempting to checkpoint.
@ -44,43 +50,20 @@ public class CheckpointMessage extends Message {
/**
* Convenience constructor.
*
* @param sequenceNumber The sequence number that this message is about.
* @param throwable When responding to a client's process, the record processor will add the name of the exception
* that occurred while attempting to checkpoint if one did occur.
* @param sequenceNumber
* The sequence number that this message is about.
* @param subSequenceNumber
* the sub sequence number for the checkpoint. This can be null.
* @param throwable
* When responding to a client's process, the record processor will add the name of the exception that
* occurred while attempting to checkpoint if one did occur.
*/
public CheckpointMessage(String sequenceNumber, Throwable throwable) {
this.setCheckpoint(sequenceNumber);
public CheckpointMessage(String sequenceNumber, Long subSequenceNumber, Throwable throwable) {
this.setSequenceNumber(sequenceNumber);
this.subSequenceNumber = subSequenceNumber;
if (throwable != null) {
this.setError(throwable.getClass().getSimpleName());
}
}
/**
* @return The checkpoint.
*/
public String getCheckpoint() {
return checkpoint;
}
/**
* @return The error.
*/
public String getError() {
return error;
}
/**
* @param checkpoint The checkpoint.
*/
public void setCheckpoint(String checkpoint) {
this.checkpoint = checkpoint;
}
/**
* @param error The error.
*/
public void setError(String error) {
this.error = error;
}
}

View file

@ -14,9 +14,15 @@
*/
package com.amazonaws.services.kinesis.multilang.messages;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import lombok.Getter;
import lombok.Setter;
/**
* An initialize message is sent to the client's subprocess to indicate that it should perform its initialization steps.
*/
@Getter
@Setter
public class InitializeMessage extends Message {
/**
* The name used for the action field in {@link Message}.
@ -27,6 +33,8 @@ public class InitializeMessage extends Message {
* The shard id that this processor is getting initialized for.
*/
private String shardId;
private String sequenceNumber;
private Long subSequenceNumber;
/**
* Default constructor.
@ -39,21 +47,16 @@ public class InitializeMessage extends Message {
*
* @param shardId The shard id.
*/
public InitializeMessage(String shardId) {
this.setShardId(shardId);
public InitializeMessage(InitializationInput initializationInput) {
this.shardId = initializationInput.getShardId();
if (initializationInput.getExtendedSequenceNumber() != null) {
this.sequenceNumber = initializationInput.getExtendedSequenceNumber().getSequenceNumber();
this.subSequenceNumber = initializationInput.getExtendedSequenceNumber().getSubSequenceNumber();
} else {
this.sequenceNumber = null;
this.subSequenceNumber = null;
}
}
/**
* @return The shard id.
*/
public String getShardId() {
return shardId;
}
/**
* @param shardId The shard id.
*/
public void setShardId(String shardId) {
this.shardId = shardId;
}
}

View file

@ -14,19 +14,29 @@
*/
package com.amazonaws.services.kinesis.multilang.messages;
import com.amazonaws.services.kinesis.model.Record;
import java.util.Date;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.Setter;
/**
* Class for encoding Record objects to json. Needed because Records have byte buffers for their data field which causes
* problems for the json library we're using.
*/
@Getter
@Setter
public class JsonFriendlyRecord {
private byte[] data;
private String partitionKey;
private String sequenceNumber;
private Date approximateArrivalTimestamp;
private Long subSequenceNumber;
public static String ACTION = "record";
/**
* Default Constructor.
@ -40,109 +50,20 @@ public class JsonFriendlyRecord {
* @param record The record that this message will represent.
*/
public JsonFriendlyRecord(Record record) {
this.withData(record.getData() == null ? null : record.getData().array())
.withPartitionKey(record.getPartitionKey())
.withSequenceNumber(record.getSequenceNumber())
.withApproximateArrivalTimestamp(record.getApproximateArrivalTimestamp());
this.data = record.getData() == null ? null : record.getData().array();
this.partitionKey = record.getPartitionKey();
this.sequenceNumber = record.getSequenceNumber();
this.approximateArrivalTimestamp = record.getApproximateArrivalTimestamp();
if (record instanceof UserRecord) {
this.subSequenceNumber = ((UserRecord) record).getSubSequenceNumber();
} else {
this.subSequenceNumber = null;
}
}
/**
* @return The data.
*/
public byte[] getData() {
return data;
@JsonProperty
public String getAction() {
return ACTION;
}
/**
* @return The partition key.
*/
public String getPartitionKey() {
return partitionKey;
}
/**
* @return The sequence number.
*/
public String getSequenceNumber() {
return sequenceNumber;
}
/**
* @return The approximate arrival timestamp.
*/
public Date getApproximateArrivalTimestamp() {
return approximateArrivalTimestamp;
}
/**
* @param data The data.
*/
public void setData(byte[] data) {
this.data = data;
}
/**
* @param partitionKey The partition key.
*/
public void setPartitionKey(String partitionKey) {
this.partitionKey = partitionKey;
}
/**
* @param sequenceNumber The sequence number.
*/
public void setSequenceNumber(String sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}
/**
* @param approximateArrivalTimestamp The approximate arrival timestamp.
*/
public void setApproximateArrivalTimestamp(Date approximateArrivalTimestamp) {
this.approximateArrivalTimestamp = approximateArrivalTimestamp;
}
/**
* @param data The data.
*
* @return this
*/
public JsonFriendlyRecord withData(byte[] data) {
this.setData(data);
return this;
}
/**
* @param partitionKey The partition key.
*
* @return this
*/
public JsonFriendlyRecord withPartitionKey(String partitionKey) {
this.setPartitionKey(partitionKey);
return this;
}
/**
* @param sequenceNumber The sequence number.
*
* @return this
*/
public JsonFriendlyRecord withSequenceNumber(String sequenceNumber) {
this.setSequenceNumber(sequenceNumber);
return this;
}
/**
* @param approximateArrivalTimestamp The approximate arrival timestamp.
*
* @return this
*/
public JsonFriendlyRecord withApproximateArrivalTimestamp(Date approximateArrivalTimestamp){
this.setApproximateArrivalTimestamp(approximateArrivalTimestamp);
return this;
}
}

View file

@ -17,11 +17,16 @@ package com.amazonaws.services.kinesis.multilang.messages;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.Record;
import lombok.Getter;
import lombok.Setter;
/**
* A message to indicate to the client's process that it should process a list of records.
*/
@Getter
@Setter
public class ProcessRecordsMessage extends Message {
/**
* The name used for the action field in {@link Message}.
@ -32,6 +37,7 @@ public class ProcessRecordsMessage extends Message {
* The records that the client's process needs to handle.
*/
private List<JsonFriendlyRecord> records;
private Long millisBehindLatest;
/**
* Default constructor.
@ -42,27 +48,15 @@ public class ProcessRecordsMessage extends Message {
/**
* Convenience constructor.
*
* @param records The records.
* @param processRecordsInput
* the process records input to be sent to the child
*/
public ProcessRecordsMessage(List<Record> records) {
public ProcessRecordsMessage(ProcessRecordsInput processRecordsInput) {
this.millisBehindLatest = processRecordsInput.getMillisBehindLatest();
List<JsonFriendlyRecord> recordMessages = new ArrayList<JsonFriendlyRecord>();
for (Record record : records) {
for (Record record : processRecordsInput.getRecords()) {
recordMessages.add(new JsonFriendlyRecord(record));
}
this.setRecords(recordMessages);
}
/**
* @return The records.
*/
public List<JsonFriendlyRecord> getRecords() {
return records;
}
/**
* @param records The records.
*/
public void setRecords(List<JsonFriendlyRecord> records) {
this.records = records;
}
}

View file

@ -0,0 +1,93 @@
package com.amazonaws.services.kinesis.multilang;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
public class Matchers {
public static Matcher<InitializationInput> withInit(InitializationInput initializationInput) {
return new InitializationInputMatcher(initializationInput);
}
public static class InitializationInputMatcher extends TypeSafeDiagnosingMatcher<InitializationInput> {
private final Matcher<String> shardIdMatcher;
private final Matcher<ExtendedSequenceNumber> sequenceNumberMatcher;
public InitializationInputMatcher(InitializationInput input) {
shardIdMatcher = equalTo(input.getShardId());
sequenceNumberMatcher = withSequence(input.getExtendedSequenceNumber());
}
@Override
protected boolean matchesSafely(final InitializationInput item, Description mismatchDescription) {
boolean matches = true;
if (!shardIdMatcher.matches(item.getShardId())) {
matches = false;
shardIdMatcher.describeMismatch(item.getShardId(), mismatchDescription);
}
if (!sequenceNumberMatcher.matches(item.getExtendedSequenceNumber())) {
matches = false;
sequenceNumberMatcher.describeMismatch(item, mismatchDescription);
}
return matches;
}
@Override
public void describeTo(Description description) {
description.appendText("An InitializationInput matching: { shardId: ").appendDescriptionOf(shardIdMatcher)
.appendText(", sequenceNumber: ").appendDescriptionOf(sequenceNumberMatcher).appendText(" }");
}
}
public static Matcher<ExtendedSequenceNumber> withSequence(ExtendedSequenceNumber extendedSequenceNumber) {
if (extendedSequenceNumber == null) {
return nullValue(ExtendedSequenceNumber.class);
}
return new ExtendedSequenceNumberMatcher(extendedSequenceNumber);
}
public static class ExtendedSequenceNumberMatcher extends TypeSafeDiagnosingMatcher<ExtendedSequenceNumber> {
private final Matcher<String> sequenceNumberMatcher;
private final Matcher<Long> subSequenceNumberMatcher;
public ExtendedSequenceNumberMatcher(ExtendedSequenceNumber extendedSequenceNumber) {
sequenceNumberMatcher = equalTo(extendedSequenceNumber.getSequenceNumber());
subSequenceNumberMatcher = equalTo(extendedSequenceNumber.getSubSequenceNumber());
}
@Override
protected boolean matchesSafely(ExtendedSequenceNumber item, Description mismatchDescription) {
boolean matches = true;
if (!sequenceNumberMatcher.matches(item.getSequenceNumber())) {
matches = false;
mismatchDescription.appendDescriptionOf(sequenceNumberMatcher);
}
if (!subSequenceNumberMatcher.matches(item.getSubSequenceNumber())) {
matches = false;
mismatchDescription.appendDescriptionOf(subSequenceNumberMatcher);
}
return matches;
}
@Override
public void describeTo(Description description) {
description.appendText("An ExtendedSequenceNumber matching: { sequenceNumber: ")
.appendDescriptionOf(sequenceNumberMatcher).appendText(", subSequenceNumber: ")
.appendDescriptionOf(subSequenceNumberMatcher);
}
}
}

View file

@ -23,6 +23,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -54,7 +56,7 @@ public class MessageWriterTest {
*/
@Test
public void writeCheckpointMessageNoErrorTest() throws IOException, InterruptedException, ExecutionException {
Future<Boolean> future = this.messageWriter.writeCheckpointMessageWithError("1234", null);
Future<Boolean> future = this.messageWriter.writeCheckpointMessageWithError("1234", 0L, null);
future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt());
@ -63,7 +65,7 @@ public class MessageWriterTest {
@Test
public void writeCheckpointMessageWithErrorTest() throws IOException, InterruptedException, ExecutionException {
Future<Boolean> future = this.messageWriter.writeCheckpointMessageWithError("1234", new Throwable());
Future<Boolean> future = this.messageWriter.writeCheckpointMessageWithError("1234", 0L, new Throwable());
future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt());
@ -72,7 +74,7 @@ public class MessageWriterTest {
@Test
public void writeInitializeMessageTest() throws IOException, InterruptedException, ExecutionException {
Future<Boolean> future = this.messageWriter.writeInitializeMessage(shardId);
Future<Boolean> future = this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId));
future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt());
@ -93,7 +95,7 @@ public class MessageWriterTest {
this.add(new Record());
}
};
Future<Boolean> future = this.messageWriter.writeProcessRecordsMessage(records);
Future<Boolean> future = this.messageWriter.writeProcessRecordsMessage(new ProcessRecordsInput().withRecords(records));
future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
@ -114,7 +116,7 @@ public class MessageWriterTest {
@Test
public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException {
Mockito.doThrow(IOException.class).when(stream).flush();
Future<Boolean> initializeTask = this.messageWriter.writeInitializeMessage(shardId);
Future<Boolean> initializeTask = this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId));
Boolean result = initializeTask.get();
Assert.assertNotNull(result);
Assert.assertFalse(result);
@ -144,7 +146,7 @@ public class MessageWriterTest {
Assert.assertFalse(this.messageWriter.isOpen());
try {
// Any message should fail
this.messageWriter.writeInitializeMessage(shardId);
this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId));
Assert.fail("MessageWriter should be closed and unable to write.");
} catch (IllegalStateException e) {
// This should happen.

View file

@ -14,14 +14,23 @@
*/
package com.amazonaws.services.kinesis.multilang;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -33,62 +42,76 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import com.google.common.util.concurrent.SettableFuture;
public class MultiLangProtocolTest {
private static final List<Record> EMPTY_RECORD_LIST = Collections.emptyList();
private MultiLangProtocol protocol;
private MessageWriter messageWriter;
private MessageReader messageReader;
private String shardId;
private IRecordProcessorCheckpointer checkpointer;
@Before
public void setup() {
this.shardId = "shard-id-123";
messageWriter = Mockito.mock(MessageWriter.class);
messageReader = Mockito.mock(MessageReader.class);
protocol = new MultiLangProtocol(messageReader, messageWriter, shardId);
protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId));
checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
}
private Future<Boolean> buildBooleanFuture(boolean val) throws InterruptedException, ExecutionException {
Future<Boolean> successFuture = Mockito.mock(Future.class);
Mockito.doReturn(val).when(successFuture).get();
return successFuture;
private <T> Future<T> buildFuture(T value) {
SettableFuture<T> future = SettableFuture.create();
future.set(value);
return future;
}
private Future<Message> buildMessageFuture(Message message) throws InterruptedException, ExecutionException {
Future<Message> messageFuture = Mockito.mock(Future.class);
Mockito.doReturn(message).when(messageFuture).get();
return messageFuture;
private <T> Future<T> buildFuture(T value, Class<T> clazz) {
SettableFuture<T> future = SettableFuture.create();
future.set(value);
return future;
}
@Test
public void initializeTest() throws InterruptedException, ExecutionException {
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeInitializeMessage(shardId);
Mockito.doReturn(buildMessageFuture(new StatusMessage("initialize"))).when(messageReader).getNextMessageFromSTDOUT();
Assert.assertTrue(protocol.initialize());
when(messageWriter
.writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput().withShardId(shardId)))))
.thenReturn(buildFuture(true));
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("initialize"), Message.class));
assertThat(protocol.initialize(), equalTo(true));
}
@Test
public void processRecordsTest() throws InterruptedException, ExecutionException {
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList());
Mockito.doReturn(buildMessageFuture(new StatusMessage("processRecords"))).when(messageReader).getNextMessageFromSTDOUT();
Assert.assertTrue(protocol.processRecords(new ArrayList<Record>(), null));
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class));
assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)), equalTo(true));
}
@Test
public void shutdownTest() throws InterruptedException, ExecutionException {
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter)
.writeShutdownMessage(Mockito.any(ShutdownReason.class));
Mockito.doReturn(buildMessageFuture(new StatusMessage("shutdown"))).when(messageReader).getNextMessageFromSTDOUT();
Assert.assertTrue(protocol.shutdown(null, ShutdownReason.ZOMBIE));
when(messageWriter.writeShutdownMessage(any(ShutdownReason.class))).thenReturn(buildFuture(true));
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("shutdown"), Message.class));
Mockito.doReturn(buildFuture(true)).when(messageWriter)
.writeShutdownMessage(any(ShutdownReason.class));
Mockito.doReturn(buildFuture(new StatusMessage("shutdown"))).when(messageReader).getNextMessageFromSTDOUT();
assertThat(protocol.shutdown(null, ShutdownReason.ZOMBIE), equalTo(true));
}
private Answer<Future<Message>> buildMessageAnswers(List<Message> messages) {
@ -107,7 +130,7 @@ public class MultiLangProtocolTest {
if (this.messageIterator.hasNext()) {
message = this.messageIterator.next();
}
return buildMessageFuture(message);
return buildFuture(message);
}
}.init(messages);
@ -117,13 +140,12 @@ public class MultiLangProtocolTest {
public void processRecordsWithCheckpointsTest() throws InterruptedException, ExecutionException,
KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList());
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter)
.writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class));
Mockito.doAnswer(buildMessageAnswers(new ArrayList<Message>() {
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(true));
when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList<Message>() {
{
this.add(new CheckpointMessage("123", null));
this.add(new CheckpointMessage(null, null));
this.add(new CheckpointMessage("123", 0L, null));
this.add(new CheckpointMessage(null, 0L, null));
/*
* This procesRecords message will be ignored by the read loop which only cares about status and
* checkpoint messages. All other lines and message types are ignored. By inserting it here, we check
@ -132,24 +154,23 @@ public class MultiLangProtocolTest {
this.add(new ProcessRecordsMessage());
this.add(new StatusMessage("processRecords"));
}
})).when(messageReader).getNextMessageFromSTDOUT();
Assert.assertTrue(protocol.processRecords(new ArrayList<Record>(), checkpointer));
}));
assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(true));
Mockito.verify(checkpointer, Mockito.timeout(1)).checkpoint();
Mockito.verify(checkpointer, Mockito.timeout(1)).checkpoint("123");
verify(checkpointer, timeout(1)).checkpoint();
verify(checkpointer, timeout(1)).checkpoint("123", 0L);
}
@Test
public void processRecordsWithABadCheckpointTest() throws InterruptedException, ExecutionException {
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList());
Mockito.doReturn(buildBooleanFuture(false)).when(messageWriter)
.writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class));
Mockito.doAnswer(buildMessageAnswers(new ArrayList<Message>() {
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(false));
when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList<Message>() {
{
this.add(new CheckpointMessage("456", null));
this.add(new CheckpointMessage("456", 0L, null));
this.add(new StatusMessage("processRecords"));
}
})).when(messageReader).getNextMessageFromSTDOUT();
Assert.assertFalse(protocol.processRecords(new ArrayList<Record>(), checkpointer));
}));
assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false));
}
}

View file

@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.multilang;
import org.junit.Assert;
import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
public class StreamingRecordProcessorFactoryTest {

View file

@ -14,6 +14,15 @@
*/
package com.amazonaws.services.kinesis.multilang;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -24,11 +33,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
@ -36,20 +49,27 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
@RunWith(MockitoJUnitRunner.class)
public class StreamingRecordProcessorTest {
private static final String shardId = "shard-123";
private int systemExitCount = 0;
@Mock
private Future<Message> messageFuture;
private IRecordProcessorCheckpointer unimplementedCheckpointer = new IRecordProcessorCheckpointer() {
@Override
@ -129,11 +149,10 @@ public class StreamingRecordProcessorTest {
Future<Boolean> trueFuture = Mockito.mock(Future.class);
Mockito.doReturn(true).when(trueFuture).get();
Mockito.doReturn(trueFuture).when(messageWriter).writeInitializeMessage(Mockito.anyString());
Mockito.doReturn(trueFuture).when(messageWriter)
.writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class));
Mockito.doReturn(trueFuture).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList());
Mockito.doReturn(trueFuture).when(messageWriter).writeShutdownMessage(Mockito.any(ShutdownReason.class));
when(messageWriter.writeInitializeMessage(any(InitializationInput.class))).thenReturn(trueFuture);
when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(trueFuture);
when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(trueFuture);
when(messageWriter.writeShutdownMessage(any(ShutdownReason.class))).thenReturn(trueFuture);
}
private void phases(Answer<StatusMessage> answer) throws InterruptedException, ExecutionException {
@ -145,16 +164,15 @@ public class StreamingRecordProcessorTest {
* processRecords
* shutdown
*/
Future<StatusMessage> future = Mockito.mock(Future.class);
Mockito.doAnswer(answer).when(future).get();
Mockito.doReturn(future).when(messageReader).getNextMessageFromSTDOUT();
when(messageFuture.get()).thenAnswer(answer);
when(messageReader.getNextMessageFromSTDOUT()).thenReturn(messageFuture);
List<Record> testRecords = new ArrayList<Record>();
recordProcessor.initialize(shardId);
recordProcessor.processRecords(testRecords, unimplementedCheckpointer);
recordProcessor.processRecords(testRecords, unimplementedCheckpointer);
recordProcessor.shutdown(unimplementedCheckpointer, ShutdownReason.ZOMBIE);
recordProcessor.initialize(new InitializationInput().withShardId(shardId));
recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer));
recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer));
recordProcessor.shutdown(new ShutdownInput().withCheckpointer(unimplementedCheckpointer).withShutdownReason(ShutdownReason.ZOMBIE));
}
@Test
@ -180,9 +198,10 @@ public class StreamingRecordProcessorTest {
phases(answer);
Mockito.verify(messageWriter, Mockito.times(1)).writeInitializeMessage(shardId);
Mockito.verify(messageWriter, Mockito.times(2)).writeProcessRecordsMessage(Mockito.anyList());
Mockito.verify(messageWriter, Mockito.times(1)).writeShutdownMessage(ShutdownReason.ZOMBIE);
verify(messageWriter)
.writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput().withShardId(shardId))));
verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
verify(messageWriter).writeShutdownMessage(ShutdownReason.ZOMBIE);
}
@Test
@ -211,9 +230,10 @@ public class StreamingRecordProcessorTest {
phases(answer);
Mockito.verify(messageWriter, Mockito.times(1)).writeInitializeMessage(shardId);
Mockito.verify(messageWriter, Mockito.times(2)).writeProcessRecordsMessage(Mockito.anyList());
Mockito.verify(messageWriter, Mockito.times(0)).writeShutdownMessage(ShutdownReason.ZOMBIE);
verify(messageWriter).writeInitializeMessage(argThat(Matchers.withInit(new InitializationInput()
.withShardId(shardId))));
verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
verify(messageWriter, never()).writeShutdownMessage(ShutdownReason.ZOMBIE);
Assert.assertEquals(1, systemExitCount);
}
}

View file

@ -17,6 +17,8 @@ package com.amazonaws.services.kinesis.multilang.messages;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import org.junit.Assert;
import org.junit.Test;
@ -30,8 +32,8 @@ public class MessageTest {
@Test
public void toStringTest() {
Message[] messages =
new Message[] { new CheckpointMessage("1234567890", null), new InitializeMessage("shard-123"),
new ProcessRecordsMessage(new ArrayList<Record>() {
new Message[] { new CheckpointMessage("1234567890", 0L, null), new InitializeMessage(new InitializationInput().withShardId("shard-123")),
new ProcessRecordsMessage(new ProcessRecordsInput().withRecords(new ArrayList<Record>() {
{
this.add(new Record() {
{
@ -41,7 +43,7 @@ public class MessageTest {
}
});
}
}), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"),
})), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"),
new InitializeMessage(), new ProcessRecordsMessage() };
for (int i = 0; i < messages.length; i++) {