diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 1152d157..7b306a68 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -105,7 +105,8 @@ public class LeaseCleanupManager { } /** - * Enqueues a lease for deletion. + * Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion} + * for checking the duplicate entries. * @param leasePendingDeletion */ public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index fc206bea..d6282cff 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -136,8 +136,18 @@ public class ShutdownTask implements ConsumerTask { updateLeaseWithChildShards(currentShardLease); } if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { - attemptShardEndCheckpointing(scope, startTime); - leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + boolean isSuccess = false; + try { + isSuccess = attemptShardEndCheckpointing(scope, startTime); + } finally { + // Check if either the shard end ddb persist is successful or + // if childshards is empty. When child shards is empty then either it is due to + // completed shard being reprocessed or we got RNF from service. + // For these cases enqueue the lease for deletion. + if (isSuccess || !CollectionUtils.isNullOrEmpty(childShards)) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + } } } else { throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); @@ -169,7 +179,7 @@ public class ShutdownTask implements ConsumerTask { return new TaskResult(exception); } - private void attemptShardEndCheckpointing(MetricsScope scope, long startTime) + private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) @@ -179,6 +189,7 @@ public class ShutdownTask implements ConsumerTask { // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); } + return true; } private void applicationCheckpointAndVerification() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java index eb06a4a0..d02ced04 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java @@ -225,49 +225,6 @@ public class LeaseCleanupManagerTest { verify(leaseRefresher, times(1)).deleteLease(heldLease); } - /** - * Tests that if a lease deletion fails, it's re-enqueued for deletion. - */ - @Test - public final void testFailedDeletionsReEnqueued() throws Exception { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - - final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); - - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); - when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(Exception.class); - - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); - } - - /** - * Tests duplicate leases are not enqueued for deletion. - */ - @Test - public final void testNoDuplicateLeasesEnqueued() { - // Disable lease cleanup so that the queue isn't drained while the test is running. - cleanupLeasesOfCompletedShards = false; - leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, - NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, - completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); - - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); - - // Enqueue the same lease twice. - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); - } - private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, ExtendedSequenceNumber extendedSequenceNumber, int expectedDeletedLeases) throws Exception {