From 8ec14baca1a31d9334b4782adf43578bba0f8936 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 15 Jun 2020 21:53:30 -0700 Subject: [PATCH] Handlign garbage shard case while delivering lease cleanup --- .../kinesis/lifecycle/ShutdownTask.java | 78 ++++++++++++------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 64e394b3..9c77d910 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -124,46 +124,29 @@ public class ShutdownTask implements ConsumerTask { // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. // This scenario could happen when customer deletes the stream while leaving the KCL application running. final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, + currentShardLease, shardInfo); if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); updateLeaseWithChildShards(currentShardLease); - - final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) - .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); - if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { - // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. - // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. - throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); - } - - final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); + // Attempt to do shard checkpointing and throw on exception. + attemptShardEndCheckpointing(scope, startTime); + // Enqueue completed shard for deletion. leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); } else { // This might be a case of ResourceNotFound from Service. Directly validate and delete lease, if required. - final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, - currentShardLease, shardInfo); + // If already enqueued for deletion as part of this worker, do not attempt to shard end checkpoint + // or lease cleanup. Else try to shard end checkpoint and cleanup the lease if the shard is a + // garbage shard. if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { - final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult; try { - leaseCleanupResult = leaseCleanupManager - .cleanupLease(leasePendingDeletion, false, true); - if (leaseCleanupResult.leaseCleanedUp()) { - log.info("Cleaned up garbage lease {} for {}. Details : {}", - currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); - } else { - log.error("Unable to cleanup garbage lease {} for {}. Details : {} ", - currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); - // If we are unable to delete this lease and the reason being RNF, then enqueue it - // for deletion, so that we don't end up consuming service TPS on any bugs. - if (leaseCleanupResult.wasResourceNotFound()) { - leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); - } - } - } catch (Exception e) { - log.error("Unable to cleanup garbage lease {} for {}", currentShardLease.leaseKey(), - streamIdentifier, e); + // Do a best effort shard end checkpointing, before attempting to cleanup the lease, + // in the case of RNF Exception. + attemptShardEndCheckpointing(scope, startTime); + } finally { + attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(leasePendingDeletion, currentShardLease); } } } @@ -197,6 +180,41 @@ public class ShutdownTask implements ConsumerTask { return new TaskResult(exception); } + private void attemptShardEndCheckpointing(MetricsScope scope, long startTime) + throws DependencyException, ProvisionedThroughputException, InvalidStateException, + CustomerApplicationException { + final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) + .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); + if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. + // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. + throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); + } + } + + private void attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(LeasePendingDeletion leasePendingDeletion, Lease currentShardLease) { + final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult; + try { + leaseCleanupResult = leaseCleanupManager + .cleanupLease(leasePendingDeletion, false, true); + if (leaseCleanupResult.leaseCleanedUp()) { + log.info("Cleaned up garbage lease {} for {}. Details : {}", + currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); + } else { + log.error("Unable to cleanup potential garbage lease {} for {}. Details : {} ", + currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); + // If we are unable to delete this lease and the reason being RNF, then enqueue it + // for deletion, so that we don't end up consuming service TPS on any bugs. + if (leaseCleanupResult.wasResourceNotFound()) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + } + } catch (Exception e) { + log.error("Unable to cleanup potential garbage lease {} for {}", currentShardLease.leaseKey(), + streamIdentifier, e); + } + } + private void applicationCheckpointAndVerification() { recordProcessorCheckpointer .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());