From 37281e949399563e2dccae4b83c7f90f32fcad42 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 15 Jun 2020 20:57:55 -0700 Subject: [PATCH] Deleting lease immediately in RNF Exception --- .../kinesis/leases/LeaseCleanupManager.java | 43 +++++++++++---- .../kinesis/lifecycle/ShutdownTask.java | 55 ++++++++++++++----- 2 files changed, 74 insertions(+), 24 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 6d6b46a8..4fda2106 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -44,6 +44,7 @@ import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.time.Duration; +import java.util.Collections; import java.util.HashSet; import java.util.Objects; import java.util.Optional; @@ -116,13 +117,24 @@ public class LeaseCleanupManager { //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597. if (!deletionQueue.contains(leasePendingDeletion)) { log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey()); - deletionQueue.add(leasePendingDeletion); + if (!deletionQueue.add(leasePendingDeletion)) { + log.warn("Unable to enqueue lease {} for deletion.", lease.leaseKey()); + } } else { log.warn("Lease {} is already pending deletion, not enqueueing for deletion.", lease.leaseKey()); } } } + /** + * Check if lease was already enqueued for deletion. + * @param leasePendingDeletion + * @return true if enqueued for deletion; false otherwise. + */ + public boolean isEnqueuedForDeletion(LeasePendingDeletion leasePendingDeletion) { + return deletionQueue.contains(leasePendingDeletion); + } + /** * Returns how many leases are currently waiting in the queue pending deletion. * @return number of leases pending deletion. @@ -139,7 +151,8 @@ public class LeaseCleanupManager { return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis; } - private LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion) throws TimeoutException, + public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion, + boolean timeToCheckForCompletedShard, boolean timeToCheckForGarbageShard) throws TimeoutException, InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException { final Lease lease = leasePendingDeletion.lease(); final ShardInfo shardInfo = leasePendingDeletion.shardInfo(); @@ -150,9 +163,11 @@ public class LeaseCleanupManager { boolean cleanedUpCompletedLease = false; boolean cleanedUpGarbageLease = false; boolean alreadyCheckedForGarbageCollection = false; + boolean wereChildShardsPresent = false; + boolean wasResourceNotFound = false; try { - if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard()) { + if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) { Set childShardKeys = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()).childShardIds(); if (CollectionUtils.isNullOrEmpty(childShardKeys)) { try { @@ -172,18 +187,21 @@ public class LeaseCleanupManager { cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys); } - if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard()) { + if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { try { - getChildShardsFromService(shardInfo, streamIdentifier); + wereChildShardsPresent = !CollectionUtils + .isNullOrEmpty(getChildShardsFromService(shardInfo, streamIdentifier)); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } } } catch (ResourceNotFoundException e) { + wasResourceNotFound = true; cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease); } - return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease); + return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent, + wasResourceNotFound); } private Set getChildShardsFromService(ShardInfo shardInfo, StreamIdentifier streamIdentifier) @@ -289,22 +307,23 @@ public class LeaseCleanupManager { final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll(); final String leaseKey = leasePendingDeletion.lease().leaseKey(); final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier(); - boolean deletionFailed = true; + boolean deletionSucceeded = false; try { - final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion); + final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion, + timeToCheckForCompletedShard(), timeToCheckForGarbageShard()); completedLeaseCleanedUp |= leaseCleanupResult.cleanedUpCompletedLease(); garbageLeaseCleanedUp |= leaseCleanupResult.cleanedUpGarbageLease(); if (leaseCleanupResult.leaseCleanedUp()) { log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier); - deletionFailed = false; + deletionSucceeded = true; } } catch (Exception e) { log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " + "scheduled execution.", leaseKey, streamIdentifier, e); } - if (deletionFailed) { + if (!deletionSucceeded) { log.debug("Did not cleanup lease {} for {}. Re-enqueueing for deletion.", leaseKey, streamIdentifier); failedDeletions.add(leasePendingDeletion); } @@ -332,9 +351,11 @@ public class LeaseCleanupManager { } @Value - private class LeaseCleanupResult { + public static class LeaseCleanupResult { boolean cleanedUpCompletedLease; boolean cleanedUpGarbageLease; + boolean wereChildShardsPresent; + boolean wasResourceNotFound; public boolean leaseCleanedUp() { return cleanedUpCompletedLease | cleanedUpGarbageLease; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 5f1dcd25..73745168 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -128,21 +128,50 @@ public class ShutdownTask implements ConsumerTask { if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); updateLeaseWithChildShards(currentShardLease); - } - final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) - .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); - if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { - recordProcessorCheckpointer - .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); - recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); - // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. - // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. - throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); - } + final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) + .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); + if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + recordProcessorCheckpointer + .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); + recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); + // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. + // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. + throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); + } - final LeasePendingDeletion garbageLease = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); - leaseCleanupManager.enqueueForDeletion(garbageLease); + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + + } else { + // This might be a case of ResourceNotFound from Service. Directly validate and delete lease, if required. + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, + currentShardLease, shardInfo); + if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { + final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult; + try { + leaseCleanupResult = leaseCleanupManager + .cleanupLease(leasePendingDeletion, false, true); + if (leaseCleanupResult.leaseCleanedUp()) { + log.info("Cleaned up garbage lease {} for {}. Details : {}", + currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); + } else { + log.error("Unable to cleanup garbage lease {} for {}. Details : {} ", + currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); + // If we are unable to delete this lease and the reason being RNF, then enqueue it + // for deletion, so that we don't end up consuming service TPS on any bugs. + if (leaseCleanupResult.wasResourceNotFound()) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + } + } catch (Exception e) { + log.error("Unable to cleanup garbage lease {} for {}", currentShardLease.leaseKey(), + streamIdentifier, e); + } finally { + + } + } + } } else { throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); }