From 029da69926aa2e68e9235224d550effd394fb0de Mon Sep 17 00:00:00 2001 From: Juan Botiva Date: Fri, 13 Oct 2017 12:09:46 -0500 Subject: [PATCH] Exposes the millisBehindCurrent metric to V1 IRecordProcessor implementations. --- .../kinesis/clientlibrary/interfaces/IRecordProcessor.java | 4 +++- .../lib/worker/V1ToV2RecordProcessorAdapter.java | 4 ++-- .../kinesis/multilang/messages/InitializeMessage.java | 2 +- .../services/kinesis/clientlibrary/lib/worker/WorkerTest.java | 3 ++- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java index 89cf092a..32930f43 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java @@ -41,8 +41,10 @@ public interface IRecordProcessor { * * @param records Data records to be processed * @param checkpointer RecordProcessor should use this instance to checkpoint their progress. + * @param millisBehindLatest indicates how many milliseconds the underlying iterator is behind from the latest record + * in the shard. */ - void processRecords(List records, IRecordProcessorCheckpointer checkpointer); + void processRecords(List records, IRecordProcessorCheckpointer checkpointer, Long millisBehindLatest); /** * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java index 477acb74..7fc57ba9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/V1ToV2RecordProcessorAdapter.java @@ -39,8 +39,8 @@ class V1ToV2RecordProcessorAdapter implements IRecordProcessor { @Override public void processRecords(ProcessRecordsInput processRecordsInput) { - recordProcessor.processRecords(processRecordsInput.getRecords(), processRecordsInput.getCheckpointer()); - + recordProcessor.processRecords(processRecordsInput.getRecords(), processRecordsInput.getCheckpointer(), + processRecordsInput.getMillisBehindLatest()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java index cc6be56f..3428ad48 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/InitializeMessage.java @@ -45,7 +45,7 @@ public class InitializeMessage extends Message { /** * Convenience constructor. * - * @param shardId The shard id. + * @param initializationInput The shard id and extended sequence number. */ public InitializeMessage(InitializationInput initializationInput) { this.shardId = initializationInput.getShardId(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 5913bf0d..4fb278ac 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -175,7 +175,8 @@ public class WorkerTest { } @Override - public void processRecords(List dataRecords, IRecordProcessorCheckpointer checkpointer) { + public void processRecords(List dataRecords, IRecordProcessorCheckpointer checkpointer, + Long millisBehindLatest) { try { checkpointer.checkpoint(); } catch (KinesisClientLibNonRetryableException e) {