From 37281e949399563e2dccae4b83c7f90f32fcad42 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 15 Jun 2020 20:57:55 -0700 Subject: [PATCH 1/6] Deleting lease immediately in RNF Exception --- .../kinesis/leases/LeaseCleanupManager.java | 43 +++++++++++---- .../kinesis/lifecycle/ShutdownTask.java | 55 ++++++++++++++----- 2 files changed, 74 insertions(+), 24 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 6d6b46a8..4fda2106 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -44,6 +44,7 @@ import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.time.Duration; +import java.util.Collections; import java.util.HashSet; import java.util.Objects; import java.util.Optional; @@ -116,13 +117,24 @@ public class LeaseCleanupManager { //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597. if (!deletionQueue.contains(leasePendingDeletion)) { log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey()); - deletionQueue.add(leasePendingDeletion); + if (!deletionQueue.add(leasePendingDeletion)) { + log.warn("Unable to enqueue lease {} for deletion.", lease.leaseKey()); + } } else { log.warn("Lease {} is already pending deletion, not enqueueing for deletion.", lease.leaseKey()); } } } + /** + * Check if lease was already enqueued for deletion. + * @param leasePendingDeletion + * @return true if enqueued for deletion; false otherwise. 
+ */ + public boolean isEnqueuedForDeletion(LeasePendingDeletion leasePendingDeletion) { + return deletionQueue.contains(leasePendingDeletion); + } + /** * Returns how many leases are currently waiting in the queue pending deletion. * @return number of leases pending deletion. @@ -139,7 +151,8 @@ public class LeaseCleanupManager { return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis; } - private LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion) throws TimeoutException, + public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion, + boolean timeToCheckForCompletedShard, boolean timeToCheckForGarbageShard) throws TimeoutException, InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException { final Lease lease = leasePendingDeletion.lease(); final ShardInfo shardInfo = leasePendingDeletion.shardInfo(); @@ -150,9 +163,11 @@ public class LeaseCleanupManager { boolean cleanedUpCompletedLease = false; boolean cleanedUpGarbageLease = false; boolean alreadyCheckedForGarbageCollection = false; + boolean wereChildShardsPresent = false; + boolean wasResourceNotFound = false; try { - if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard()) { + if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) { Set childShardKeys = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()).childShardIds(); if (CollectionUtils.isNullOrEmpty(childShardKeys)) { try { @@ -172,18 +187,21 @@ public class LeaseCleanupManager { cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys); } - if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard()) { + if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { try { - getChildShardsFromService(shardInfo, streamIdentifier); + wereChildShardsPresent = !CollectionUtils + .isNullOrEmpty(getChildShardsFromService(shardInfo, 
streamIdentifier)); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } } } catch (ResourceNotFoundException e) { + wasResourceNotFound = true; cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease); } - return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease); + return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent, + wasResourceNotFound); } private Set getChildShardsFromService(ShardInfo shardInfo, StreamIdentifier streamIdentifier) @@ -289,22 +307,23 @@ public class LeaseCleanupManager { final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll(); final String leaseKey = leasePendingDeletion.lease().leaseKey(); final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier(); - boolean deletionFailed = true; + boolean deletionSucceeded = false; try { - final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion); + final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion, + timeToCheckForCompletedShard(), timeToCheckForGarbageShard()); completedLeaseCleanedUp |= leaseCleanupResult.cleanedUpCompletedLease(); garbageLeaseCleanedUp |= leaseCleanupResult.cleanedUpGarbageLease(); if (leaseCleanupResult.leaseCleanedUp()) { log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier); - deletionFailed = false; + deletionSucceeded = true; } } catch (Exception e) { log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " + "scheduled execution.", leaseKey, streamIdentifier, e); } - if (deletionFailed) { + if (!deletionSucceeded) { log.debug("Did not cleanup lease {} for {}. 
Re-enqueueing for deletion.", leaseKey, streamIdentifier); failedDeletions.add(leasePendingDeletion); } @@ -332,9 +351,11 @@ public class LeaseCleanupManager { } @Value - private class LeaseCleanupResult { + public static class LeaseCleanupResult { boolean cleanedUpCompletedLease; boolean cleanedUpGarbageLease; + boolean wereChildShardsPresent; + boolean wasResourceNotFound; public boolean leaseCleanedUp() { return cleanedUpCompletedLease | cleanedUpGarbageLease; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 5f1dcd25..73745168 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -128,21 +128,50 @@ public class ShutdownTask implements ConsumerTask { if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); updateLeaseWithChildShards(currentShardLease); - } - final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) - .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); - if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { - recordProcessorCheckpointer - .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); - recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); - // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. - // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. 
- throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); - } + final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) + .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); + if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + recordProcessorCheckpointer + .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); + recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); + // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. + // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. + throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); + } - final LeasePendingDeletion garbageLease = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); - leaseCleanupManager.enqueueForDeletion(garbageLease); + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + + } else { + // This might be a case of ResourceNotFound from Service. Directly validate and delete lease, if required. + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, + currentShardLease, shardInfo); + if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { + final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult; + try { + leaseCleanupResult = leaseCleanupManager + .cleanupLease(leasePendingDeletion, false, true); + if (leaseCleanupResult.leaseCleanedUp()) { + log.info("Cleaned up garbage lease {} for {}. 
Details : {}", + currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); + } else { + log.error("Unable to cleanup garbage lease {} for {}. Details : {} ", + currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); + // If we are unable to delete this lease and the reason being RNF, then enqueue it + // for deletion, so that we don't end up consuming service TPS on any bugs. + if (leaseCleanupResult.wasResourceNotFound()) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + } + } catch (Exception e) { + log.error("Unable to cleanup garbage lease {} for {}", currentShardLease.leaseKey(), + streamIdentifier, e); + } finally { + + } + } + } } else { throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); } From b60dd60f35b956b7d97af499050a68e28100a593 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 15 Jun 2020 21:20:33 -0700 Subject: [PATCH 2/6] Code refactoring - 1 --- .../leases/dynamodb/DynamoDBLeaseRenewer.java | 14 ++++++-------- .../amazon/kinesis/lifecycle/ShutdownTask.java | 8 +++----- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java index ecb0fc26..e457b5ec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java @@ -298,14 +298,12 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer { } final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); - if (StringUtils.isNotEmpty(singleStreamShardId)) { - if(lease instanceof MultiStreamLease) { - MetricsUtil.addStreamId(scope, - 
StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())); - MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId()); - } else { - MetricsUtil.addShardId(scope, singleStreamShardId); - } + if (lease instanceof MultiStreamLease) { + MetricsUtil.addStreamId(scope, + StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())); + MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId()); + } else if (StringUtils.isNotEmpty(singleStreamShardId)) { + MetricsUtil.addShardId(scope, singleStreamShardId); } long startTime = System.currentTimeMillis(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 73745168..64e394b3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -132,9 +132,6 @@ public class ShutdownTask implements ConsumerTask { final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { - recordProcessorCheckpointer - .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); - recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. 
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); @@ -167,8 +164,6 @@ public class ShutdownTask implements ConsumerTask { } catch (Exception e) { log.error("Unable to cleanup garbage lease {} for {}", currentShardLease.leaseKey(), streamIdentifier, e); - } finally { - } } } @@ -203,6 +198,9 @@ public class ShutdownTask implements ConsumerTask { } private void applicationCheckpointAndVerification() { + recordProcessorCheckpointer + .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); + recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); if (lastCheckpointValue == null From 8ec14baca1a31d9334b4782adf43578bba0f8936 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 15 Jun 2020 21:53:30 -0700 Subject: [PATCH 3/6] Handling garbage shard case while delivering lease cleanup --- .../kinesis/lifecycle/ShutdownTask.java | 78 ++++++++++++------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 64e394b3..9c77d910 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -124,46 +124,29 @@ public class ShutdownTask implements ConsumerTask { // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. // This scenario could happen when customer deletes the stream while leaving the KCL application running.
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, + currentShardLease, shardInfo); if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); updateLeaseWithChildShards(currentShardLease); - - final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) - .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); - if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { - // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. - // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. - throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); - } - - final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); + // Attempt to do shard checkpointing and throw on exception. + attemptShardEndCheckpointing(scope, startTime); + // Enqueue completed shard for deletion. leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); } else { // This might be a case of ResourceNotFound from Service. Directly validate and delete lease, if required. - final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, - currentShardLease, shardInfo); + // If already enqueued for deletion as part of this worker, do not attempt to shard end checkpoint + // or lease cleanup. Else try to shard end checkpoint and cleanup the lease if the shard is a + // garbage shard. 
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { - final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult; try { - leaseCleanupResult = leaseCleanupManager - .cleanupLease(leasePendingDeletion, false, true); - if (leaseCleanupResult.leaseCleanedUp()) { - log.info("Cleaned up garbage lease {} for {}. Details : {}", - currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); - } else { - log.error("Unable to cleanup garbage lease {} for {}. Details : {} ", - currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); - // If we are unable to delete this lease and the reason being RNF, then enqueue it - // for deletion, so that we don't end up consuming service TPS on any bugs. - if (leaseCleanupResult.wasResourceNotFound()) { - leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); - } - } - } catch (Exception e) { - log.error("Unable to cleanup garbage lease {} for {}", currentShardLease.leaseKey(), - streamIdentifier, e); + // Do a best effort shard end checkpointing, before attempting to cleanup the lease, + // in the case of RNF Exception. 
+ attemptShardEndCheckpointing(scope, startTime); + } finally { + attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(leasePendingDeletion, currentShardLease); } } } @@ -197,6 +180,41 @@ public class ShutdownTask implements ConsumerTask { return new TaskResult(exception); } + private void attemptShardEndCheckpointing(MetricsScope scope, long startTime) + throws DependencyException, ProvisionedThroughputException, InvalidStateException, + CustomerApplicationException { + final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) + .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); + if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. + // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. + throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); + } + } + + private void attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(LeasePendingDeletion leasePendingDeletion, Lease currentShardLease) { + final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult; + try { + leaseCleanupResult = leaseCleanupManager + .cleanupLease(leasePendingDeletion, false, true); + if (leaseCleanupResult.leaseCleanedUp()) { + log.info("Cleaned up garbage lease {} for {}. Details : {}", + currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); + } else { + log.error("Unable to cleanup potential garbage lease {} for {}. Details : {} ", + currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); + // If we are unable to delete this lease and the reason being RNF, then enqueue it + // for deletion, so that we don't end up consuming service TPS on any bugs. 
+ if (leaseCleanupResult.wasResourceNotFound()) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + } + } catch (Exception e) { + log.error("Unable to cleanup potential garbage lease {} for {}", currentShardLease.leaseKey(), + streamIdentifier, e); + } + } + private void applicationCheckpointAndVerification() { recordProcessorCheckpointer .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); From 7b0dc0d3e58be27c49f29ebab6caf72f73c64af1 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 15 Jun 2020 22:00:56 -0700 Subject: [PATCH 4/6] Added comments on the garbage cleanup logic --- .../java/software/amazon/kinesis/lifecycle/ShutdownTask.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 9c77d910..dd98ca87 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -146,6 +146,8 @@ public class ShutdownTask implements ConsumerTask { // in the case of RNF Exception. attemptShardEndCheckpointing(scope, startTime); } finally { + // If we don't want to cleanup the garbage shard without successful shard end + // checkpointing, remove the try finally construct and only execute the methods. 
attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(leasePendingDeletion, currentShardLease); } } From 4d65f5603899d8389aadd5b74d4783ed932e577e Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 15 Jun 2020 22:02:33 -0700 Subject: [PATCH 5/6] Added comments on the garbage cleanup logic - 2 --- .../java/software/amazon/kinesis/lifecycle/ShutdownTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index dd98ca87..9743704d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -146,6 +146,7 @@ public class ShutdownTask implements ConsumerTask { // in the case of RNF Exception. attemptShardEndCheckpointing(scope, startTime); } finally { + // Attempt to garbage collect if this shard is no longer associated with the stream. // If we don't want to cleanup the garbage shard without successful shard end // checkpointing, remove the try finally construct and only execute the methods. 
attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(leasePendingDeletion, currentShardLease); From 374e47b208afccfbb0ca346bb73e54b12d270437 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 15 Jun 2020 23:13:47 -0700 Subject: [PATCH 6/6] updating wereChildShardsPresent hint in all cases --- .../software/amazon/kinesis/leases/LeaseCleanupManager.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 4fda2106..0a2b65eb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -173,9 +173,10 @@ public class LeaseCleanupManager { try { childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier); - if (childShardKeys == null) { + if (CollectionUtils.isNullOrEmpty(childShardKeys)) { log.error("No child shards returned from service for shard {} for {}.", shardInfo.shardId(), streamIdentifier.streamName()); } else { + wereChildShardsPresent = true; updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); } } catch (ExecutionException e) { @@ -183,6 +184,8 @@ public class LeaseCleanupManager { } finally { alreadyCheckedForGarbageCollection = true; } + } else { + wereChildShardsPresent = true; } cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys); }