diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 9aca832e..0298aa75 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -214,6 +214,7 @@ class ProcessTask implements ITask { + shardInfo.getShardId()); final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) .withCheckpointer(recordProcessorCheckpointer) + .withShardId(shardInfo.getShardId()) .withMillisBehindLatest(input.getMillisBehindLatest()); final long recordProcessorStartTimeMillis = System.currentTimeMillis(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java index 68bfcd8a..dd591778 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java @@ -32,6 +32,7 @@ public class ProcessRecordsInput { private List records; private IRecordProcessorCheckpointer checkpointer; private Long millisBehindLatest; + private String shardId; /** * Default constructor. @@ -89,6 +90,10 @@ public class ProcessRecordsInput { return millisBehindLatest; } + public String getShardId() { + return shardId; + } + /** * Set milliseconds behind latest. * @@ -110,6 +115,11 @@ public class ProcessRecordsInput { this.cacheExitTime = cacheExitTime; return this; } + + public ProcessRecordsInput withShardId(String shardId) { + this.shardId = shardId; + return this; + } public Duration getTimeSpentInCache() { if (cacheEntryTime == null || cacheExitTime == null) {