Fixing lease cleanup issues with multistreaming

This commit is contained in:
Ashwin Giridharan 2020-06-23 03:03:42 -07:00
parent 0246e1e852
commit 67ce7a783d
3 changed files with 57 additions and 80 deletions

View file

@ -532,15 +532,14 @@ public class Scheduler implements Runnable {
if (!newStreamConfigMap.containsKey(streamIdentifier)) { if (!newStreamConfigMap.containsKey(streamIdentifier)) {
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) { if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
log.info( log.info(
"Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams." "Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.", streamIdentifier);
+ streamIdentifier);
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager( ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
currentStreamConfigMap.get(streamIdentifier)); currentStreamConfigMap.get(streamIdentifier));
shardSyncTaskManager.submitShardSyncTask(); shardSyncTaskManager.submitShardSyncTask();
} else { } else {
log.info( log.info(
"Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases," "Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases,"
+ " as part of this workflow" + streamIdentifier); + " as part of this workflow", streamIdentifier);
} }
currentSetOfStreamsIter.remove(); currentSetOfStreamsIter.remove();
streamsSynced.add(streamIdentifier); streamsSynced.add(streamIdentifier);

View file

@ -114,20 +114,16 @@ public class LeaseCleanupManager {
log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.", log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.",
lease.leaseKey()); lease.leaseKey());
} else { } else {
//TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597.
if (!deletionQueue.contains(leasePendingDeletion)) {
log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey()); log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
if (!deletionQueue.add(leasePendingDeletion)) { if (!deletionQueue.add(leasePendingDeletion)) {
log.warn("Unable to enqueue lease {} for deletion.", lease.leaseKey()); log.warn("Unable to enqueue lease {} for deletion.", lease.leaseKey());
} }
} else {
log.warn("Lease {} is already pending deletion, not enqueueing for deletion.", lease.leaseKey());
}
} }
} }
/** /**
* Check if lease was already enqueued for deletion. * Check if lease was already enqueued for deletion.
* //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597.
* @param leasePendingDeletion * @param leasePendingDeletion
* @return true if enqueued for deletion; false otherwise. * @return true if enqueued for deletion; false otherwise.
*/ */
@ -168,13 +164,17 @@ public class LeaseCleanupManager {
try { try {
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) { if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
Set<String> childShardKeys = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()).childShardIds(); final Lease leaseFromDDB = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey());
if(leaseFromDDB != null) {
Set<String> childShardKeys = leaseFromDDB.childShardIds();
if (CollectionUtils.isNullOrEmpty(childShardKeys)) { if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
try { try {
childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier); childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier);
if (CollectionUtils.isNullOrEmpty(childShardKeys)) { if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
log.error("No child shards returned from service for shard {} for {}.", shardInfo.shardId(), streamIdentifier.streamName()); log.error(
"No child shards returned from service for shard {} for {} while cleaning up lease.",
shardInfo.shardId(), streamIdentifier.streamName());
} else { } else {
wereChildShardsPresent = true; wereChildShardsPresent = true;
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
@ -187,7 +187,16 @@ public class LeaseCleanupManager {
} else { } else {
wereChildShardsPresent = true; wereChildShardsPresent = true;
} }
try {
cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys); cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys);
} catch (Exception e) {
// Suppressing the exception here, so that we can attempt for garbage cleanup.
log.warn("Unable to cleanup lease for shard {} in {}", shardInfo.shardId(), streamIdentifier.streamName(), e);
}
} else {
log.info("Lease not present in lease table while cleaning the shard {} of {}", shardInfo.shardId(), streamIdentifier.streamName());
cleanedUpCompletedLease = true;
}
} }
if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
@ -256,20 +265,24 @@ public class LeaseCleanupManager {
// 2. Its parent shard lease(s) have already been deleted. // 2. Its parent shard lease(s) have already been deleted.
private boolean cleanupLeaseForCompletedShard(Lease lease, ShardInfo shardInfo, Set<String> childShardKeys) private boolean cleanupLeaseForCompletedShard(Lease lease, ShardInfo shardInfo, Set<String> childShardKeys)
throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException { throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {
final Set<String> processedChildShardLeases = new HashSet<>(); final Set<String> processedChildShardLeaseKeys = new HashSet<>();
final Set<String> childShardLeaseKeys = childShardKeys.stream().map(ck -> ShardInfo.getLeaseKey(shardInfo, ck))
.collect(Collectors.toSet());
for (String childShardKey : childShardKeys) { for (String childShardLeaseKey : childShardLeaseKeys) {
final Lease childShardLease = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(childShardKey)).orElseThrow( final Lease childShardLease = Optional.ofNullable(
() -> new IllegalStateException("Child lease " + childShardKey + " for completed shard not found in " + leaseCoordinator.leaseRefresher().getLease(childShardLeaseKey))
"lease table - not cleaning up lease " + lease)); .orElseThrow(() -> new IllegalStateException(
"Child lease " + childShardLeaseKey + " for completed shard not found in "
+ "lease table - not cleaning up lease " + lease));
if (!childShardLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON) if (!childShardLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON) && !childShardLease
&& !childShardLease.checkpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) { .checkpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) {
processedChildShardLeases.add(childShardLease.leaseKey()); processedChildShardLeaseKeys.add(childShardLease.leaseKey());
} }
} }
if (!allParentShardLeasesDeleted(lease, shardInfo) || !Objects.equals(childShardKeys, processedChildShardLeases)) { if (!allParentShardLeasesDeleted(lease, shardInfo) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) {
return false; return false;
} }
@ -320,6 +333,8 @@ public class LeaseCleanupManager {
if (leaseCleanupResult.leaseCleanedUp()) { if (leaseCleanupResult.leaseCleanedUp()) {
log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier); log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier);
deletionSucceeded = true; deletionSucceeded = true;
} else {
log.warn("Unable to clean up lease {} for {} due to {}", leaseKey, streamIdentifier, leaseCleanupResult);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " + log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " +

View file

@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -113,7 +114,7 @@ public class ShutdownTask implements ConsumerTask {
try { try {
try { try {
log.debug("Invoking shutdown() for shard {} with child shards {} , concurrencyToken {}. Shutdown reason: {}", log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}",
leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason); leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason);
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
@ -124,34 +125,19 @@ public class ShutdownTask implements ConsumerTask {
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running. // This scenario could happen when customer deletes the stream while leaving the KCL application running.
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
Validate.validState(currentShardLease != null,
"%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.",
leaseKeyProvider.apply(shardInfo));
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier,
currentShardLease, shardInfo); currentShardLease, shardInfo);
if (!CollectionUtils.isNullOrEmpty(childShards)) { if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist(); createLeasesForChildShardsIfNotExist();
updateLeaseWithChildShards(currentShardLease); updateLeaseWithChildShards(currentShardLease);
// Attempt to do shard checkpointing and throw on exception. }
attemptShardEndCheckpointing(scope, startTime);
// Enqueue completed shard for deletion.
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
} else {
// This might be a case of ResourceNotFound from Service. Directly validate and delete lease, if required.
// If already enqueued for deletion as part of this worker, do not attempt to shard end checkpoint
// or lease cleanup. Else try to shard end checkpoint and cleanup the lease if the shard is a
// garbage shard.
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
try {
// Do a best effort shard end checkpointing, before attempting to cleanup the lease,
// in the case of RNF Exception.
attemptShardEndCheckpointing(scope, startTime); attemptShardEndCheckpointing(scope, startTime);
} finally { leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
// Attempt to garbage collect if this shard is no longer associated with the stream.
// If we don't want to cleanup the garbage shard without successful shard end
// checkpointing, remove the try finally construct and only execute the methods.
attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(leasePendingDeletion, currentShardLease);
}
}
} }
} else { } else {
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
@ -195,29 +181,6 @@ public class ShutdownTask implements ConsumerTask {
} }
} }
/**
 * Makes one immediate attempt to clean up the given lease through the lease cleanup manager and
 * logs the outcome. When the lease was not cleaned up and the result reports that the resource
 * was not found, the lease is handed to the cleanup manager's deletion queue instead.
 *
 * Any {@link Exception} thrown by the cleanup attempt is caught, logged at error level and not rethrown.
 *
 * @param leasePendingDeletion the pending-deletion entry passed to the cleanup manager
 * @param currentShardLease the lease whose key is reported in the log messages
 */
private void attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(LeasePendingDeletion leasePendingDeletion, Lease currentShardLease) {
final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult;
try {
// NOTE(review): the two flags presumably mean "skip the completed-shard check, run the
// garbage-shard check" - confirm against the LeaseCleanupManager.cleanupLease signature.
leaseCleanupResult = leaseCleanupManager
.cleanupLease(leasePendingDeletion, false, true);
if (leaseCleanupResult.leaseCleanedUp()) {
log.info("Cleaned up garbage lease {} for {}. Details : {}",
currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult);
} else {
log.error("Unable to cleanup potential garbage lease {} for {}. Details : {} ",
currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult);
// If we are unable to delete this lease and the reason being RNF, then enqueue it
// for deletion, so that we don't end up consuming service TPS on any bugs.
if (leaseCleanupResult.wasResourceNotFound()) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
}
} catch (Exception e) {
// Best effort only: the failure is logged together with its cause and then dropped.
log.error("Unable to cleanup potential garbage lease {} for {}", currentShardLease.leaseKey(),
streamIdentifier, e);
}
}
private void applicationCheckpointAndVerification() { private void applicationCheckpointAndVerification() {
recordProcessorCheckpointer recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());