Update ShardEnd checkpoint failure messaging (#591)
* Update shard end checkpoint failure messaging * Update shard end checkpoint failure messaging
This commit is contained in:
parent
161590c2ce
commit
c2a3f18670
1 changed files with 3 additions and 2 deletions
|
|
@ -106,8 +106,9 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
|
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
|
||||||
if (lastCheckpointValue == null
|
if (lastCheckpointValue == null
|
||||||
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
|
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
|
||||||
"Application didn't checkpoint at end of shard " + shardInfo.shardId());
|
+ shardInfo.shardId() + ". Application must checkpoint upon shard end. " +
|
||||||
|
"See ShardRecordProcessor.shardEnded javadocs for more information.");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
shardRecordProcessor.leaseLost(LeaseLostInput.builder().build());
|
shardRecordProcessor.leaseLost(LeaseLostInput.builder().build());
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue