From c777ea963ae57bc9596ba7caabdc605c5ea8ae19 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Fri, 23 Sep 2016 11:35:39 -0700 Subject: [PATCH] Started working on bring the messages up to date. --- pom.xml | 7 ++ .../kinesis/multilang/MessageWriter.java | 31 +++-- .../kinesis/multilang/MultiLangDaemon.java | 9 +- .../multilang/MultiLangDaemonConfig.java | 4 +- .../kinesis/multilang/MultiLangProtocol.java | 118 ++++++++++------- .../multilang/MultiLangRecordProcessor.java | 119 +++++++++--------- .../MultiLangRecordProcessorFactory.java | 4 +- .../multilang/messages/CheckpointMessage.java | 51 +++----- .../multilang/messages/InitializeMessage.java | 33 ++--- .../messages/JsonFriendlyRecord.java | 118 +++-------------- .../messages/ProcessRecordsMessage.java | 28 ++--- .../services/kinesis/multilang/Matchers.java | 7 ++ .../kinesis/multilang/MessageWriterTest.java | 14 ++- .../multilang/MultiLangProtocolTest.java | 92 ++++++++------ .../StreamingRecordProcessorFactoryTest.java | 2 +- .../StreamingRecordProcessorTest.java | 53 +++++--- .../multilang/messages/MessageTest.java | 8 +- 17 files changed, 342 insertions(+), 356 deletions(-) create mode 100644 src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java diff --git a/pom.xml b/pom.xml index a68997cc..7ef2a526 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,13 @@ 2.6 + + org.projectlombok + lombok + 1.16.10 + provided + + junit diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java index d4ef9b20..06761b2e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java @@ -18,7 +18,6 @@ import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -26,8 +25,9 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; -import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; @@ -119,19 +119,21 @@ class MessageWriter { /** * Writes an {@link InitializeMessage} to the subprocess. * - * @param shardIdToWrite The shard id. + * @param initializationInput + * contains information about the shard being initialized */ - Future writeInitializeMessage(String shardIdToWrite) { - return writeMessage(new InitializeMessage(shardIdToWrite)); + Future writeInitializeMessage(InitializationInput initializationInput) { + return writeMessage(new InitializeMessage(initializationInput)); } /** * Writes a {@link ProcessRecordsMessage} message to the subprocess. * - * @param records The records to be processed. + * @param processRecordsInput + * the records, and associated metadata to be processed. */ - Future writeProcessRecordsMessage(List records) { - return writeMessage(new ProcessRecordsMessage(records)); + Future writeProcessRecordsMessage(ProcessRecordsInput processRecordsInput) { + return writeMessage(new ProcessRecordsMessage(processRecordsInput)); } /** @@ -146,11 +148,16 @@ class MessageWriter { /** * Writes a {@link CheckpointMessage} to the subprocess. * - * @param sequenceNumber The sequence number that was checkpointed. - * @param throwable The exception that was thrown by a checkpoint attempt. Null if one didn't occur. + * @param sequenceNumber + * The sequence number that was checkpointed. + * @param subSequenceNumber + * the sub sequence number to checkpoint at. + * @param throwable + * The exception that was thrown by a checkpoint attempt. Null if one didn't occur. */ - Future writeCheckpointMessageWithError(String sequenceNumber, Throwable throwable) { - return writeMessage(new CheckpointMessage(sequenceNumber, throwable)); + Future writeCheckpointMessageWithError(String sequenceNumber, Long subSequenceNumber, + Throwable throwable) { + return writeMessage(new CheckpointMessage(sequenceNumber, subSequenceNumber, throwable)); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index 8b74cabc..fdff4dc7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -24,6 +24,7 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; @@ -71,7 +72,13 @@ public class MultiLangDaemon implements Callable { public MultiLangDaemon(KinesisClientLibConfiguration configuration, MultiLangRecordProcessorFactory recordProcessorFactory, ExecutorService workerThreadPool) { - this(new Worker(recordProcessorFactory, configuration, workerThreadPool)); + this(buildWorker(recordProcessorFactory, configuration, workerThreadPool)); + } + + private static Worker buildWorker(IRecordProcessorFactory recordProcessorFactory, + KinesisClientLibConfiguration configuration, ExecutorService workerThreadPool) { + return new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(configuration) + .execService(workerThreadPool).build(); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java index 7793f12b..82eb5f77 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java @@ -14,6 +14,8 @@ */ package com.amazonaws.services.kinesis.multilang; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Properties; @@ -131,7 +133,7 @@ public class MultiLangDaemonConfig { private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException { Properties properties = new Properties(); - try (InputStream propertiesStream = classLoader.getResourceAsStream(propertiesFileName)) { + try (InputStream propertiesStream = new FileInputStream(new File(propertiesFileName))) { properties.load(propertiesStream); return properties; } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 4b15a532..9f159d30 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -14,17 +14,14 @@ */ package com.amazonaws.services.kinesis.multilang; -import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; -import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; @@ -32,28 +29,33 @@ import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; +import lombok.extern.apachecommons.CommonsLog; + /** * An implementation of the multi language protocol. */ +@CommonsLog class MultiLangProtocol { - private static final Log LOG = LogFactory.getLog(MultiLangProtocol.class); - private MessageReader messageReader; private MessageWriter messageWriter; - private String shardId; + private final InitializationInput initializationInput; /** * Constructor. * - * @param messageReader A message reader. - * @param messageWriter A message writer. - * @param shardId The shard id this processor is associated with. + * @param messageReader + * A message reader. + * @param messageWriter + * A message writer. + * @param initializationInput + * information about the shard this processor is starting to process */ - MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, String shardId) { + MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, + InitializationInput initializationInput) { this.messageReader = messageReader; this.messageWriter = messageWriter; - this.shardId = shardId; + this.initializationInput = initializationInput; } /** @@ -66,7 +68,7 @@ class MultiLangProtocol { /* * Call and response to child process. */ - Future writeFuture = messageWriter.writeInitializeMessage(shardId); + Future writeFuture = messageWriter.writeInitializeMessage(initializationInput); return waitForStatusMessage(InitializeMessage.ACTION, null, writeFuture); } @@ -75,13 +77,13 @@ class MultiLangProtocol { * Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond * with a {@link StatusMessage} on its STDOUT. * - * @param records The records to process. - * @param checkpointer A checkpointer. + * @param processRecordsInput + * The records, and associated metadata, to process. * @return Whether or not this operation succeeded. */ - boolean processRecords(List records, IRecordProcessorCheckpointer checkpointer) { - Future writeFuture = messageWriter.writeProcessRecordsMessage(records); - return waitForStatusMessage(ProcessRecordsMessage.ACTION, checkpointer, writeFuture); + boolean processRecords(ProcessRecordsInput processRecordsInput) { + Future writeFuture = messageWriter.writeProcessRecordsMessage(processRecordsInput); + return waitForStatusMessage(ProcessRecordsMessage.ACTION, processRecordsInput.getCheckpointer(), writeFuture); } /** @@ -105,32 +107,41 @@ class MultiLangProtocol { * checkpointing itself was successful is not the concern of this method. This method simply cares whether it was * able to successfully communicate the results of its attempts to checkpoint. * - * @param action What action is being waited on. - * @param checkpointer A checkpointer. - * @param writeFuture The writing task. + * @param action + * What action is being waited on. + * @param checkpointer + * the checkpointer from the process records, or shutdown request + * @param writeFuture + * The writing task. * @return Whether or not this operation succeeded. */ - private boolean waitForStatusMessage(String action, - IRecordProcessorCheckpointer checkpointer, - Future writeFuture) { + private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer, + Future writeFuture) { boolean statusWasCorrect = waitForStatusMessage(action, checkpointer); // Examine whether or not we failed somewhere along the line. try { - boolean writerIsStillOpen = Boolean.valueOf(writeFuture.get()); + boolean writerIsStillOpen = writeFuture.get(); return statusWasCorrect && writerIsStillOpen; } catch (InterruptedException e) { - LOG.error(String.format("Interrupted while writing %s message for shard %s", action, shardId)); + log.error(String.format("Interrupted while writing %s message for shard %s", action, + initializationInput.getShardId())); return false; } catch (ExecutionException e) { - LOG.error(String.format("Failed to write %s message for shard %s", action, shardId), e); + log.error( + String.format("Failed to write %s message for shard %s", action, initializationInput.getShardId()), + e); return false; } } /** - * @param action What action is being waited on. - * @param checkpointer A checkpointer. + * Waits for status message and verifies it against the expectation + * + * @param action + * What action is being waited on. + * @param checkpointer + * the original process records request * @return Whether or not this operation succeeded. */ private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) { @@ -141,8 +152,7 @@ class MultiLangProtocol { Message message = future.get(); // Note that instanceof doubles as a check against a value being null if (message instanceof CheckpointMessage) { - boolean checkpointWriteSucceeded = - Boolean.valueOf(checkpoint((CheckpointMessage) message, checkpointer).get()); + boolean checkpointWriteSucceeded = checkpoint((CheckpointMessage) message, checkpointer).get(); if (!checkpointWriteSucceeded) { return false; } @@ -150,10 +160,12 @@ class MultiLangProtocol { statusMessage = (StatusMessage) message; } } catch (InterruptedException e) { - LOG.error(String.format("Interrupted while waiting for %s message for shard %s", action, shardId)); + log.error(String.format("Interrupted while waiting for %s message for shard %s", action, + initializationInput.getShardId())); return false; } catch (ExecutionException e) { - LOG.error(String.format("Failed to get status message for %s action for shard %s", action, shardId), e); + log.error(String.format("Failed to get status message for %s action for shard %s", action, + initializationInput.getShardId()), e); return false; } } @@ -168,8 +180,8 @@ class MultiLangProtocol { * @return Whether or not this operation succeeded. */ private boolean validateStatusMessage(StatusMessage statusMessage, String action) { - LOG.info("Received response " + statusMessage + " from subprocess while waiting for " + action - + " while processing shard " + shardId); + log.info("Received response " + statusMessage + " from subprocess while waiting for " + action + + " while processing shard " + initializationInput.getShardId()); return !(statusMessage == null || statusMessage.getResponseFor() == null || !statusMessage.getResponseFor() .equals(action)); @@ -186,28 +198,38 @@ class MultiLangProtocol { * @return Whether or not this operation succeeded. */ private Future checkpoint(CheckpointMessage checkpointMessage, IRecordProcessorCheckpointer checkpointer) { - String sequenceNumber = checkpointMessage.getCheckpoint(); + String sequenceNumber = checkpointMessage.getSequenceNumber(); + Long subSequenceNumber = checkpointMessage.getSubSequenceNumber(); try { if (checkpointer != null) { - if (sequenceNumber == null) { - LOG.info(String.format("Attempting to checkpoint for shard %s", shardId)); - checkpointer.checkpoint(); + log.debug(logCheckpointMessage(sequenceNumber, subSequenceNumber)); + if (sequenceNumber != null) { + if (subSequenceNumber != null) { + checkpointer.checkpoint(sequenceNumber, subSequenceNumber); + } else { + checkpointer.checkpoint(sequenceNumber); + } } else { - LOG.info(String.format("Attempting to checkpoint at sequence number %s for shard %s", - sequenceNumber, shardId)); - checkpointer.checkpoint(sequenceNumber); + checkpointer.checkpoint(); } - return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, null); + return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, null); } else { String message = String.format("Was asked to checkpoint at %s but no checkpointer was provided for shard %s", - sequenceNumber, shardId); - LOG.error(message); - return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, new InvalidStateException( + sequenceNumber, initializationInput.getShardId()); + log.error(message); + return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, + new InvalidStateException( message)); } } catch (Throwable t) { - return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, t); + return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, t); } } + + private String logCheckpointMessage(String sequenceNumber, Long subSequenceNumber) { + return String.format("Attempting to checkpoint shard %s @ sequence number %s, and sub sequence number %s", + initializationInput.getShardId(), sequenceNumber, subSequenceNumber); + } + } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index a33e8ebf..9d76af54 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -16,7 +16,6 @@ package com.amazonaws.services.kinesis.multilang; import java.io.IOException; import java.io.InputStream; -import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -24,10 +23,10 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; -import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.fasterxml.jackson.databind.ObjectMapper; /** @@ -61,56 +60,10 @@ public class MultiLangRecordProcessor implements IRecordProcessor { private MultiLangProtocol protocol; - /** - * Used to tell whether the processor has been shutdown already. - */ - private enum ProcessState { - ACTIVE, SHUTDOWN - } - - /** - * Constructor. - * - * @param processBuilder Provides process builder functionality. - * @param executorService An executor - * @param objectMapper An obejct mapper. - */ - MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, - ObjectMapper objectMapper) { - this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(), - new DrainChildSTDERRTask()); - } - - /** - * Note: This constructor has package level access solely for testing purposes. - * - * @param processBuilder Provides the child process for this record processor - * @param executorService The executor service which is provided by the {@link MultiLangRecordProcessorFactory} - * @param objectMapper Object mapper - * @param messageWriter Message write to write to child process's stdin - * @param messageReader Message reader to read from child process's stdout - * @param readSTDERRTask Error reader to read from child process's stderr - */ - MultiLangRecordProcessor(ProcessBuilder processBuilder, - ExecutorService executorService, - ObjectMapper objectMapper, - MessageWriter messageWriter, - MessageReader messageReader, - DrainChildSTDERRTask readSTDERRTask) { - this.executorService = executorService; - this.processBuilder = processBuilder; - this.objectMapper = objectMapper; - this.messageWriter = messageWriter; - this.messageReader = messageReader; - this.readSTDERRTask = readSTDERRTask; - - this.state = ProcessState.ACTIVE; - } - @Override - public void initialize(String shardIdToProcess) { + public void initialize(InitializationInput initializationInput) { try { - this.shardId = shardIdToProcess; + this.shardId = initializationInput.getShardId(); try { this.process = startProcess(); } catch (IOException e) { @@ -129,7 +82,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor { // Submit the error reader for execution stderrReadTask = executorService.submit(readSTDERRTask); - protocol = new MultiLangProtocol(messageReader, messageWriter, shardId); + protocol = new MultiLangProtocol(messageReader, messageWriter, initializationInput); if (!protocol.initialize()) { throw new RuntimeException("Failed to initialize child process"); } @@ -142,9 +95,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor { } @Override - public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) { + public void processRecords(ProcessRecordsInput processRecordsInput) { try { - if (!protocol.processRecords(records, checkpointer)) { + if (!protocol.processRecords(processRecordsInput)) { throw new RuntimeException("Child process failed to process records"); } } catch (Throwable t) { @@ -153,7 +106,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor { } @Override - public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { + public void shutdown(ShutdownInput shutdownInput) { // In cases where KCL loses lease for the shard after creating record processor instance but before // record processor initialize() is called, then shutdown() may be called directly before initialize(). if (!initialized) { @@ -165,7 +118,7 @@ public class MultiLangRecordProcessor implements IRecordProcessor { try { if (ProcessState.ACTIVE.equals(this.state)) { - if (!protocol.shutdown(checkpointer, reason)) { + if (!protocol.shutdown(shutdownInput.getCheckpointer(), shutdownInput.getShutdownReason())) { throw new RuntimeException("Child process failed to shutdown"); } @@ -181,7 +134,57 @@ public class MultiLangRecordProcessor implements IRecordProcessor { + " but it appears the processor has already been shutdown", t); } } + } + /** + * Used to tell whether the processor has been shutdown already. + */ + private enum ProcessState { + ACTIVE, SHUTDOWN + } + + /** + * Constructor. + * + * @param processBuilder + * Provides process builder functionality. + * @param executorService + * An executor + * @param objectMapper + * An obejct mapper. + */ + MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, + ObjectMapper objectMapper) { + this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(), + new DrainChildSTDERRTask()); + } + + /** + * Note: This constructor has package level access solely for testing purposes. + * + * @param processBuilder + * Provides the child process for this record processor + * @param executorService + * The executor service which is provided by the {@link MultiLangRecordProcessorFactory} + * @param objectMapper + * Object mapper + * @param messageWriter + * Message write to write to child process's stdin + * @param messageReader + * Message reader to read from child process's stdout + * @param readSTDERRTask + * Error reader to read from child process's stderr + */ + MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper, + MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) { + this.executorService = executorService; + this.processBuilder = processBuilder; + this.objectMapper = objectMapper; + this.messageWriter = messageWriter; + this.messageReader = messageReader; + this.readSTDERRTask = readSTDERRTask; + + this.state = ProcessState.ACTIVE; } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java index e8ed7eb7..e55217a6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java @@ -19,8 +19,8 @@ import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.fasterxml.jackson.databind.ObjectMapper; /** diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/CheckpointMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/CheckpointMessage.java index 8d4b0ead..5cdc02bd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/CheckpointMessage.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/CheckpointMessage.java @@ -14,11 +14,16 @@ */ package com.amazonaws.services.kinesis.multilang.messages; +import lombok.Getter; +import lombok.Setter; + /** * A checkpoint message is sent by the client's subprocess to indicate to the kcl processor that it should attempt to * checkpoint. The processor sends back a checkpoint message as an acknowledgement that it attempted to checkpoint along * with an error message which corresponds to the names of exceptions that a checkpointer can throw. */ +@Getter +@Setter public class CheckpointMessage extends Message { /** * The name used for the action field in {@link Message}. @@ -28,7 +33,8 @@ public class CheckpointMessage extends Message { /** * The checkpoint this message is about. */ - private String checkpoint; + private String sequenceNumber; + private Long subSequenceNumber; /** * The name of an exception that occurred while attempting to checkpoint. @@ -44,43 +50,20 @@ public class CheckpointMessage extends Message { /** * Convenience constructor. * - * @param sequenceNumber The sequence number that this message is about. - * @param throwable When responding to a client's process, the record processor will add the name of the exception - * that occurred while attempting to checkpoint if one did occur. + * @param sequenceNumber + * The sequence number that this message is about. + * @param subSequenceNumber + * the sub sequence number for the checkpoint. This can be null. + * @param throwable + * When responding to a client's process, the record processor will add the name of the exception that + * occurred while attempting to checkpoint if one did occur. */ - public CheckpointMessage(String sequenceNumber, Throwable throwable) { - this.setCheckpoint(sequenceNumber); + public CheckpointMessage(String sequenceNumber, Long subSequenceNumber, Throwable throwable) { + this.setSequenceNumber(sequenceNumber); + this.subSequenceNumber = subSequenceNumber; if (throwable != null) { this.setError(throwable.getClass().getSimpleName()); } } - /** - * @return The checkpoint. - */ - public String getCheckpoint() { - return checkpoint; - } - - /** - * @return The error. - */ - public String getError() { - return error; - } - - /** - * @param checkpoint The checkpoint. - */ - public void setCheckpoint(String checkpoint) { - this.checkpoint = checkpoint; - } - - /** - * @param error The error. - */ - public void setError(String error) { - this.error = error; - } - } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java index de30cde9..3795e57e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java @@ -14,9 +14,15 @@ */ package com.amazonaws.services.kinesis.multilang.messages; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import lombok.Getter; +import lombok.Setter; + /** * An initialize message is sent to the client's subprocess to indicate that it should perform its initialization steps. */ +@Getter +@Setter public class InitializeMessage extends Message { /** * The name used for the action field in {@link Message}. @@ -27,6 +33,8 @@ public class InitializeMessage extends Message { * The shard id that this processor is getting initialized for. */ private String shardId; + private String sequenceNumber; + private Long subSequenceNumber; /** * Default constructor. @@ -39,21 +47,16 @@ public class InitializeMessage extends Message { * * @param shardId The shard id. */ - public InitializeMessage(String shardId) { - this.setShardId(shardId); + public InitializeMessage(InitializationInput initializationInput) { + this.shardId = initializationInput.getShardId(); + if (initializationInput.getExtendedSequenceNumber() != null) { + this.sequenceNumber = initializationInput.getExtendedSequenceNumber().getSequenceNumber(); + this.subSequenceNumber = initializationInput.getExtendedSequenceNumber().getSubSequenceNumber(); + } else { + this.sequenceNumber = null; + this.subSequenceNumber = null; + } + } - /** - * @return The shard id. - */ - public String getShardId() { - return shardId; - } - - /** - * @param shardId The shard id. - */ - public void setShardId(String shardId) { - this.shardId = shardId; - } } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/JsonFriendlyRecord.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/JsonFriendlyRecord.java index b86df688..2e898fad 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/JsonFriendlyRecord.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/JsonFriendlyRecord.java @@ -14,7 +14,10 @@ */ package com.amazonaws.services.kinesis.multilang.messages; +import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.model.Record; +import lombok.Getter; +import lombok.Setter; import java.util.Date; @@ -22,11 +25,14 @@ import java.util.Date; * Class for encoding Record objects to json. Needed because Records have byte buffers for their data field which causes * problems for the json library we're using. */ +@Getter +@Setter public class JsonFriendlyRecord { private byte[] data; private String partitionKey; private String sequenceNumber; private Date approximateArrivalTimestamp; + private Long subsequenceNumber; /** * Default Constructor. @@ -40,109 +46,15 @@ public class JsonFriendlyRecord { * @param record The record that this message will represent. */ public JsonFriendlyRecord(Record record) { - this.withData(record.getData() == null ? null : record.getData().array()) - .withPartitionKey(record.getPartitionKey()) - .withSequenceNumber(record.getSequenceNumber()) - .withApproximateArrivalTimestamp(record.getApproximateArrivalTimestamp()); + this.data = record.getData() == null ? null : record.getData().array(); + this.partitionKey = record.getPartitionKey(); + this.sequenceNumber = record.getSequenceNumber(); + this.approximateArrivalTimestamp = record.getApproximateArrivalTimestamp(); + if (record instanceof UserRecord) { + this.subsequenceNumber = ((UserRecord) record).getSubSequenceNumber(); + } else { + this.subsequenceNumber = null; + } } - /** - * @return The data. - */ - public byte[] getData() { - return data; - } - - /** - * @return The partition key. - */ - public String getPartitionKey() { - return partitionKey; - } - - /** - * @return The sequence number. - */ - public String getSequenceNumber() { - return sequenceNumber; - } - - /** - * @return The approximate arrival timestamp. - */ - public Date getApproximateArrivalTimestamp() { - return approximateArrivalTimestamp; - } - - /** - * @param data The data. - */ - public void setData(byte[] data) { - this.data = data; - } - - /** - * @param partitionKey The partition key. - */ - public void setPartitionKey(String partitionKey) { - this.partitionKey = partitionKey; - } - - /** - * @param sequenceNumber The sequence number. - */ - public void setSequenceNumber(String sequenceNumber) { - this.sequenceNumber = sequenceNumber; - } - - /** - * @param approximateArrivalTimestamp The approximate arrival timestamp. - */ - public void setApproximateArrivalTimestamp(Date approximateArrivalTimestamp) { - this.approximateArrivalTimestamp = approximateArrivalTimestamp; - } - - /** - * @param data The data. - * - * @return this - */ - public JsonFriendlyRecord withData(byte[] data) { - this.setData(data); - return this; - } - - /** - * @param partitionKey The partition key. - * - * @return this - */ - public JsonFriendlyRecord withPartitionKey(String partitionKey) { - this.setPartitionKey(partitionKey); - return this; - } - - /** - * @param sequenceNumber The sequence number. - * - * @return this - */ - public JsonFriendlyRecord withSequenceNumber(String sequenceNumber) { - this.setSequenceNumber(sequenceNumber); - return this; - } - - /** - * @param approximateArrivalTimestamp The approximate arrival timestamp. - * - * @return this - */ - public JsonFriendlyRecord withApproximateArrivalTimestamp(Date approximateArrivalTimestamp){ - this.setApproximateArrivalTimestamp(approximateArrivalTimestamp); - return this; - } - - - - } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ProcessRecordsMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ProcessRecordsMessage.java index 1d0a6667..9e382b93 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ProcessRecordsMessage.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ProcessRecordsMessage.java @@ -17,11 +17,16 @@ package com.amazonaws.services.kinesis.multilang.messages; import java.util.ArrayList; import java.util.List; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.Record; +import lombok.Getter; +import lombok.Setter; /** * A message to indicate to the client's process that it should process a list of records. */ +@Getter +@Setter public class ProcessRecordsMessage extends Message { /** * The name used for the action field in {@link Message}. @@ -32,6 +37,7 @@ public class ProcessRecordsMessage extends Message { * The records that the client's process needs to handle. */ private List records; + private Long millisBehindLatest; /** * Default constructor. @@ -42,27 +48,15 @@ public class ProcessRecordsMessage extends Message { /** * Convenience constructor. * - * @param records The records. + * @param processRecordsInput + * the process records input to be sent to the child */ - public ProcessRecordsMessage(List records) { + public ProcessRecordsMessage(ProcessRecordsInput processRecordsInput) { + this.millisBehindLatest = processRecordsInput.getMillisBehindLatest(); List recordMessages = new ArrayList(); - for (Record record : records) { + for (Record record : processRecordsInput.getRecords()) { recordMessages.add(new JsonFriendlyRecord(record)); } this.setRecords(recordMessages); } - - /** - * @return The records. - */ - public List getRecords() { - return records; - } - - /** - * @param records The records. - */ - public void setRecords(List records) { - this.records = records; - } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java b/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java new file mode 100644 index 00000000..3fe7b78a --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/Matchers.java @@ -0,0 +1,7 @@ +package com.amazonaws.services.kinesis.multilang; + +public class Matchers { + + public static class InitializationInputMatcher + +} diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java index 9461b1cc..e96ac9f4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java @@ -23,6 +23,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -54,7 +56,7 @@ public class MessageWriterTest { */ @Test public void writeCheckpointMessageNoErrorTest() throws IOException, InterruptedException, ExecutionException { - Future future = this.messageWriter.writeCheckpointMessageWithError("1234", null); + Future future = this.messageWriter.writeCheckpointMessageWithError("1234", 0L, null); future.get(); Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()); @@ -63,7 +65,7 @@ public class MessageWriterTest { @Test public void writeCheckpointMessageWithErrorTest() throws IOException, InterruptedException, ExecutionException { - Future future = this.messageWriter.writeCheckpointMessageWithError("1234", new Throwable()); + Future future = this.messageWriter.writeCheckpointMessageWithError("1234", 0L, new Throwable()); future.get(); Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()); @@ -72,7 +74,7 @@ public class MessageWriterTest { @Test public void writeInitializeMessageTest() throws IOException, InterruptedException, ExecutionException { - Future future = this.messageWriter.writeInitializeMessage(shardId); + Future future = this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId)); future.get(); Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()); @@ -93,7 +95,7 @@ public class MessageWriterTest { this.add(new Record()); } }; - Future future = this.messageWriter.writeProcessRecordsMessage(records); + Future future = this.messageWriter.writeProcessRecordsMessage(new ProcessRecordsInput().withRecords(records)); future.get(); Mockito.verify(this.stream, Mockito.atLeastOnce()).write(Mockito.any(byte[].class), Mockito.anyInt(), @@ -114,7 +116,7 @@ public class MessageWriterTest { @Test public void streamIOExceptionTest() throws IOException, InterruptedException, ExecutionException { Mockito.doThrow(IOException.class).when(stream).flush(); - Future initializeTask = this.messageWriter.writeInitializeMessage(shardId); + Future initializeTask = this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId)); Boolean result = initializeTask.get(); Assert.assertNotNull(result); Assert.assertFalse(result); @@ -144,7 +146,7 @@ public class MessageWriterTest { Assert.assertFalse(this.messageWriter.isOpen()); try { // Any message should fail - this.messageWriter.writeInitializeMessage(shardId); + this.messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId)); Assert.fail("MessageWriter should be closed and unable to write."); } catch (IllegalStateException e) { // This should happen. diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index 55eb5dce..8093f815 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -15,11 +15,15 @@ package com.amazonaws.services.kinesis.multilang; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.google.common.util.concurrent.SettableFuture; import org.junit.Assert; import org.junit.Before; @@ -40,55 +44,71 @@ import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class MultiLangProtocolTest { + private static final List EMPTY_RECORD_LIST = Collections.emptyList(); private MultiLangProtocol protocol; private MessageWriter messageWriter; private MessageReader messageReader; private String shardId; private IRecordProcessorCheckpointer checkpointer; + + @Before public void setup() { this.shardId = "shard-id-123"; messageWriter = Mockito.mock(MessageWriter.class); messageReader = Mockito.mock(MessageReader.class); - protocol = new MultiLangProtocol(messageReader, messageWriter, shardId); + protocol = new MultiLangProtocol(messageReader, messageWriter, new InitializationInput().withShardId(shardId)); checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); } - private Future buildBooleanFuture(boolean val) throws InterruptedException, ExecutionException { - Future successFuture = Mockito.mock(Future.class); - Mockito.doReturn(val).when(successFuture).get(); - return successFuture; + private Future buildFuture(T value) { + SettableFuture future = SettableFuture.create(); + future.set(value); + return future; } - private Future buildMessageFuture(Message message) throws InterruptedException, ExecutionException { - Future messageFuture = Mockito.mock(Future.class); - Mockito.doReturn(message).when(messageFuture).get(); - return messageFuture; + private Future buildFuture(T value, Class clazz) { + SettableFuture future = SettableFuture.create(); + future.set(value); + return future; } @Test public void initializeTest() throws InterruptedException, ExecutionException { - Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeInitializeMessage(shardId); - Mockito.doReturn(buildMessageFuture(new StatusMessage("initialize"))).when(messageReader).getNextMessageFromSTDOUT(); - Assert.assertTrue(protocol.initialize()); + when(messageWriter.writeInitializeMessage(new InitializationInput().withShardId(shardId))).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("initialize"), Message.class)); + assertThat(protocol.initialize(), equalTo(true)); } @Test public void processRecordsTest() throws InterruptedException, ExecutionException { - Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList()); - Mockito.doReturn(buildMessageFuture(new StatusMessage("processRecords"))).when(messageReader).getNextMessageFromSTDOUT(); - Assert.assertTrue(protocol.processRecords(new ArrayList(), null)); + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("processRecords"), Message.class)); + + assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST)), equalTo(true)); } @Test public void shutdownTest() throws InterruptedException, ExecutionException { - Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter) - .writeShutdownMessage(Mockito.any(ShutdownReason.class)); - Mockito.doReturn(buildMessageFuture(new StatusMessage("shutdown"))).when(messageReader).getNextMessageFromSTDOUT(); - Assert.assertTrue(protocol.shutdown(null, ShutdownReason.ZOMBIE)); + when(messageWriter.writeShutdownMessage(any(ShutdownReason.class))).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(buildFuture(new StatusMessage("shutdown"), Message.class)); + + Mockito.doReturn(buildFuture(true)).when(messageWriter) + .writeShutdownMessage(any(ShutdownReason.class)); + Mockito.doReturn(buildFuture(new StatusMessage("shutdown"))).when(messageReader).getNextMessageFromSTDOUT(); + assertThat(protocol.shutdown(null, ShutdownReason.ZOMBIE), equalTo(true)); } private Answer> buildMessageAnswers(List messages) { @@ -107,7 +127,7 @@ public class MultiLangProtocolTest { if (this.messageIterator.hasNext()) { message = this.messageIterator.next(); } - return buildMessageFuture(message); + return buildFuture(message); } }.init(messages); @@ -117,13 +137,12 @@ public class MultiLangProtocolTest { public void processRecordsWithCheckpointsTest() throws InterruptedException, ExecutionException, KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { - Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList()); - Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter) - .writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class)); - Mockito.doAnswer(buildMessageAnswers(new ArrayList() { + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); + when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(true)); + when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList() { { - this.add(new CheckpointMessage("123", null)); - this.add(new CheckpointMessage(null, null)); + this.add(new CheckpointMessage("123", 0L, null)); + this.add(new CheckpointMessage(null, 0L, null)); /* * This procesRecords message will be ignored by the read loop which only cares about status and * checkpoint messages. All other lines and message types are ignored. By inserting it here, we check @@ -132,24 +151,23 @@ public class MultiLangProtocolTest { this.add(new ProcessRecordsMessage()); this.add(new StatusMessage("processRecords")); } - })).when(messageReader).getNextMessageFromSTDOUT(); - Assert.assertTrue(protocol.processRecords(new ArrayList(), checkpointer)); + })); + assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(true)); - Mockito.verify(checkpointer, Mockito.timeout(1)).checkpoint(); - Mockito.verify(checkpointer, Mockito.timeout(1)).checkpoint("123"); + verify(checkpointer, timeout(1)).checkpoint(); + verify(checkpointer, timeout(1)).checkpoint("123", 0L); } @Test public void processRecordsWithABadCheckpointTest() throws InterruptedException, ExecutionException { - Mockito.doReturn(buildBooleanFuture(true)).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList()); - Mockito.doReturn(buildBooleanFuture(false)).when(messageWriter) - .writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class)); - Mockito.doAnswer(buildMessageAnswers(new ArrayList() { + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(buildFuture(true)); + when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(buildFuture(false)); + when(messageReader.getNextMessageFromSTDOUT()).thenAnswer(buildMessageAnswers(new ArrayList() { { - this.add(new CheckpointMessage("456", null)); + this.add(new CheckpointMessage("456", 0L, null)); this.add(new StatusMessage("processRecords")); } - })).when(messageReader).getNextMessageFromSTDOUT(); - Assert.assertFalse(protocol.processRecords(new ArrayList(), checkpointer)); + })); + assertThat(protocol.processRecords(new ProcessRecordsInput().withRecords(EMPTY_RECORD_LIST).withCheckpointer(checkpointer)), equalTo(false)); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java index 3cdc488c..a8f5885b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorFactoryTest.java @@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.multilang; import org.junit.Assert; import org.junit.Test; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; public class StreamingRecordProcessorFactoryTest { diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 941b4582..c322f806 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -24,11 +24,18 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; +import com.amazonaws.services.kinesis.multilang.messages.Message; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; @@ -44,12 +51,24 @@ import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage; import com.amazonaws.services.kinesis.multilang.messages.StatusMessage; import com.fasterxml.jackson.databind.ObjectMapper; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) public class StreamingRecordProcessorTest { private static final String shardId = "shard-123"; private int systemExitCount = 0; + @Mock + private Future messageFuture; + private IRecordProcessorCheckpointer unimplementedCheckpointer = new IRecordProcessorCheckpointer() { @Override @@ -129,11 +148,10 @@ public class StreamingRecordProcessorTest { Future trueFuture = Mockito.mock(Future.class); Mockito.doReturn(true).when(trueFuture).get(); - Mockito.doReturn(trueFuture).when(messageWriter).writeInitializeMessage(Mockito.anyString()); - Mockito.doReturn(trueFuture).when(messageWriter) - .writeCheckpointMessageWithError(Mockito.anyString(), Mockito.any(Throwable.class)); - Mockito.doReturn(trueFuture).when(messageWriter).writeProcessRecordsMessage(Mockito.anyList()); - Mockito.doReturn(trueFuture).when(messageWriter).writeShutdownMessage(Mockito.any(ShutdownReason.class)); + when(messageWriter.writeInitializeMessage(any(InitializationInput.class))).thenReturn(trueFuture); + when(messageWriter.writeCheckpointMessageWithError(anyString(), anyLong(), any(Throwable.class))).thenReturn(trueFuture); + when(messageWriter.writeProcessRecordsMessage(any(ProcessRecordsInput.class))).thenReturn(trueFuture); + when(messageWriter.writeShutdownMessage(any(ShutdownReason.class))).thenReturn(trueFuture); } private void phases(Answer answer) throws InterruptedException, ExecutionException { @@ -145,16 +163,15 @@ public class StreamingRecordProcessorTest { * processRecords * shutdown */ - Future future = Mockito.mock(Future.class); - Mockito.doAnswer(answer).when(future).get(); - Mockito.doReturn(future).when(messageReader).getNextMessageFromSTDOUT(); + when(messageFuture.get()).thenAnswer(answer); + when(messageReader.getNextMessageFromSTDOUT()).thenReturn(messageFuture); List testRecords = new ArrayList(); - recordProcessor.initialize(shardId); - recordProcessor.processRecords(testRecords, unimplementedCheckpointer); - recordProcessor.processRecords(testRecords, unimplementedCheckpointer); - recordProcessor.shutdown(unimplementedCheckpointer, ShutdownReason.ZOMBIE); + recordProcessor.initialize(new InitializationInput().withShardId(shardId)); + recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer)); + recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer)); + recordProcessor.shutdown(new ShutdownInput().withCheckpointer(unimplementedCheckpointer).withShutdownReason(ShutdownReason.ZOMBIE)); } @Test @@ -180,9 +197,9 @@ public class StreamingRecordProcessorTest { phases(answer); - Mockito.verify(messageWriter, Mockito.times(1)).writeInitializeMessage(shardId); - Mockito.verify(messageWriter, Mockito.times(2)).writeProcessRecordsMessage(Mockito.anyList()); - Mockito.verify(messageWriter, Mockito.times(1)).writeShutdownMessage(ShutdownReason.ZOMBIE); + verify(messageWriter).writeInitializeMessage(new InitializationInput().withShardId(shardId)); + verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class)); + verify(messageWriter).writeShutdownMessage(ShutdownReason.ZOMBIE); } @Test @@ -211,9 +228,9 @@ public class StreamingRecordProcessorTest { phases(answer); - Mockito.verify(messageWriter, Mockito.times(1)).writeInitializeMessage(shardId); - Mockito.verify(messageWriter, Mockito.times(2)).writeProcessRecordsMessage(Mockito.anyList()); - Mockito.verify(messageWriter, Mockito.times(0)).writeShutdownMessage(ShutdownReason.ZOMBIE); + verify(messageWriter).writeInitializeMessage(new InitializationInput().withShardId(shardId)); + verify(messageWriter, times(2)).writeProcessRecordsMessage(any(ProcessRecordsInput.class)); + verify(messageWriter, never()).writeShutdownMessage(ShutdownReason.ZOMBIE); Assert.assertEquals(1, systemExitCount); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java index ff7bc84e..0daa378f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import org.junit.Assert; import org.junit.Test; @@ -31,8 +33,8 @@ public class MessageTest { @Test public void toStringTest() { Message[] messages = - new Message[] { new CheckpointMessage("1234567890", null), new InitializeMessage("shard-123"), - new ProcessRecordsMessage(new ArrayList() { + new Message[] { new CheckpointMessage("1234567890", 0L, null), new InitializeMessage(new InitializationInput().withShardId("shard-123")), + new ProcessRecordsMessage(new ProcessRecordsInput().withRecords(new ArrayList() { { this.add(new Record() { { @@ -42,7 +44,7 @@ public class MessageTest { } }); } - }), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"), + })), new ShutdownMessage(ShutdownReason.ZOMBIE), new StatusMessage("processRecords"), new InitializeMessage(), new ProcessRecordsMessage() }; for (int i = 0; i < messages.length; i++) {