From da46d5bd48a4bbf5d8ac900bcaabc5cf8363bbdc Mon Sep 17 00:00:00 2001 From: Jan Sochor Date: Wed, 30 Mar 2022 13:28:35 +0200 Subject: [PATCH] Pass isAtShardEnd correctly to processRecords call The default is false otherwise, i.e., the processor is always getting isAtShardEnd=false. --- .../java/software/amazon/kinesis/lifecycle/ProcessTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index f05efb91..e4b38815 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -215,7 +215,7 @@ public class ProcessTask implements ConsumerTask { shardInfoId); final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime()) - .checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build(); + .isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); shardInfo.streamIdentifierSerOpt()