diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 1152d157..119402cb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -105,7 +105,8 @@ public class LeaseCleanupManager { } /** - * Enqueues a lease for deletion. + * Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion} + * for checking the duplicate entries. * @param leasePendingDeletion */ public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) { @@ -135,7 +136,7 @@ public class LeaseCleanupManager { * Returns how many leases are currently waiting in the queue pending deletion. * @return number of leases pending deletion. */ - public int leasesPendingDeletion() { + private int leasesPendingDeletion() { return deletionQueue.size(); } @@ -310,6 +311,7 @@ public class LeaseCleanupManager { @VisibleForTesting void cleanupLeases() { + log.info("Number of pending leases to clean before the scan : {}", leasesPendingDeletion()); if (deletionQueue.isEmpty()) { log.debug("No leases pending deletion."); } else if (timeToCheckForCompletedShard() | timeToCheckForGarbageShard()) { @@ -340,24 +342,22 @@ public class LeaseCleanupManager { log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " + "scheduled execution.", leaseKey, streamIdentifier, e); } - if (!deletionSucceeded) { log.debug("Did not cleanup lease {} for {}. Re-enqueueing for deletion.", leaseKey, streamIdentifier); failedDeletions.add(leasePendingDeletion); } } - if (completedLeaseCleanedUp) { log.debug("At least one completed lease was cleaned up - restarting interval"); completedLeaseStopwatch.reset().start(); } - if (garbageLeaseCleanedUp) { log.debug("At least one garbage lease was cleaned up - restarting interval"); garbageLeaseStopwatch.reset().start(); } - deletionQueue.addAll(failedDeletions); + + log.info("Number of pending leases to clean after the scan : {}", leasesPendingDeletion()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index fc206bea..91ea125b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -136,8 +136,18 @@ public class ShutdownTask implements ConsumerTask { updateLeaseWithChildShards(currentShardLease); } if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { - attemptShardEndCheckpointing(scope, startTime); - leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + boolean isSuccess = false; + try { + isSuccess = attemptShardEndCheckpointing(scope, startTime); + } finally { + // Check if either the shard end ddb persist is successful or + // if childshards is empty. When child shards is empty then either it is due to + // completed shard being reprocessed or we got RNF from service. + // For these cases enqueue the lease for deletion. + if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + } } } else { throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); @@ -169,7 +179,7 @@ public class ShutdownTask implements ConsumerTask { return new TaskResult(exception); } - private void attemptShardEndCheckpointing(MetricsScope scope, long startTime) + private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) @@ -179,6 +189,7 @@ public class ShutdownTask implements ConsumerTask { // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); } + return true; } private void applicationCheckpointAndVerification() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java index eb06a4a0..d02ced04 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java @@ -225,49 +225,6 @@ public class LeaseCleanupManagerTest { verify(leaseRefresher, times(1)).deleteLease(heldLease); } - /** - * Tests that if a lease deletion fails, it's re-enqueued for deletion. - */ - @Test - public final void testFailedDeletionsReEnqueued() throws Exception { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - - final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); - - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); - when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(Exception.class); - - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); - } - - /** - * Tests duplicate leases are not enqueued for deletion. - */ - @Test - public final void testNoDuplicateLeasesEnqueued() { - // Disable lease cleanup so that the queue isn't drained while the test is running. - cleanupLeasesOfCompletedShards = false; - leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, - NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, - completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); - - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); - - // Enqueue the same lease twice. - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); - } - private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, ExtendedSequenceNumber extendedSequenceNumber, int expectedDeletedLeases) throws Exception { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 5920646c..d5af6627 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -153,7 +153,7 @@ public class ShutdownTaskTest { final TaskResult result = task.call(); assertNotNull(result.getException()); - assertTrue(result.getException() instanceof KinesisClientLibIOException); + assertTrue(result.getException() instanceof IllegalStateException); verify(recordsPublisher, never()).shutdown(); verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());