From 67ce7a783d58bb451530bac96f922086118ad92c Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 23 Jun 2020 03:03:42 -0700 Subject: [PATCH] Fixing lease cleanup issues with multistreaming --- .../amazon/kinesis/coordinator/Scheduler.java | 5 +- .../kinesis/leases/LeaseCleanupManager.java | 79 +++++++++++-------- .../kinesis/lifecycle/ShutdownTask.java | 53 ++----------- 3 files changed, 57 insertions(+), 80 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index b13fc6b1..db9cc145 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -532,15 +532,14 @@ public class Scheduler implements Runnable { if (!newStreamConfigMap.containsKey(streamIdentifier)) { if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) { log.info( - "Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams." - + streamIdentifier); + "Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.", streamIdentifier); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( currentStreamConfigMap.get(streamIdentifier)); shardSyncTaskManager.submitShardSyncTask(); } else { log.info( "Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases," - + " as part of this workflow" + streamIdentifier); + + " as part of this workflow", streamIdentifier); } currentSetOfStreamsIter.remove(); streamsSynced.add(streamIdentifier); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 0a2b65eb..1152d157 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -114,20 +114,16 @@ public class LeaseCleanupManager { log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.", lease.leaseKey()); } else { - //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597. - if (!deletionQueue.contains(leasePendingDeletion)) { - log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey()); - if (!deletionQueue.add(leasePendingDeletion)) { - log.warn("Unable to enqueue lease {} for deletion.", lease.leaseKey()); - } - } else { - log.warn("Lease {} is already pending deletion, not enqueueing for deletion.", lease.leaseKey()); + log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey()); + if (!deletionQueue.add(leasePendingDeletion)) { + log.warn("Unable to enqueue lease {} for deletion.", lease.leaseKey()); } } } /** * Check if lease was already enqueued for deletion. + * //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597. * @param leasePendingDeletion * @return true if enqueued for deletion; false otherwise. */ @@ -168,26 +164,39 @@ public class LeaseCleanupManager { try { if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) { - Set childShardKeys = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()).childShardIds(); - if (CollectionUtils.isNullOrEmpty(childShardKeys)) { - try { - childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier); + final Lease leaseFromDDB = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()); + if(leaseFromDDB != null) { + Set childShardKeys = leaseFromDDB.childShardIds(); + if (CollectionUtils.isNullOrEmpty(childShardKeys)) { + try { + childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier); - if (CollectionUtils.isNullOrEmpty(childShardKeys)) { - log.error("No child shards returned from service for shard {} for {}.", shardInfo.shardId(), streamIdentifier.streamName()); - } else { - wereChildShardsPresent = true; - updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); + if (CollectionUtils.isNullOrEmpty(childShardKeys)) { + log.error( + "No child shards returned from service for shard {} for {} while cleaning up lease.", + shardInfo.shardId(), streamIdentifier.streamName()); + } else { + wereChildShardsPresent = true; + updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); + } + } catch (ExecutionException e) { + throw exceptionManager.apply(e.getCause()); + } finally { + alreadyCheckedForGarbageCollection = true; } - } catch (ExecutionException e) { - throw exceptionManager.apply(e.getCause()); - } finally { - alreadyCheckedForGarbageCollection = true; + } else { + wereChildShardsPresent = true; + } + try { + cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys); + } catch (Exception e) { + // Suppressing the exception here, so that we can attempt for garbage cleanup. + log.warn("Unable to cleanup lease for shard {} in {}", shardInfo.shardId(), streamIdentifier.streamName(), e); } } else { - wereChildShardsPresent = true; + log.info("Lease not present in lease table while cleaning the shard {} of {}", shardInfo.shardId(), streamIdentifier.streamName()); + cleanedUpCompletedLease = true; } - cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys); } if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { @@ -256,20 +265,24 @@ public class LeaseCleanupManager { // 2. Its parent shard lease(s) have already been deleted. private boolean cleanupLeaseForCompletedShard(Lease lease, ShardInfo shardInfo, Set childShardKeys) throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException { - final Set processedChildShardLeases = new HashSet<>(); + final Set processedChildShardLeaseKeys = new HashSet<>(); + final Set childShardLeaseKeys = childShardKeys.stream().map(ck -> ShardInfo.getLeaseKey(shardInfo, ck)) + .collect(Collectors.toSet()); - for (String childShardKey : childShardKeys) { - final Lease childShardLease = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(childShardKey)).orElseThrow( - () -> new IllegalStateException("Child lease " + childShardKey + " for completed shard not found in " + - "lease table - not cleaning up lease " + lease)); + for (String childShardLeaseKey : childShardLeaseKeys) { + final Lease childShardLease = Optional.ofNullable( + leaseCoordinator.leaseRefresher().getLease(childShardLeaseKey)) + .orElseThrow(() -> new IllegalStateException( + "Child lease " + childShardLeaseKey + " for completed shard not found in " + + "lease table - not cleaning up lease " + lease)); - if (!childShardLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON) - && !childShardLease.checkpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) { - processedChildShardLeases.add(childShardLease.leaseKey()); + if (!childShardLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON) && !childShardLease + .checkpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) { + processedChildShardLeaseKeys.add(childShardLease.leaseKey()); } } - if (!allParentShardLeasesDeleted(lease, shardInfo) || !Objects.equals(childShardKeys, processedChildShardLeases)) { + if (!allParentShardLeasesDeleted(lease, shardInfo) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) { return false; } @@ -320,6 +333,8 @@ public class LeaseCleanupManager { if (leaseCleanupResult.leaseCleanedUp()) { log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier); deletionSucceeded = true; + } else { + log.warn("Unable to clean up lease {} for {} due to {}", leaseKey, streamIdentifier, leaseCleanupResult); } } catch (Exception e) { log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " + diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 5e11fddf..fc206bea 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -101,7 +102,7 @@ public class ShutdownTask implements ConsumerTask { /* * Invokes ShardRecordProcessor shutdown() API. * (non-Javadoc) - * + * * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerTask#call() */ @Override @@ -113,7 +114,7 @@ public class ShutdownTask implements ConsumerTask { try { try { - log.debug("Invoking shutdown() for shard {} with child shards {} , concurrencyToken {}. Shutdown reason: {}", + log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}", leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason); final long startTime = System.currentTimeMillis(); @@ -124,34 +125,19 @@ public class ShutdownTask implements ConsumerTask { // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. // This scenario could happen when customer deletes the stream while leaving the KCL application running. final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); + Validate.validState(currentShardLease != null, + "%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.", + leaseKeyProvider.apply(shardInfo)); final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); updateLeaseWithChildShards(currentShardLease); - // Attempt to do shard checkpointing and throw on exception. + } + if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { attemptShardEndCheckpointing(scope, startTime); - // Enqueue completed shard for deletion. leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); - - } else { - // This might be a case of ResourceNotFound from Service. Directly validate and delete lease, if required. - // If already enqueued for deletion as part of this worker, do not attempt to shard end checkpoint - // or lease cleanup. Else try to shard end checkpoint and cleanup the lease if the shard is a - // garbage shard. - if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { - try { - // Do a best effort shard end checkpointing, before attempting to cleanup the lease, - // in the case of RNF Exception. - attemptShardEndCheckpointing(scope, startTime); - } finally { - // Attempt to garbage collect if this shard is no longer associated with the stream. - // If we don't want to cleanup the garbage shard without successful shard end - // checkpointing, remove the try finally construct and only execute the methods. - attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(leasePendingDeletion, currentShardLease); - } - } } } else { throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); @@ -195,29 +181,6 @@ public class ShutdownTask implements ConsumerTask { } } - private void attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(LeasePendingDeletion leasePendingDeletion, Lease currentShardLease) { - final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult; - try { - leaseCleanupResult = leaseCleanupManager - .cleanupLease(leasePendingDeletion, false, true); - if (leaseCleanupResult.leaseCleanedUp()) { - log.info("Cleaned up garbage lease {} for {}. Details : {}", - currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); - } else { - log.error("Unable to cleanup potential garbage lease {} for {}. Details : {} ", - currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); - // If we are unable to delete this lease and the reason being RNF, then enqueue it - // for deletion, so that we don't end up consuming service TPS on any bugs. - if (leaseCleanupResult.wasResourceNotFound()) { - leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); - } - } - } catch (Exception e) { - log.error("Unable to cleanup potential garbage lease {} for {}", currentShardLease.leaseKey(), - streamIdentifier, e); - } - } - private void applicationCheckpointAndVerification() { recordProcessorCheckpointer .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());