From 0a4fc8bef06a18d7f3b973f63fd6193c89965556 Mon Sep 17 00:00:00 2001 From: glarwood Date: Wed, 20 Mar 2019 18:30:39 +0000 Subject: [PATCH] refactor(ProcessTask): expose shardId --- .../kinesis/clientlibrary/lib/worker/ProcessTask.java | 1 + .../clientlibrary/types/ProcessRecordsInput.java | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 9aca832e..0298aa75 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -214,6 +214,7 @@ class ProcessTask implements ITask { + shardInfo.getShardId()); final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) .withCheckpointer(recordProcessorCheckpointer) + .withShardId(shardInfo.getShardId()) .withMillisBehindLatest(input.getMillisBehindLatest()); final long recordProcessorStartTimeMillis = System.currentTimeMillis(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java index 68bfcd8a..dd591778 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java @@ -32,6 +32,7 @@ public class ProcessRecordsInput { private List records; private IRecordProcessorCheckpointer checkpointer; private Long millisBehindLatest; + private String shardId; /** * Default constructor. @@ -89,6 +90,10 @@ public class ProcessRecordsInput { return millisBehindLatest; } + public String getShardId() { + return shardId; + } + /** * Set milliseconds behind latest. * @@ -110,6 +115,11 @@ public class ProcessRecordsInput { this.cacheExitTime = cacheExitTime; return this; } + + public ProcessRecordsInput withShardId(String shardId) { + this.shardId = shardId; + return this; + } public Duration getTimeSpentInCache() { if (cacheEntryTime == null || cacheExitTime == null) {