From 1794874e3365d38ea2cc2c83bed1c3ff844dc9e9 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Tue, 7 Apr 2020 03:45:03 -0400 Subject: [PATCH] Clean up checkpoint state after successful checkpoint --- .../amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java | 1 + .../software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java | 1 + 2 files changed, 2 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java index b93c3779..d9646351 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/dynamodb/DynamoDBCheckpointer.java @@ -134,6 +134,7 @@ public class DynamoDBCheckpointer implements Checkpointer { lease.checkpoint(checkpoint); lease.pendingCheckpoint(null); + lease.pendingCheckpointState(null); lease.ownerSwitchesSinceCheckpoint(0L); return leaseCoordinator.updateLease(lease, concurrencyToken, operation, leaseKey); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java index 661cf2ff..8f6e165d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/InMemoryCheckpointer.java @@ -44,6 +44,7 @@ public class InMemoryCheckpointer implements Checkpointer { checkpoints.put(leaseKey, checkpointValue); flushpoints.put(leaseKey, checkpointValue); pendingCheckpoints.remove(leaseKey); + pendingCheckpointStates.remove(leaseKey); if (log.isDebugEnabled()) { log.debug("shardId: {} checkpoint: {}", leaseKey, checkpointValue);