diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java index 89cf092a..32930f43 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java @@ -41,8 +41,10 @@ public interface IRecordProcessor { * * @param records Data records to be processed * @param checkpointer RecordProcessor should use this instance to checkpoint their progress. + * @param millisBehindLatest indicates how many milliseconds the underlying iterator is behind from the latest record + * in the shard. */ - void processRecords(List records, IRecordProcessorCheckpointer checkpointer); + void processRecords(List records, IRecordProcessorCheckpointer checkpointer, Long millisBehindLatest); /** * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java index 477acb74..7fc57ba9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java @@ -39,8 +39,8 @@ class V1ToV2RecordProcessorAdapter implements IRecordProcessor { @Override public void processRecords(ProcessRecordsInput processRecordsInput) { - recordProcessor.processRecords(processRecordsInput.getRecords(), processRecordsInput.getCheckpointer()); - + recordProcessor.processRecords(processRecordsInput.getRecords(), processRecordsInput.getCheckpointer(), + processRecordsInput.getMillisBehindLatest()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java index cc6be56f..3428ad48 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java @@ -45,7 +45,7 @@ public class InitializeMessage extends Message { /** * Convenience constructor. * - * @param shardId The shard id. + * @param initializationInput The shard id and extended sequence number. */ public InitializeMessage(InitializationInput initializationInput) { this.shardId = initializationInput.getShardId(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 5913bf0d..4fb278ac 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -175,7 +175,8 @@ public class WorkerTest { } @Override - public void processRecords(List dataRecords, IRecordProcessorCheckpointer checkpointer) { + public void processRecords(List dataRecords, IRecordProcessorCheckpointer checkpointer, + Long millisBehindLatest) { try { checkpointer.checkpoint(); } catch (KinesisClientLibNonRetryableException e) {