Started working on bring the messages up to date.

This commit is contained in:
Pfifer, Justin 2016-09-23 11:35:39 -07:00
parent 51663f96c7
commit c777ea963a
17 changed files with 342 additions and 356 deletions

View file

@ -63,6 +63,13 @@
<version>2.6</version> <version>2.6</version>
</dependency> </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
<scope>provided</scope>
</dependency>
<!-- Test --> <!-- Test -->
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View file

@ -18,7 +18,6 @@ import java.io.BufferedWriter;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -26,8 +25,9 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.Message;
@ -119,19 +119,21 @@ class MessageWriter {
/** /**
* Writes an {@link InitializeMessage} to the subprocess. * Writes an {@link InitializeMessage} to the subprocess.
* *
* @param shardIdToWrite The shard id. * @param initializationInput
* contains information about the shard being initialized
*/ */
Future<Boolean> writeInitializeMessage(String shardIdToWrite) { Future<Boolean> writeInitializeMessage(InitializationInput initializationInput) {
return writeMessage(new InitializeMessage(shardIdToWrite)); return writeMessage(new InitializeMessage(initializationInput));
} }
/** /**
* Writes a {@link ProcessRecordsMessage} message to the subprocess. * Writes a {@link ProcessRecordsMessage} message to the subprocess.
* *
* @param records The records to be processed. * @param processRecordsInput
* the records, and associated metadata to be processed.
*/ */
Future<Boolean> writeProcessRecordsMessage(List<Record> records) { Future<Boolean> writeProcessRecordsMessage(ProcessRecordsInput processRecordsInput) {
return writeMessage(new ProcessRecordsMessage(records)); return writeMessage(new ProcessRecordsMessage(processRecordsInput));
} }
/** /**
@ -146,11 +148,16 @@ class MessageWriter {
/** /**
* Writes a {@link CheckpointMessage} to the subprocess. * Writes a {@link CheckpointMessage} to the subprocess.
* *
* @param sequenceNumber The sequence number that was checkpointed. * @param sequenceNumber
* @param throwable The exception that was thrown by a checkpoint attempt. Null if one didn't occur. * The sequence number that was checkpointed.
* @param subSequenceNumber
* the sub sequence number to checkpoint at.
* @param throwable
* The exception that was thrown by a checkpoint attempt. Null if one didn't occur.
*/ */
Future<Boolean> writeCheckpointMessageWithError(String sequenceNumber, Throwable throwable) { Future<Boolean> writeCheckpointMessageWithError(String sequenceNumber, Long subSequenceNumber,
return writeMessage(new CheckpointMessage(sequenceNumber, throwable)); Throwable throwable) {
return writeMessage(new CheckpointMessage(sequenceNumber, subSequenceNumber, throwable));
} }
/** /**

View file

@ -24,6 +24,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
@ -71,7 +72,13 @@ public class MultiLangDaemon implements Callable<Integer> {
public MultiLangDaemon(KinesisClientLibConfiguration configuration, public MultiLangDaemon(KinesisClientLibConfiguration configuration,
MultiLangRecordProcessorFactory recordProcessorFactory, MultiLangRecordProcessorFactory recordProcessorFactory,
ExecutorService workerThreadPool) { ExecutorService workerThreadPool) {
this(new Worker(recordProcessorFactory, configuration, workerThreadPool)); this(buildWorker(recordProcessorFactory, configuration, workerThreadPool));
}
private static Worker buildWorker(IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration configuration, ExecutorService workerThreadPool) {
return new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(configuration)
.execService(workerThreadPool).build();
} }
/** /**

View file

@ -14,6 +14,8 @@
*/ */
package com.amazonaws.services.kinesis.multilang; package com.amazonaws.services.kinesis.multilang;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Properties; import java.util.Properties;
@ -131,7 +133,7 @@ public class MultiLangDaemonConfig {
private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException { private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
Properties properties = new Properties(); Properties properties = new Properties();
try (InputStream propertiesStream = classLoader.getResourceAsStream(propertiesFileName)) { try (InputStream propertiesStream = new FileInputStream(new File(propertiesFileName))) {
properties.load(propertiesStream); properties.load(propertiesStream);
return properties; return properties;
} }

View file

@ -14,17 +14,14 @@
*/ */
package com.amazonaws.services.kinesis.multilang; package com.amazonaws.services.kinesis.multilang;
import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.Message;
@ -32,28 +29,33 @@ import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import lombok.extern.apachecommons.CommonsLog;
/** /**
* An implementation of the multi language protocol. * An implementation of the multi language protocol.
*/ */
@CommonsLog
class MultiLangProtocol { class MultiLangProtocol {
private static final Log LOG = LogFactory.getLog(MultiLangProtocol.class);
private MessageReader messageReader; private MessageReader messageReader;
private MessageWriter messageWriter; private MessageWriter messageWriter;
private String shardId; private final InitializationInput initializationInput;
/** /**
* Constructor. * Constructor.
* *
* @param messageReader A message reader. * @param messageReader
* @param messageWriter A message writer. * A message reader.
* @param shardId The shard id this processor is associated with. * @param messageWriter
* A message writer.
* @param initializationInput
* information about the shard this processor is starting to process
*/ */
MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, String shardId) { MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter,
InitializationInput initializationInput) {
this.messageReader = messageReader; this.messageReader = messageReader;
this.messageWriter = messageWriter; this.messageWriter = messageWriter;
this.shardId = shardId; this.initializationInput = initializationInput;
} }
/** /**
@ -66,7 +68,7 @@ class MultiLangProtocol {
/* /*
* Call and response to child process. * Call and response to child process.
*/ */
Future<Boolean> writeFuture = messageWriter.writeInitializeMessage(shardId); Future<Boolean> writeFuture = messageWriter.writeInitializeMessage(initializationInput);
return waitForStatusMessage(InitializeMessage.ACTION, null, writeFuture); return waitForStatusMessage(InitializeMessage.ACTION, null, writeFuture);
} }
@ -75,13 +77,13 @@ class MultiLangProtocol {
* Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond * Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond
* with a {@link StatusMessage} on its STDOUT. * with a {@link StatusMessage} on its STDOUT.
* *
* @param records The records to process. * @param processRecordsInput
* @param checkpointer A checkpointer. * The records, and associated metadata, to process.
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.
*/ */
boolean processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { boolean processRecords(ProcessRecordsInput processRecordsInput) {
Future<Boolean> writeFuture = messageWriter.writeProcessRecordsMessage(records); Future<Boolean> writeFuture = messageWriter.writeProcessRecordsMessage(processRecordsInput);
return waitForStatusMessage(ProcessRecordsMessage.ACTION, checkpointer, writeFuture); return waitForStatusMessage(ProcessRecordsMessage.ACTION, processRecordsInput.getCheckpointer(), writeFuture);
} }
/** /**
@ -105,32 +107,41 @@ class MultiLangProtocol {
* checkpointing itself was successful is not the concern of this method. This method simply cares whether it was * checkpointing itself was successful is not the concern of this method. This method simply cares whether it was
* able to successfully communicate the results of its attempts to checkpoint. * able to successfully communicate the results of its attempts to checkpoint.
* *
* @param action What action is being waited on. * @param action
* @param checkpointer A checkpointer. * What action is being waited on.
* @param writeFuture The writing task. * @param checkpointer
* the checkpointer from the process records, or shutdown request
* @param writeFuture
* The writing task.
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.
*/ */
private boolean waitForStatusMessage(String action, private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer,
IRecordProcessorCheckpointer checkpointer, Future<Boolean> writeFuture) {
Future<Boolean> writeFuture) {
boolean statusWasCorrect = waitForStatusMessage(action, checkpointer); boolean statusWasCorrect = waitForStatusMessage(action, checkpointer);
// Examine whether or not we failed somewhere along the line. // Examine whether or not we failed somewhere along the line.
try { try {
boolean writerIsStillOpen = Boolean.valueOf(writeFuture.get()); boolean writerIsStillOpen = writeFuture.get();
return statusWasCorrect && writerIsStillOpen; return statusWasCorrect && writerIsStillOpen;
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error(String.format("Interrupted while writing %s message for shard %s", action, shardId)); log.error(String.format("Interrupted while writing %s message for shard %s", action,
initializationInput.getShardId()));
return false; return false;
} catch (ExecutionException e) { } catch (ExecutionException e) {
LOG.error(String.format("Failed to write %s message for shard %s", action, shardId), e); log.error(
String.format("Failed to write %s message for shard %s", action, initializationInput.getShardId()),
e);
return false; return false;
} }
} }
/** /**
* @param action What action is being waited on. * Waits for status message and verifies it against the expectation
* @param checkpointer A checkpointer. *
* @param action
* What action is being waited on.
* @param checkpointer
* the original process records request
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.
*/ */
private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) { private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
@ -141,8 +152,7 @@ class MultiLangProtocol {
Message message = future.get(); Message message = future.get();
// Note that instanceof doubles as a check against a value being null // Note that instanceof doubles as a check against a value being null
if (message instanceof CheckpointMessage) { if (message instanceof CheckpointMessage) {
boolean checkpointWriteSucceeded = boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get();
Boolean.valueOf(checkpoint((CheckpointMessage) message, checkpointer).get());
if (!checkpointWriteSucceeded) { if (!checkpointWriteSucceeded) {
return false; return false;
} }
@ -150,10 +160,12 @@ class MultiLangProtocol {
statusMessage = (StatusMessage) message; statusMessage = (StatusMessage) message;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error(String.format("Interrupted while waiting for %s message for shard %s", action, shardId)); log.error(String.format("Interrupted while waiting for %s message for shard %s", action,
initializationInput.getShardId()));
return false; return false;
} catch (ExecutionException e) { } catch (ExecutionException e) {
LOG.error(String.format("Failed to get status message for %s action for shard %s", action, shardId), e); log.error(String.format("Failed to get status message for %s action for shard %s", action,
initializationInput.getShardId()), e);
return false; return false;
} }
} }
@ -168,8 +180,8 @@ class MultiLangProtocol {
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.
*/ */
private boolean validateStatusMessage(StatusMessage statusMessage, String action) { private boolean validateStatusMessage(StatusMessage statusMessage, String action) {
LOG.info("Received response " + statusMessage + " from subprocess while waiting for " + action log.info("Received response " + statusMessage + " from subprocess while waiting for " + action
+ " while processing shard " + shardId); + " while processing shard " + initializationInput.getShardId());
return !(statusMessage == null || statusMessage.getResponseFor() == null || !statusMessage.getResponseFor() return !(statusMessage == null || statusMessage.getResponseFor() == null || !statusMessage.getResponseFor()
.equals(action)); .equals(action));
@ -186,28 +198,38 @@ class MultiLangProtocol {
* @return Whether or not this operation succeeded. * @return Whether or not this operation succeeded.
*/ */
private Future<Boolean> checkpoint(CheckpointMessage checkpointMessage, IRecordProcessorCheckpointer checkpointer) { private Future<Boolean> checkpoint(CheckpointMessage checkpointMessage, IRecordProcessorCheckpointer checkpointer) {
String sequenceNumber = checkpointMessage.getCheckpoint(); String sequenceNumber = checkpointMessage.getSequenceNumber();
Long subSequenceNumber = checkpointMessage.getSubSequenceNumber();
try { try {
if (checkpointer != null) { if (checkpointer != null) {
if (sequenceNumber == null) { log.debug(logCheckpointMessage(sequenceNumber, subSequenceNumber));
LOG.info(String.format("Attempting to checkpoint for shard %s", shardId)); if (sequenceNumber != null) {
checkpointer.checkpoint(); if (subSequenceNumber != null) {
checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
} else {
checkpointer.checkpoint(sequenceNumber);
}
} else { } else {
LOG.info(String.format("Attempting to checkpoint at sequence number %s for shard %s", checkpointer.checkpoint();
sequenceNumber, shardId));
checkpointer.checkpoint(sequenceNumber);
} }
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, null); return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, null);
} else { } else {
String message = String message =
String.format("Was asked to checkpoint at %s but no checkpointer was provided for shard %s", String.format("Was asked to checkpoint at %s but no checkpointer was provided for shard %s",
sequenceNumber, shardId); sequenceNumber, initializationInput.getShardId());
LOG.error(message); log.error(message);
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, new InvalidStateException( return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber,
new InvalidStateException(
message)); message));
} }
} catch (Throwable t) { } catch (Throwable t) {
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, t); return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, t);
} }
} }
private String logCheckpointMessage(String sequenceNumber, Long subSequenceNumber) {
return String.format("Attempting to checkpoint shard %s @ sequence number %s, and sub sequence number %s",
initializationInput.getShardId(), sequenceNumber, subSequenceNumber);
}
} }

View file

@ -16,7 +16,6 @@ package com.amazonaws.services.kinesis.multilang;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -24,10 +23,10 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
/** /**
@ -61,56 +60,10 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
private MultiLangProtocol protocol; private MultiLangProtocol protocol;
/**
* Used to tell whether the processor has been shutdown already.
*/
private enum ProcessState {
ACTIVE, SHUTDOWN
}
/**
* Constructor.
*
* @param processBuilder Provides process builder functionality.
* @param executorService An executor
* @param objectMapper An obejct mapper.
*/
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
ObjectMapper objectMapper) {
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
new DrainChildSTDERRTask());
}
/**
* Note: This constructor has package level access solely for testing purposes.
*
* @param processBuilder Provides the child process for this record processor
* @param executorService The executor service which is provided by the {@link MultiLangRecordProcessorFactory}
* @param objectMapper Object mapper
* @param messageWriter Message write to write to child process's stdin
* @param messageReader Message reader to read from child process's stdout
* @param readSTDERRTask Error reader to read from child process's stderr
*/
MultiLangRecordProcessor(ProcessBuilder processBuilder,
ExecutorService executorService,
ObjectMapper objectMapper,
MessageWriter messageWriter,
MessageReader messageReader,
DrainChildSTDERRTask readSTDERRTask) {
this.executorService = executorService;
this.processBuilder = processBuilder;
this.objectMapper = objectMapper;
this.messageWriter = messageWriter;
this.messageReader = messageReader;
this.readSTDERRTask = readSTDERRTask;
this.state = ProcessState.ACTIVE;
}
@Override @Override
public void initialize(String shardIdToProcess) { public void initialize(InitializationInput initializationInput) {
try { try {
this.shardId = shardIdToProcess; this.shardId = initializationInput.getShardId();
try { try {
this.process = startProcess(); this.process = startProcess();
} catch (IOException e) { } catch (IOException e) {
@ -129,7 +82,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
// Submit the error reader for execution // Submit the error reader for execution
stderrReadTask = executorService.submit(readSTDERRTask); stderrReadTask = executorService.submit(readSTDERRTask);
protocol = new MultiLangProtocol(messageReader, messageWriter, shardId); protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput);
if (!protocol.initialize()) { if (!protocol.initialize()) {
throw new RuntimeException("Failed to initialize child process"); throw new RuntimeException("Failed to initialize child process");
} }
@ -142,9 +95,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
} }
@Override @Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { public void processRecords(ProcessRecordsInput processRecordsInput) {
try { try {
if (!protocol.processRecords(records, checkpointer)) { if (!protocol.processRecords(processRecordsInput)) {
throw new RuntimeException("Child process failed to process records"); throw new RuntimeException("Child process failed to process records");
} }
} catch (Throwable t) { } catch (Throwable t) {
@ -153,7 +106,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
} }
@Override @Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { public void shutdown(ShutdownInput shutdownInput) {
// In cases where KCL loses lease for the shard after creating record processor instance but before // In cases where KCL loses lease for the shard after creating record processor instance but before
// record processor initialize() is called, then shutdown() may be called directly before initialize(). // record processor initialize() is called, then shutdown() may be called directly before initialize().
if (!initialized) { if (!initialized) {
@ -165,7 +118,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
try { try {
if (ProcessState.ACTIVE.equals(this.state)) { if (ProcessState.ACTIVE.equals(this.state)) {
if (!protocol.shutdown(checkpointer, reason)) { if (!protocol.shutdown(shutdownInput.getCheckpointer(), shutdownInput.getShutdownReason())) {
throw new RuntimeException("Child process failed to shutdown"); throw new RuntimeException("Child process failed to shutdown");
} }
@ -181,7 +134,57 @@ public class MultiLangRecordProcessor implements IRecordProcessor {
+ " but it appears the processor has already been shutdown", t); + " but it appears the processor has already been shutdown", t);
} }
} }
}
/**
* Used to tell whether the processor has been shutdown already.
*/
private enum ProcessState {
ACTIVE, SHUTDOWN
}
/**
* Constructor.
*
* @param processBuilder
* Provides process builder functionality.
* @param executorService
* An executor
* @param objectMapper
* An obejct mapper.
*/
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
ObjectMapper objectMapper) {
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
new DrainChildSTDERRTask());
}
/**
* Note: This constructor has package level access solely for testing purposes.
*
* @param processBuilder
* Provides the child process for this record processor
* @param executorService
* The executor service which is provided by the {@link MultiLangRecordProcessorFactory}
* @param objectMapper
* Object mapper
* @param messageWriter
* Message write to write to child process's stdin
* @param messageReader
* Message reader to read from child process's stdout
* @param readSTDERRTask
* Error reader to read from child process's stderr
*/
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper,
MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) {
this.executorService = executorService;
this.processBuilder = processBuilder;
this.objectMapper = objectMapper;
this.messageWriter = messageWriter;
this.messageReader = messageReader;
this.readSTDERRTask = readSTDERRTask;
this.state = ProcessState.ACTIVE;
} }
/** /**

View file

@ -19,8 +19,8 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
/** /**

View file

@ -14,11 +14,16 @@
*/ */
package com.amazonaws.services.kinesis.multilang.messages; package com.amazonaws.services.kinesis.multilang.messages;
import lombok.Getter;
import lombok.Setter;
/** /**
* A checkpoint message is sent by the client's subprocess to indicate to the kcl processor that it should attempt to * A checkpoint message is sent by the client's subprocess to indicate to the kcl processor that it should attempt to
* checkpoint. The processor sends back a checkpoint message as an acknowledgement that it attempted to checkpoint along * checkpoint. The processor sends back a checkpoint message as an acknowledgement that it attempted to checkpoint along
* with an error message which corresponds to the names of exceptions that a checkpointer can throw. * with an error message which corresponds to the names of exceptions that a checkpointer can throw.
*/ */
@Getter
@Setter
public class CheckpointMessage extends Message { public class CheckpointMessage extends Message {
/** /**
* The name used for the action field in {@link Message}. * The name used for the action field in {@link Message}.
@ -28,7 +33,8 @@ public class CheckpointMessage extends Message {
/** /**
* The checkpoint this message is about. * The checkpoint this message is about.
*/ */
private String checkpoint; private String sequenceNumber;
private Long subSequenceNumber;
/** /**
* The name of an exception that occurred while attempting to checkpoint. * The name of an exception that occurred while attempting to checkpoint.
@ -44,43 +50,20 @@ public class CheckpointMessage extends Message {
/** /**
* Convenience constructor. * Convenience constructor.
* *
* @param sequenceNumber The sequence number that this message is about. * @param sequenceNumber
* @param throwable When responding to a client's process, the record processor will add the name of the exception * The sequence number that this message is about.
* that occurred while attempting to checkpoint if one did occur. * @param subSequenceNumber
* the sub sequence number for the checkpoint. This can be null.
* @param throwable
* When responding to a client's process, the record processor will add the name of the exception that
* occurred while attempting to checkpoint if one did occur.
*/ */
public CheckpointMessage(String sequenceNumber, Throwable throwable) { public CheckpointMessage(String sequenceNumber, Long subSequenceNumber, Throwable throwable) {
this.setCheckpoint(sequenceNumber); this.setSequenceNumber(sequenceNumber);
this.subSequenceNumber = subSequenceNumber;
if (throwable != null) { if (throwable != null) {
this.setError(throwable.getClass().getSimpleName()); this.setError(throwable.getClass().getSimpleName());
} }
} }
/**
* @return The checkpoint.
*/
public String getCheckpoint() {
return checkpoint;
}
/**
* @return The error.
*/
public String getError() {
return error;
}
/**
* @param checkpoint The checkpoint.
*/
public void setCheckpoint(String checkpoint) {
this.checkpoint = checkpoint;
}
/**
* @param error The error.
*/
public void setError(String error) {
this.error = error;
}
} }

View file

@ -14,9 +14,15 @@
*/ */
package com.amazonaws.services.kinesis.multilang.messages; package com.amazonaws.services.kinesis.multilang.messages;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import lombok.Getter;
import lombok.Setter;
/** /**
* An initialize message is sent to the client's subprocess to indicate that it should perform its initialization steps. * An initialize message is sent to the client's subprocess to indicate that it should perform its initialization steps.
*/ */
@Getter
@Setter
public class InitializeMessage extends Message { public class InitializeMessage extends Message {
/** /**
* The name used for the action field in {@link Message}. * The name used for the action field in {@link Message}.
@ -27,6 +33,8 @@ public class InitializeMessage extends Message {
* The shard id that this processor is getting initialized for. * The shard id that this processor is getting initialized for.
*/ */
private String shardId; private String shardId;
private String sequenceNumber;
private Long subSequenceNumber;
/** /**
* Default constructor. * Default constructor.
@ -39,21 +47,16 @@ public class InitializeMessage extends Message {
* *
* @param shardId The shard id. * @param shardId The shard id.
*/ */
public InitializeMessage(String shardId) { public InitializeMessage(InitializationInput initializationInput) {
this.setShardId(shardId); this.shardId = initializationInput.getShardId();
if (initializationInput.getExtendedSequenceNumber() != null) {
this.sequenceNumber = initializationInput.getExtendedSequenceNumber().getSequenceNumber();
this.subSequenceNumber = initializationInput.getExtendedSequenceNumber().getSubSequenceNumber();
} else {
this.sequenceNumber = null;
this.subSequenceNumber = null;
}
} }
/**
* @return The shard id.
*/
public String getShardId() {
return shardId;
}
/**
* @param shardId The shard id.
*/
public void setShardId(String shardId) {
this.shardId = shardId;
}
} }

View file

@ -14,7 +14,10 @@
*/ */
package com.amazonaws.services.kinesis.multilang.messages; package com.amazonaws.services.kinesis.multilang.messages;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import lombok.Getter;
import lombok.Setter;
import java.util.Date; import java.util.Date;
@ -22,11 +25,14 @@ import java.util.Date;
* Class for encoding Record objects to json. Needed because Records have byte buffers for their data field which causes * Class for encoding Record objects to json. Needed because Records have byte buffers for their data field which causes
* problems for the json library we're using. * problems for the json library we're using.
*/ */
@Getter
@Setter
public class JsonFriendlyRecord { public class JsonFriendlyRecord {
private byte[] data; private byte[] data;
private String partitionKey; private String partitionKey;
private String sequenceNumber; private String sequenceNumber;
private Date approximateArrivalTimestamp; private Date approximateArrivalTimestamp;
private Long subsequenceNumber;
/** /**
* Default Constructor. * Default Constructor.
@ -40,109 +46,15 @@ public class JsonFriendlyRecord {
* @param record The record that this message will represent. * @param record The record that this message will represent.
*/ */
public JsonFriendlyRecord(Record record) { public JsonFriendlyRecord(Record record) {
this.withData(record.getData() == null ? null : record.getData().array()) this.data = record.getData() == null ? null : record.getData().array();
.withPartitionKey(record.getPartitionKey()) this.partitionKey = record.getPartitionKey();
.withSequenceNumber(record.getSequenceNumber()) this.sequenceNumber = record.getSequenceNumber();
.withApproximateArrivalTimestamp(record.getApproximateArrivalTimestamp()); this.approximateArrivalTimestamp = record.getApproximateArrivalTimestamp();
if (record instanceof UserRecord) {
this.subsequenceNumber = ((UserRecord) record).getSubSequenceNumber();
} else {
this.subsequenceNumber = null;
}
} }
/**
* @return The data.
*/
public byte[] getData() {
return data;
}
/**
* @return The partition key.
*/
public String getPartitionKey() {
return partitionKey;
}
/**
* @return The sequence number.
*/
public String getSequenceNumber() {
return sequenceNumber;
}
/**
* @return The approximate arrival timestamp.
*/
public Date getApproximateArrivalTimestamp() {
return approximateArrivalTimestamp;
}
/**
* @param data The data.
*/
public void setData(byte[] data) {
this.data = data;
}
/**
* @param partitionKey The partition key.
*/
public void setPartitionKey(String partitionKey) {
this.partitionKey = partitionKey;
}
/**
* @param sequenceNumber The sequence number.
*/
public void setSequenceNumber(String sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}
/**
* @param approximateArrivalTimestamp The approximate arrival timestamp.
*/
public void setApproximateArrivalTimestamp(Date approximateArrivalTimestamp) {
this.approximateArrivalTimestamp = approximateArrivalTimestamp;
}
/**
* @param data The data.
*
* @return this
*/
public JsonFriendlyRecord withData(byte[] data) {
this.setData(data);
return this;
}
/**
* @param partitionKey The partition key.
*
* @return this
*/
public JsonFriendlyRecord withPartitionKey(String partitionKey) {
this.setPartitionKey(partitionKey);
return this;
}
/**
* @param sequenceNumber The sequence number.
*
* @return this
*/
public JsonFriendlyRecord withSequenceNumber(String sequenceNumber) {
this.setSequenceNumber(sequenceNumber);
return this;
}
/**
* @param approximateArrivalTimestamp The approximate arrival timestamp.
*
* @return this
*/
public JsonFriendlyRecord withApproximateArrivalTimestamp(Date approximateArrivalTimestamp){
this.setApproximateArrivalTimestamp(approximateArrivalTimestamp);
return this;
}
} }

View file

@ -17,11 +17,16 @@ package com.amazonaws.services.kinesis.multilang.messages;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import lombok.Getter;
import lombok.Setter;
/** /**
* A message to indicate to the client's process that it should process a list of records. * A message to indicate to the client's process that it should process a list of records.
*/ */
@Getter
@Setter
public class ProcessRecordsMessage extends Message { public class ProcessRecordsMessage extends Message {
/** /**
* The name used for the action field in {@link Message}. * The name used for the action field in {@link Message}.
@ -32,6 +37,7 @@ public class ProcessRecordsMessage extends Message {
* The records that the client's process needs to handle. * The records that the client's process needs to handle.
*/ */
private List<JsonFriendlyRecord> records; private List<JsonFriendlyRecord> records;
private Long millisBehindLatest;
/** /**
* Default constructor. * Default constructor.
@ -42,27 +48,15 @@ public class ProcessRecordsMessage extends Message {
/** /**
* Convenience constructor. * Convenience constructor.
* *
* @param records The records. * @param processRecordsInput
* the process records input to be sent to the child
*/ */
public ProcessRecordsMessage(List<Record> records) { public ProcessRecordsMessage(ProcessRecordsInput processRecordsInput) {
this.millisBehindLatest = processRecordsInput.getMillisBehindLatest();
List<JsonFriendlyRecord> recordMessages = new ArrayList<JsonFriendlyRecord>(); List<JsonFriendlyRecord> recordMessages = new ArrayList<JsonFriendlyRecord>();
for (Record record : records) { for (Record record : processRecordsInput.getRecords()) {
recordMessages.add(new JsonFriendlyRecord(record)); recordMessages.add(new JsonFriendlyRecord(record));
} }
this.setRecords(recordMessages); this.setRecords(recordMessages);
} }
/**
* @return The records.
*/
public List<JsonFriendlyRecord> getRecords() {
return records;
}
/**
* @param records The records.
*/
public void setRecords(List<JsonFriendlyRecord> records) {
this.records = records;
}
} }

View file

@ -0,0 +1,7 @@
package com.amazonaws.services.kinesis.multilang;
public class Matchers {
public static class InitializationInputMatcher
}

View file

@ -23,6 +23,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -54,7 +56,7 @@ public class MessageWriterTest {
*/ */
@Test @Test
public void writeCheckpointMessageNoErrorTest() throws IOException, InterruptedException, ExecutionException { public void writeCheckpointMessageNoErrorTest() throws IOException, InterruptedException, ExecutionException {
Future<Boolean> future = this.messageWriter.writeCheckpointMessageWithError("1234", null); Future<Boolean> future = this.messageWriter.writeCheckpointMessageWithError("1234", 0L, null);
future.get(); future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt()); Mockito.anyInt());
@ -63,7 +65,7 @@ public class MessageWriterTest {
@Test @Test
public void writeCheckpointMessageWithErrorTest() throws IOException, InterruptedException, ExecutionException { public void writeCheckpointMessageWithErrorTest() throws IOException, InterruptedException, ExecutionException {
Future<Boolean> future = this.messageWriter.writeCheckpointMessageWithError("1234", new Throwable()); Future<Boolean> future = this.messageWriter.writeCheckpointMessageWithError("1234", 0L, new Throwable());
future.get(); future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt()); Mockito.anyInt());
@ -72,7 +74,7 @@ public class MessageWriterTest {
@Test @Test
public void writeInitializeMessageTest() throws IOException, InterruptedException, ExecutionException { public void writeInitializeMessageTest() throws IOException, InterruptedException, ExecutionException {
Future<Boolean> future = this.messageWriter.writeInitializeMessage(shardId); Future<Boolean> future = this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId));
future.get(); future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
Mockito.anyInt()); Mockito.anyInt());
@ -93,7 +95,7 @@ public class MessageWriterTest {
this.add(new Record()); this.add(new Record());
} }
}; };
Future<Boolean> future = this.messageWriter.writeProcessRecordsMessage(records); Future<Boolean> future = this.messageWriter.writeProcessRecordsMessage(new ProcessRecordsInput().withRecords(records));
future.get(); future.get();
Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(),
@ -114,7 +116,7 @@ public class MessageWriterTest {
@Test @Test
public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException { public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException {
Mockito.doThrow(IOException.class).when(stream).flush(); Mockito.doThrow(IOException.class).when(stream).flush();
Future<Boolean> initializeTask = this.messageWriter.writeInitializeMessage(shardId); Future<Boolean> initializeTask = this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId));
Boolean result = initializeTask.get(); Boolean result = initializeTask.get();
Assert.assertNotNull(result); Assert.assertNotNull(result);
Assert.assertFalse(result); Assert.assertFalse(result);
@ -144,7 +146,7 @@ public class MessageWriterTest {
Assert.assertFalse(this.messageWriter.isOpen()); Assert.assertFalse(this.messageWriter.isOpen());
try { try {
// Any message should fail // Any message should fail
this.messageWriter.writeInitializeMessage(shardId); this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId));
Assert.fail("MessageWriter should be closed and unable to write."); Assert.fail("MessageWriter should be closed and unable to write.");
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
// This should happen. // This should happen.

View file

@ -15,11 +15,15 @@
package com.amazonaws.services.kinesis.multilang; package com.amazonaws.services.kinesis.multilang;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -40,55 +44,71 @@ import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class MultiLangProtocolTest { public class MultiLangProtocolTest {
private static final List<Record> EMPTY_RECORD_LIST = Collections.emptyList();
private MultiLangProtocol protocol; private MultiLangProtocol protocol;
private MessageWriter messageWriter; private MessageWriter messageWriter;
private MessageReader messageReader; private MessageReader messageReader;
private String shardId; private String shardId;
private IRecordProcessorCheckpointer checkpointer; private IRecordProcessorCheckpointer checkpointer;
@Before @Before
public void setup() { public void setup() {
this.shardId = "shard-id-123"; this.shardId = "shard-id-123";
messageWriter = Mockito.mock(MessageWriter.class); messageWriter = Mockito.mock(MessageWriter.class);
messageReader = Mockito.mock(MessageReader.class); messageReader = Mockito.mock(MessageReader.class);
protocol = new MultiLangProtocol(messageReader, messageWriter, shardId); protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId));
checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
} }
private Future<Boolean> buildBooleanFuture(boolean val) throws InterruptedException, ExecutionException { private <T> Future<T> buildFuture(T value) {
Future<Boolean> successFuture = Mockito.mock(Future.class); SettableFuture<T> future = SettableFuture.create();
Mockito.doReturn(val).when(successFuture).get(); future.set(value);
return successFuture; return future;
} }
private Future<Message> buildMessageFuture(Message message) throws InterruptedException, ExecutionException { private <T> Future<T> buildFuture(T value, Class<T> clazz) {
Future<Message> messageFuture = Mockito.mock(Future.class); SettableFuture<T> future = SettableFuture.create();
Mockito.doReturn(message).when(messageFuture).get(); future.set(value);
return messageFuture; return future;
} }
@Test @Test
public void initializeTest() throws InterruptedException, ExecutionException { public void initializeTest() throws InterruptedException, ExecutionException {
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeInitializeMessage(shardId); when(messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId))).thenReturn(buildFuture(true));
Mockito.doReturn(buildMessageFuture(new StatusMessage("initialize"))).when(messageReader).getNextMessageFromSTDOUT(); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("initialize"), Message.class));
Assert.assertTrue(protocol.initialize()); assertThat(protocol.initialize(), equalTo(true));
} }
@Test @Test
public void processRecordsTest() throws InterruptedException, ExecutionException { public void processRecordsTest() throws InterruptedException, ExecutionException {
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList()); when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
Mockito.doReturn(buildMessageFuture(new StatusMessage("processRecords"))).when(messageReader).getNextMessageFromSTDOUT(); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class));
Assert.assertTrue(protocol.processRecords(new ArrayList<Record>(), null));
assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)), equalTo(true));
} }
@Test @Test
public void shutdownTest() throws InterruptedException, ExecutionException { public void shutdownTest() throws InterruptedException, ExecutionException {
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter) when(messageWriter.writeShutdownMessage(any(ShutdownReason.class))).thenReturn(buildFuture(true));
.writeShutdownMessage(Mockito.any(ShutdownReason.class)); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("shutdown"), Message.class));
Mockito.doReturn(buildMessageFuture(new StatusMessage("shutdown"))).when(messageReader).getNextMessageFromSTDOUT();
Assert.assertTrue(protocol.shutdown(null, ShutdownReason.ZOMBIE)); Mockito.doReturn(buildFuture(true)).when(messageWriter)
.writeShutdownMessage(any(ShutdownReason.class));
Mockito.doReturn(buildFuture(new StatusMessage("shutdown"))).when(messageReader).getNextMessageFromSTDOUT();
assertThat(protocol.shutdown(null, ShutdownReason.ZOMBIE), equalTo(true));
} }
private Answer<Future<Message>> buildMessageAnswers(List<Message> messages) { private Answer<Future<Message>> buildMessageAnswers(List<Message> messages) {
@ -107,7 +127,7 @@ public class MultiLangProtocolTest {
if (this.messageIterator.hasNext()) { if (this.messageIterator.hasNext()) {
message = this.messageIterator.next(); message = this.messageIterator.next();
} }
return buildMessageFuture(message); return buildFuture(message);
} }
}.init(messages); }.init(messages);
@ -117,13 +137,12 @@ public class MultiLangProtocolTest {
public void processRecordsWithCheckpointsTest() throws InterruptedException, ExecutionException, public void processRecordsWithCheckpointsTest() throws InterruptedException, ExecutionException,
KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList()); when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter) when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(true));
.writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class)); when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList<Message>() {
Mockito.doAnswer(buildMessageAnswers(new ArrayList<Message>() {
{ {
this.add(new CheckpointMessage("123", null)); this.add(new CheckpointMessage("123", 0L, null));
this.add(new CheckpointMessage(null, null)); this.add(new CheckpointMessage(null, 0L, null));
/* /*
* This procesRecords message will be ignored by the read loop which only cares about status and * This procesRecords message will be ignored by the read loop which only cares about status and
* checkpoint messages. All other lines and message types are ignored. By inserting it here, we check * checkpoint messages. All other lines and message types are ignored. By inserting it here, we check
@ -132,24 +151,23 @@ public class MultiLangProtocolTest {
this.add(new ProcessRecordsMessage()); this.add(new ProcessRecordsMessage());
this.add(new StatusMessage("processRecords")); this.add(new StatusMessage("processRecords"));
} }
})).when(messageReader).getNextMessageFromSTDOUT(); }));
Assert.assertTrue(protocol.processRecords(new ArrayList<Record>(), checkpointer)); assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(true));
Mockito.verify(checkpointer, Mockito.timeout(1)).checkpoint(); verify(checkpointer, timeout(1)).checkpoint();
Mockito.verify(checkpointer, Mockito.timeout(1)).checkpoint("123"); verify(checkpointer, timeout(1)).checkpoint("123", 0L);
} }
@Test @Test
public void processRecordsWithABadCheckpointTest() throws InterruptedException, ExecutionException { public void processRecordsWithABadCheckpointTest() throws InterruptedException, ExecutionException {
Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList()); when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true));
Mockito.doReturn(buildBooleanFuture(false)).when(messageWriter) when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(false));
.writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class)); when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList<Message>() {
Mockito.doAnswer(buildMessageAnswers(new ArrayList<Message>() {
{ {
this.add(new CheckpointMessage("456", null)); this.add(new CheckpointMessage("456", 0L, null));
this.add(new StatusMessage("processRecords")); this.add(new StatusMessage("processRecords"));
} }
})).when(messageReader).getNextMessageFromSTDOUT(); }));
Assert.assertFalse(protocol.processRecords(new ArrayList<Record>(), checkpointer)); assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false));
} }
} }

View file

@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.multilang;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
public class StreamingRecordProcessorFactoryTest { public class StreamingRecordProcessorFactoryTest {

View file

@ -24,11 +24,18 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
@ -44,12 +51,24 @@ import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class StreamingRecordProcessorTest { public class StreamingRecordProcessorTest {
private static final String shardId = "shard-123"; private static final String shardId = "shard-123";
private int systemExitCount = 0; private int systemExitCount = 0;
@Mock
private Future<Message> messageFuture;
private IRecordProcessorCheckpointer unimplementedCheckpointer = new IRecordProcessorCheckpointer() { private IRecordProcessorCheckpointer unimplementedCheckpointer = new IRecordProcessorCheckpointer() {
@Override @Override
@ -129,11 +148,10 @@ public class StreamingRecordProcessorTest {
Future<Boolean> trueFuture = Mockito.mock(Future.class); Future<Boolean> trueFuture = Mockito.mock(Future.class);
Mockito.doReturn(true).when(trueFuture).get(); Mockito.doReturn(true).when(trueFuture).get();
Mockito.doReturn(trueFuture).when(messageWriter).writeInitializeMessage(Mockito.anyString()); when(messageWriter.writeInitializeMessage(any(InitializationInput.class))).thenReturn(trueFuture);
Mockito.doReturn(trueFuture).when(messageWriter) when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(trueFuture);
.writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class)); when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(trueFuture);
Mockito.doReturn(trueFuture).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList()); when(messageWriter.writeShutdownMessage(any(ShutdownReason.class))).thenReturn(trueFuture);
Mockito.doReturn(trueFuture).when(messageWriter).writeShutdownMessage(Mockito.any(ShutdownReason.class));
} }
private void phases(Answer<StatusMessage> answer) throws InterruptedException, ExecutionException { private void phases(Answer<StatusMessage> answer) throws InterruptedException, ExecutionException {
@ -145,16 +163,15 @@ public class StreamingRecordProcessorTest {
* processRecords * processRecords
* shutdown * shutdown
*/ */
Future<StatusMessage> future = Mockito.mock(Future.class); when(messageFuture.get()).thenAnswer(answer);
Mockito.doAnswer(answer).when(future).get(); when(messageReader.getNextMessageFromSTDOUT()).thenReturn(messageFuture);
Mockito.doReturn(future).when(messageReader).getNextMessageFromSTDOUT();
List<Record> testRecords = new ArrayList<Record>(); List<Record> testRecords = new ArrayList<Record>();
recordProcessor.initialize(shardId); recordProcessor.initialize(new InitializationInput().withShardId(shardId));
recordProcessor.processRecords(testRecords, unimplementedCheckpointer); recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer));
recordProcessor.processRecords(testRecords, unimplementedCheckpointer); recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer));
recordProcessor.shutdown(unimplementedCheckpointer, ShutdownReason.ZOMBIE); recordProcessor.shutdown(new ShutdownInput().withCheckpointer(unimplementedCheckpointer).withShutdownReason(ShutdownReason.ZOMBIE));
} }
@Test @Test
@ -180,9 +197,9 @@ public class StreamingRecordProcessorTest {
phases(answer); phases(answer);
Mockito.verify(messageWriter, Mockito.times(1)).writeInitializeMessage(shardId); verify(messageWriter).writeInitializeMessage(new InitializationInput().withShardId(shardId));
Mockito.verify(messageWriter, Mockito.times(2)).writeProcessRecordsMessage(Mockito.anyList()); verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
Mockito.verify(messageWriter, Mockito.times(1)).writeShutdownMessage(ShutdownReason.ZOMBIE); verify(messageWriter).writeShutdownMessage(ShutdownReason.ZOMBIE);
} }
@Test @Test
@ -211,9 +228,9 @@ public class StreamingRecordProcessorTest {
phases(answer); phases(answer);
Mockito.verify(messageWriter, Mockito.times(1)).writeInitializeMessage(shardId); verify(messageWriter).writeInitializeMessage(new InitializationInput().withShardId(shardId));
Mockito.verify(messageWriter, Mockito.times(2)).writeProcessRecordsMessage(Mockito.anyList()); verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class));
Mockito.verify(messageWriter, Mockito.times(0)).writeShutdownMessage(ShutdownReason.ZOMBIE); verify(messageWriter, never()).writeShutdownMessage(ShutdownReason.ZOMBIE);
Assert.assertEquals(1, systemExitCount); Assert.assertEquals(1, systemExitCount);
} }
} }

View file

@ -18,6 +18,8 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -31,8 +33,8 @@ public class MessageTest {
@Test @Test
public void toStringTest() { public void toStringTest() {
Message[] messages = Message[] messages =
new Message[] { new CheckpointMessage("1234567890", null), new InitializeMessage("shard-123"), new Message[] { new CheckpointMessage("1234567890", 0L, null), new InitializeMessage(new InitializationInput().withShardId("shard-123")),
new ProcessRecordsMessage(new ArrayList<Record>() { new ProcessRecordsMessage(new ProcessRecordsInput().withRecords(new ArrayList<Record>() {
{ {
this.add(new Record() { this.add(new Record() {
{ {
@ -42,7 +44,7 @@ public class MessageTest {
} }
}); });
} }
}), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"), })), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"),
new InitializeMessage(), new ProcessRecordsMessage() }; new InitializeMessage(), new ProcessRecordsMessage() };
for (int i = 0; i < messages.length; i++) { for (int i = 0; i < messages.length; i++) {