Pass isAtShardEnd correctly to processRecords call

The default is false otherwise, i.e., the processor is always getting isAtShardEnd=false.
This commit is contained in:
Jan Sochor 2022-03-30 13:28:35 +02:00
parent 7f5eb9f34b
commit da46d5bd48

View file

@ -215,7 +215,7 @@ public class ProcessTask implements ConsumerTask {
shardInfoId);
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
.checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
.isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
shardInfo.streamIdentifierSerOpt()