Merge pull request #58 from ashwing/ltr_1_lease_cleanup_fix

Lease cleanup handling for garbage shard
This commit is contained in:
ashwing 2020-06-16 09:10:46 -07:00 committed by GitHub
commit 65388573f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 103 additions and 33 deletions

View file

@ -44,6 +44,7 @@ import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.Duration; import java.time.Duration;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@ -116,13 +117,24 @@ public class LeaseCleanupManager {
//TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597. //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597.
if (!deletionQueue.contains(leasePendingDeletion)) { if (!deletionQueue.contains(leasePendingDeletion)) {
log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey()); log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
deletionQueue.add(leasePendingDeletion); if (!deletionQueue.add(leasePendingDeletion)) {
log.warn("Unable to enqueue lease {} for deletion.", lease.leaseKey());
}
} else { } else {
log.warn("Lease {} is already pending deletion, not enqueueing for deletion.", lease.leaseKey()); log.warn("Lease {} is already pending deletion, not enqueueing for deletion.", lease.leaseKey());
} }
} }
} }
/**
* Check if lease was already enqueued for deletion.
* @param leasePendingDeletion
* @return true if enqueued for deletion; false otherwise.
*/
public boolean isEnqueuedForDeletion(LeasePendingDeletion leasePendingDeletion) {
return deletionQueue.contains(leasePendingDeletion);
}
/** /**
* Returns how many leases are currently waiting in the queue pending deletion. * Returns how many leases are currently waiting in the queue pending deletion.
* @return number of leases pending deletion. * @return number of leases pending deletion.
@ -139,7 +151,8 @@ public class LeaseCleanupManager {
return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis; return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis;
} }
private LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion) throws TimeoutException, public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion,
boolean timeToCheckForCompletedShard, boolean timeToCheckForGarbageShard) throws TimeoutException,
InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException { InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException {
final Lease lease = leasePendingDeletion.lease(); final Lease lease = leasePendingDeletion.lease();
final ShardInfo shardInfo = leasePendingDeletion.shardInfo(); final ShardInfo shardInfo = leasePendingDeletion.shardInfo();
@ -150,17 +163,20 @@ public class LeaseCleanupManager {
boolean cleanedUpCompletedLease = false; boolean cleanedUpCompletedLease = false;
boolean cleanedUpGarbageLease = false; boolean cleanedUpGarbageLease = false;
boolean alreadyCheckedForGarbageCollection = false; boolean alreadyCheckedForGarbageCollection = false;
boolean wereChildShardsPresent = false;
boolean wasResourceNotFound = false;
try { try {
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard()) { if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
Set<String> childShardKeys = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()).childShardIds(); Set<String> childShardKeys = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()).childShardIds();
if (CollectionUtils.isNullOrEmpty(childShardKeys)) { if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
try { try {
childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier); childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier);
if (childShardKeys == null) { if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
log.error("No child shards returned from service for shard {} for {}.", shardInfo.shardId(), streamIdentifier.streamName()); log.error("No child shards returned from service for shard {} for {}.", shardInfo.shardId(), streamIdentifier.streamName());
} else { } else {
wereChildShardsPresent = true;
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
} }
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -168,22 +184,27 @@ public class LeaseCleanupManager {
} finally { } finally {
alreadyCheckedForGarbageCollection = true; alreadyCheckedForGarbageCollection = true;
} }
} else {
wereChildShardsPresent = true;
} }
cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys); cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys);
} }
if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard()) { if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
try { try {
getChildShardsFromService(shardInfo, streamIdentifier); wereChildShardsPresent = !CollectionUtils
.isNullOrEmpty(getChildShardsFromService(shardInfo, streamIdentifier));
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause()); throw exceptionManager.apply(e.getCause());
} }
} }
} catch (ResourceNotFoundException e) { } catch (ResourceNotFoundException e) {
wasResourceNotFound = true;
cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease); cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease);
} }
return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease); return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent,
wasResourceNotFound);
} }
private Set<String> getChildShardsFromService(ShardInfo shardInfo, StreamIdentifier streamIdentifier) private Set<String> getChildShardsFromService(ShardInfo shardInfo, StreamIdentifier streamIdentifier)
@ -289,22 +310,23 @@ public class LeaseCleanupManager {
final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll(); final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll();
final String leaseKey = leasePendingDeletion.lease().leaseKey(); final String leaseKey = leasePendingDeletion.lease().leaseKey();
final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier(); final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
boolean deletionFailed = true; boolean deletionSucceeded = false;
try { try {
final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion); final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion,
timeToCheckForCompletedShard(), timeToCheckForGarbageShard());
completedLeaseCleanedUp |= leaseCleanupResult.cleanedUpCompletedLease(); completedLeaseCleanedUp |= leaseCleanupResult.cleanedUpCompletedLease();
garbageLeaseCleanedUp |= leaseCleanupResult.cleanedUpGarbageLease(); garbageLeaseCleanedUp |= leaseCleanupResult.cleanedUpGarbageLease();
if (leaseCleanupResult.leaseCleanedUp()) { if (leaseCleanupResult.leaseCleanedUp()) {
log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier); log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier);
deletionFailed = false; deletionSucceeded = true;
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " + log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " +
"scheduled execution.", leaseKey, streamIdentifier, e); "scheduled execution.", leaseKey, streamIdentifier, e);
} }
if (deletionFailed) { if (!deletionSucceeded) {
log.debug("Did not cleanup lease {} for {}. Re-enqueueing for deletion.", leaseKey, streamIdentifier); log.debug("Did not cleanup lease {} for {}. Re-enqueueing for deletion.", leaseKey, streamIdentifier);
failedDeletions.add(leasePendingDeletion); failedDeletions.add(leasePendingDeletion);
} }
@ -332,9 +354,11 @@ public class LeaseCleanupManager {
} }
@Value @Value
private class LeaseCleanupResult { public static class LeaseCleanupResult {
boolean cleanedUpCompletedLease; boolean cleanedUpCompletedLease;
boolean cleanedUpGarbageLease; boolean cleanedUpGarbageLease;
boolean wereChildShardsPresent;
boolean wasResourceNotFound;
public boolean leaseCleanedUp() { public boolean leaseCleanedUp() {
return cleanedUpCompletedLease | cleanedUpGarbageLease; return cleanedUpCompletedLease | cleanedUpGarbageLease;

View file

@ -298,14 +298,12 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
} }
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
if (StringUtils.isNotEmpty(singleStreamShardId)) { if (lease instanceof MultiStreamLease) {
if(lease instanceof MultiStreamLease) { MetricsUtil.addStreamId(scope,
MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()));
StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier())); MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId());
MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId()); } else if (StringUtils.isNotEmpty(singleStreamShardId)) {
} else { MetricsUtil.addShardId(scope, singleStreamShardId);
MetricsUtil.addShardId(scope, singleStreamShardId);
}
} }
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();

View file

@ -124,25 +124,35 @@ public class ShutdownTask implements ConsumerTask {
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running. // This scenario could happen when customer deletes the stream while leaving the KCL application running.
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier,
currentShardLease, shardInfo);
if (!CollectionUtils.isNullOrEmpty(childShards)) { if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist(); createLeasesForChildShardsIfNotExist();
updateLeaseWithChildShards(currentShardLease); updateLeaseWithChildShards(currentShardLease);
} // Attempt to do shard checkpointing and throw on exception.
attemptShardEndCheckpointing(scope, startTime);
// Enqueue completed shard for deletion.
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) } else {
.orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); // This might be a case of ResourceNotFound from Service. Directly validate and delete lease, if required.
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { // If already enqueued for deletion as part of this worker, do not attempt to shard end checkpoint
recordProcessorCheckpointer // or lease cleanup. Else try to shard end checkpoint and cleanup the lease if the shard is a
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); // garbage shard.
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. try {
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. // Do a best effort shard end checkpointing, before attempting to cleanup the lease,
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); // in the case of RNF Exception.
attemptShardEndCheckpointing(scope, startTime);
} finally {
// Attempt to garbage collect if this shard is no longer associated with the stream.
// If we don't want to cleanup the garbage shard without successful shard end
// checkpointing, remove the try finally construct and only execute the methods.
attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(leasePendingDeletion, currentShardLease);
}
}
} }
final LeasePendingDeletion garbageLease = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo);
leaseCleanupManager.enqueueForDeletion(garbageLease);
} else { } else {
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
} }
@ -173,7 +183,45 @@ public class ShutdownTask implements ConsumerTask {
return new TaskResult(exception); return new TaskResult(exception);
} }
private void attemptShardEndCheckpointing(MetricsScope scope, long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
CustomerApplicationException {
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
.orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist."));
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
}
}
private void attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(LeasePendingDeletion leasePendingDeletion, Lease currentShardLease) {
final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult;
try {
leaseCleanupResult = leaseCleanupManager
.cleanupLease(leasePendingDeletion, false, true);
if (leaseCleanupResult.leaseCleanedUp()) {
log.info("Cleaned up garbage lease {} for {}. Details : {}",
currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult);
} else {
log.error("Unable to cleanup potential garbage lease {} for {}. Details : {} ",
currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult);
// If we are unable to delete this lease and the reason being RNF, then enqueue it
// for deletion, so that we don't end up consuming service TPS on any bugs.
if (leaseCleanupResult.wasResourceNotFound()) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
}
} catch (Exception e) {
log.error("Unable to cleanup potential garbage lease {} for {}", currentShardLease.leaseKey(),
streamIdentifier, e);
}
}
private void applicationCheckpointAndVerification() { private void applicationCheckpointAndVerification() {
recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
if (lastCheckpointValue == null if (lastCheckpointValue == null