From a2b60193369f3505060cc32f384b3fb6bd9f2c6b Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 24 Jun 2020 12:39:18 -0700 Subject: [PATCH 1/4] Changes to enqueue lease for deletion in potential garbage cleanup scenario. --- .../kinesis/leases/LeaseCleanupManager.java | 3 +- .../kinesis/lifecycle/ShutdownTask.java | 17 ++++++-- .../leases/LeaseCleanupManagerTest.java | 43 ------------------- 3 files changed, 16 insertions(+), 47 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 1152d157..7b306a68 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -105,7 +105,8 @@ public class LeaseCleanupManager { } /** - * Enqueues a lease for deletion. + * Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion} + * for checking the duplicate entries. * @param leasePendingDeletion */ public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index fc206bea..d6282cff 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -136,8 +136,18 @@ public class ShutdownTask implements ConsumerTask { updateLeaseWithChildShards(currentShardLease); } if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { - attemptShardEndCheckpointing(scope, startTime); - leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + boolean isSuccess = false; + try { + isSuccess = attemptShardEndCheckpointing(scope, startTime); + } finally { + // Check if either the shard end ddb persist is successful or + // if childshards is empty. When child shards is empty then either it is due to + // completed shard being reprocessed or we got RNF from service. + // For these cases enqueue the lease for deletion. + if (isSuccess || !CollectionUtils.isNullOrEmpty(childShards)) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + } } } else { throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); @@ -169,7 +179,7 @@ public class ShutdownTask implements ConsumerTask { return new TaskResult(exception); } - private void attemptShardEndCheckpointing(MetricsScope scope, long startTime) + private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) @@ -179,6 +189,7 @@ public class ShutdownTask implements ConsumerTask { // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); } + return true; } private void applicationCheckpointAndVerification() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java index eb06a4a0..d02ced04 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java @@ -225,49 +225,6 @@ public class LeaseCleanupManagerTest { verify(leaseRefresher, times(1)).deleteLease(heldLease); } - /** - * Tests that if a lease deletion fails, it's re-enqueued for deletion. - */ - @Test - public final void testFailedDeletionsReEnqueued() throws Exception { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - - final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); - - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); - when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(Exception.class); - - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); - } - - /** - * Tests duplicate leases are not enqueued for deletion. - */ - @Test - public final void testNoDuplicateLeasesEnqueued() { - // Disable lease cleanup so that the queue isn't drained while the test is running. - cleanupLeasesOfCompletedShards = false; - leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, - NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, - completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); - - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); - - // Enqueue the same lease twice. - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); - Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); - } - private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, ExtendedSequenceNumber extendedSequenceNumber, int expectedDeletedLeases) throws Exception { From bd6a7d8b906e1038a5a44a9cdb54603ff8fc6203 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 24 Jun 2020 12:43:55 -0700 Subject: [PATCH 2/4] Logging to track pending leases to cleanup --- .../amazon/kinesis/leases/LeaseCleanupManager.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 7b306a68..119402cb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -136,7 +136,7 @@ public class LeaseCleanupManager { * Returns how many leases are currently waiting in the queue pending deletion. * @return number of leases pending deletion. */ - public int leasesPendingDeletion() { + private int leasesPendingDeletion() { return deletionQueue.size(); } @@ -311,6 +311,7 @@ public class LeaseCleanupManager { @VisibleForTesting void cleanupLeases() { + log.info("Number of pending leases to clean before the scan : {}", leasesPendingDeletion()); if (deletionQueue.isEmpty()) { log.debug("No leases pending deletion."); } else if (timeToCheckForCompletedShard() | timeToCheckForGarbageShard()) { @@ -341,24 +342,22 @@ public class LeaseCleanupManager { log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " + "scheduled execution.", leaseKey, streamIdentifier, e); } - if (!deletionSucceeded) { log.debug("Did not cleanup lease {} for {}. Re-enqueueing for deletion.", leaseKey, streamIdentifier); failedDeletions.add(leasePendingDeletion); } } - if (completedLeaseCleanedUp) { log.debug("At least one completed lease was cleaned up - restarting interval"); completedLeaseStopwatch.reset().start(); } - if (garbageLeaseCleanedUp) { log.debug("At least one garbage lease was cleaned up - restarting interval"); garbageLeaseStopwatch.reset().start(); } - deletionQueue.addAll(failedDeletions); + + log.info("Number of pending leases to clean after the scan : {}", leasesPendingDeletion()); } } From b636edd007ed5a37e93d403ca063a252b3c25dbe Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 24 Jun 2020 13:07:42 -0700 Subject: [PATCH 3/4] ShutdownTask bug fix --- .../software/amazon/kinesis/lifecycle/ShutdownTaskTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 5920646c..d5af6627 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -153,7 +153,7 @@ public class ShutdownTaskTest { final TaskResult result = task.call(); assertNotNull(result.getException()); - assertTrue(result.getException() instanceof KinesisClientLibIOException); + assertTrue(result.getException() instanceof IllegalStateException); verify(recordsPublisher, never()).shutdown(); verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); From 950faf74756b5f048841d8c2fcbb4e285865303d Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 24 Jun 2020 14:34:01 -0700 Subject: [PATCH 4/4] Fixing condition check --- .../java/software/amazon/kinesis/lifecycle/ShutdownTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index d6282cff..91ea125b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -144,7 +144,7 @@ public class ShutdownTask implements ConsumerTask { // if childshards is empty. When child shards is empty then either it is due to // completed shard being reprocessed or we got RNF from service. // For these cases enqueue the lease for deletion. - if (isSuccess || !CollectionUtils.isNullOrEmpty(childShards)) { + if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) { leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); } }