Handlign garbage shard case while delivering lease cleanup

This commit is contained in:
Ashwin Giridharan 2020-06-15 21:53:30 -07:00
parent b60dd60f35
commit 8ec14baca1

View file

@ -124,46 +124,29 @@ public class ShutdownTask implements ConsumerTask {
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running. // This scenario could happen when customer deletes the stream while leaving the KCL application running.
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier,
currentShardLease, shardInfo);
if (!CollectionUtils.isNullOrEmpty(childShards)) { if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist(); createLeasesForChildShardsIfNotExist();
updateLeaseWithChildShards(currentShardLease); updateLeaseWithChildShards(currentShardLease);
// Attempt to do shard checkpointing and throw on exception.
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) attemptShardEndCheckpointing(scope, startTime);
.orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); // Enqueue completed shard for deletion.
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
}
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo);
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
} else { } else {
// This might be a case of ResourceNotFound from Service. Directly validate and delete lease, if required. // This might be a case of ResourceNotFound from Service. Directly validate and delete lease, if required.
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, // If already enqueued for deletion as part of this worker, do not attempt to shard end checkpoint
currentShardLease, shardInfo); // or lease cleanup. Else try to shard end checkpoint and cleanup the lease if the shard is a
// garbage shard.
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult;
try { try {
leaseCleanupResult = leaseCleanupManager // Do a best effort shard end checkpointing, before attempting to cleanup the lease,
.cleanupLease(leasePendingDeletion, false, true); // in the case of RNF Exception.
if (leaseCleanupResult.leaseCleanedUp()) { attemptShardEndCheckpointing(scope, startTime);
log.info("Cleaned up garbage lease {} for {}. Details : {}", } finally {
currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult); attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(leasePendingDeletion, currentShardLease);
} else {
log.error("Unable to cleanup garbage lease {} for {}. Details : {} ",
currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult);
// If we are unable to delete this lease and the reason being RNF, then enqueue it
// for deletion, so that we don't end up consuming service TPS on any bugs.
if (leaseCleanupResult.wasResourceNotFound()) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
}
} catch (Exception e) {
log.error("Unable to cleanup garbage lease {} for {}", currentShardLease.leaseKey(),
streamIdentifier, e);
} }
} }
} }
@ -197,6 +180,41 @@ public class ShutdownTask implements ConsumerTask {
return new TaskResult(exception); return new TaskResult(exception);
} }
private void attemptShardEndCheckpointing(MetricsScope scope, long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
CustomerApplicationException {
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
.orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist."));
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
}
}
private void attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(LeasePendingDeletion leasePendingDeletion, Lease currentShardLease) {
final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult;
try {
leaseCleanupResult = leaseCleanupManager
.cleanupLease(leasePendingDeletion, false, true);
if (leaseCleanupResult.leaseCleanedUp()) {
log.info("Cleaned up garbage lease {} for {}. Details : {}",
currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult);
} else {
log.error("Unable to cleanup potential garbage lease {} for {}. Details : {} ",
currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult);
// If we are unable to delete this lease and the reason being RNF, then enqueue it
// for deletion, so that we don't end up consuming service TPS on any bugs.
if (leaseCleanupResult.wasResourceNotFound()) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
}
} catch (Exception e) {
log.error("Unable to cleanup potential garbage lease {} for {}", currentShardLease.leaseKey(),
streamIdentifier, e);
}
}
private void applicationCheckpointAndVerification() { private void applicationCheckpointAndVerification() {
recordProcessorCheckpointer recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());