Pass isAtShardEnd correctly to processRecords call (#935)
The default is false otherwise, i.e., the processor is always getting isAtShardEnd=false.
This commit is contained in:
parent
6146ff9851
commit
65c95ed872
1 changed files with 1 additions and 1 deletions
|
|
@ -215,7 +215,7 @@ public class ProcessTask implements ConsumerTask {
|
||||||
shardInfoId);
|
shardInfoId);
|
||||||
|
|
||||||
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
|
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
|
||||||
.checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
|
.isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
|
||||||
|
|
||||||
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
|
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
|
||||||
shardInfo.streamIdentifierSerOpt()
|
shardInfo.streamIdentifierSerOpt()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue