diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index b93c3779..d9646351 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -134,6 +134,7 @@ public class DynamoDBCheckpointer implements Checkpointer { lease.checkpoint(checkpoint); lease.pendingCheckpoint(null); + lease.pendingCheckpointState(null); lease.ownerSwitchesSinceCheckpoint(0L); return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index 661cf2ff..8f6e165d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -44,6 +44,7 @@ public class InMemoryCheckpointer implements Checkpointer { checkpoints.put(leaseKey, checkpointValue); flushpoints.put(leaseKey, checkpointValue); pendingCheckpoints.remove(leaseKey); + pendingCheckpointStates.remove(leaseKey); if (log.isDebugEnabled()) { log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue);