Exposes the millisBehindCurrent metric to V1 IRecordProcessor implementations.
This commit is contained in:
parent
9720b1b249
commit
029da69926
4 changed files with 8 additions and 5 deletions
|
|
@ -41,8 +41,10 @@ public interface IRecordProcessor {
|
||||||
*
|
*
|
||||||
* @param records Data records to be processed
|
* @param records Data records to be processed
|
||||||
* @param checkpointer RecordProcessor should use this instance to checkpoint their progress.
|
* @param checkpointer RecordProcessor should use this instance to checkpoint their progress.
|
||||||
|
* @param millisBehindLatest indicates how many milliseconds the underlying iterator is behind from the latest record
|
||||||
|
* in the shard.
|
||||||
*/
|
*/
|
||||||
void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer);
|
void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer, Long millisBehindLatest);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
|
* Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
|
||||||
|
|
|
||||||
|
|
@ -39,8 +39,8 @@ class V1ToV2RecordProcessorAdapter implements IRecordProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void processRecords(ProcessRecordsInput processRecordsInput) {
|
public void processRecords(ProcessRecordsInput processRecordsInput) {
|
||||||
recordProcessor.processRecords(processRecordsInput.getRecords(), processRecordsInput.getCheckpointer());
|
recordProcessor.processRecords(processRecordsInput.getRecords(), processRecordsInput.getCheckpointer(),
|
||||||
|
processRecordsInput.getMillisBehindLatest());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ public class InitializeMessage extends Message {
|
||||||
/**
|
/**
|
||||||
* Convenience constructor.
|
* Convenience constructor.
|
||||||
*
|
*
|
||||||
* @param shardId The shard id.
|
* @param initializationInput The shard id and extended sequence number.
|
||||||
*/
|
*/
|
||||||
public InitializeMessage(InitializationInput initializationInput) {
|
public InitializeMessage(InitializationInput initializationInput) {
|
||||||
this.shardId = initializationInput.getShardId();
|
this.shardId = initializationInput.getShardId();
|
||||||
|
|
|
||||||
|
|
@ -175,7 +175,8 @@ public class WorkerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void processRecords(List<Record> dataRecords, IRecordProcessorCheckpointer checkpointer) {
|
public void processRecords(List<Record> dataRecords, IRecordProcessorCheckpointer checkpointer,
|
||||||
|
Long millisBehindLatest) {
|
||||||
try {
|
try {
|
||||||
checkpointer.checkpoint();
|
checkpointer.checkpoint();
|
||||||
} catch (KinesisClientLibNonRetryableException e) {
|
} catch (KinesisClientLibNonRetryableException e) {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue