Clean up checkpoint state after successful checkpoint
This commit is contained in:
parent
5355b4b7c5
commit
1794874e33
2 changed files with 2 additions and 0 deletions
|
|
@ -134,6 +134,7 @@ public class DynamoDBCheckpointer implements Checkpointer {
|
||||||
|
|
||||||
lease.checkpoint(checkpoint);
|
lease.checkpoint(checkpoint);
|
||||||
lease.pendingCheckpoint(null);
|
lease.pendingCheckpoint(null);
|
||||||
|
lease.pendingCheckpointState(null);
|
||||||
lease.ownerSwitchesSinceCheckpoint(0L);
|
lease.ownerSwitchesSinceCheckpoint(0L);
|
||||||
|
|
||||||
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
|
return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey);
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,7 @@ public class InMemoryCheckpointer implements Checkpointer {
|
||||||
checkpoints.put(leaseKey, checkpointValue);
|
checkpoints.put(leaseKey, checkpointValue);
|
||||||
flushpoints.put(leaseKey, checkpointValue);
|
flushpoints.put(leaseKey, checkpointValue);
|
||||||
pendingCheckpoints.remove(leaseKey);
|
pendingCheckpoints.remove(leaseKey);
|
||||||
|
pendingCheckpointStates.remove(leaseKey);
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue);
|
log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue