Merge pull request #64 from ashwing/ltr_1_leader_stream_sync
Ltr 1 leader stream sync
This commit is contained in:
commit
1b86184428
3 changed files with 65 additions and 82 deletions
|
|
@ -423,7 +423,9 @@ public class Scheduler implements Runnable {
|
||||||
cleanupShardConsumers(assignedShards);
|
cleanupShardConsumers(assignedShards);
|
||||||
|
|
||||||
// check for new streams and sync with the scheduler state
|
// check for new streams and sync with the scheduler state
|
||||||
checkAndSyncStreamShardsAndLeases();
|
if (isLeader()) {
|
||||||
|
checkAndSyncStreamShardsAndLeases();
|
||||||
|
}
|
||||||
|
|
||||||
logExecutorState();
|
logExecutorState();
|
||||||
slog.info("Sleeping ...");
|
slog.info("Sleeping ...");
|
||||||
|
|
@ -440,6 +442,10 @@ public class Scheduler implements Runnable {
|
||||||
slog.resetInfoLogging();
|
slog.resetInfoLogging();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Reports whether this worker is the leader, by passing the worker identifier from the
 * lease management configuration to the leader decider.
 *
 * @return the value returned by {@code leaderDecider.isLeader(...)} for this worker's identifier
 */
private boolean isLeader() {
|
||||||
|
return leaderDecider.isLeader(leaseManagementConfig.workerIdentifier());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Note: This method has package level access solely for testing purposes.
|
* Note: This method has package level access solely for testing purposes.
|
||||||
|
|
@ -526,15 +532,14 @@ public class Scheduler implements Runnable {
|
||||||
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
|
if (!newStreamConfigMap.containsKey(streamIdentifier)) {
|
||||||
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
|
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
|
||||||
log.info(
|
log.info(
|
||||||
"Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams."
|
"Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.", streamIdentifier);
|
||||||
+ streamIdentifier);
|
|
||||||
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
|
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(
|
||||||
currentStreamConfigMap.get(streamIdentifier));
|
currentStreamConfigMap.get(streamIdentifier));
|
||||||
shardSyncTaskManager.submitShardSyncTask();
|
shardSyncTaskManager.submitShardSyncTask();
|
||||||
} else {
|
} else {
|
||||||
log.info(
|
log.info(
|
||||||
"Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases,"
|
"Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases,"
|
||||||
+ " as part of this workflow" + streamIdentifier);
|
+ " as part of this workflow", streamIdentifier);
|
||||||
}
|
}
|
||||||
currentSetOfStreamsIter.remove();
|
currentSetOfStreamsIter.remove();
|
||||||
streamsSynced.add(streamIdentifier);
|
streamsSynced.add(streamIdentifier);
|
||||||
|
|
|
||||||
|
|
@ -114,20 +114,16 @@ public class LeaseCleanupManager {
|
||||||
log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.",
|
log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.",
|
||||||
lease.leaseKey());
|
lease.leaseKey());
|
||||||
} else {
|
} else {
|
||||||
//TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597.
|
log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
|
||||||
if (!deletionQueue.contains(leasePendingDeletion)) {
|
if (!deletionQueue.add(leasePendingDeletion)) {
|
||||||
log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
|
log.warn("Unable to enqueue lease {} for deletion.", lease.leaseKey());
|
||||||
if (!deletionQueue.add(leasePendingDeletion)) {
|
|
||||||
log.warn("Unable to enqueue lease {} for deletion.", lease.leaseKey());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.warn("Lease {} is already pending deletion, not enqueueing for deletion.", lease.leaseKey());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if lease was already enqueued for deletion.
|
* Check if lease was already enqueued for deletion.
|
||||||
|
* //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597.
|
||||||
* @param leasePendingDeletion
|
* @param leasePendingDeletion
|
||||||
* @return true if enqueued for deletion; false otherwise.
|
* @return true if enqueued for deletion; false otherwise.
|
||||||
*/
|
*/
|
||||||
|
|
@ -168,26 +164,39 @@ public class LeaseCleanupManager {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
|
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
|
||||||
Set<String> childShardKeys = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()).childShardIds();
|
final Lease leaseFromDDB = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey());
|
||||||
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
|
if(leaseFromDDB != null) {
|
||||||
try {
|
Set<String> childShardKeys = leaseFromDDB.childShardIds();
|
||||||
childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier);
|
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
|
||||||
|
try {
|
||||||
|
childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier);
|
||||||
|
|
||||||
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
|
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
|
||||||
log.error("No child shards returned from service for shard {} for {}.", shardInfo.shardId(), streamIdentifier.streamName());
|
log.error(
|
||||||
} else {
|
"No child shards returned from service for shard {} for {} while cleaning up lease.",
|
||||||
wereChildShardsPresent = true;
|
shardInfo.shardId(), streamIdentifier.streamName());
|
||||||
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
|
} else {
|
||||||
|
wereChildShardsPresent = true;
|
||||||
|
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
|
||||||
|
}
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw exceptionManager.apply(e.getCause());
|
||||||
|
} finally {
|
||||||
|
alreadyCheckedForGarbageCollection = true;
|
||||||
}
|
}
|
||||||
} catch (ExecutionException e) {
|
} else {
|
||||||
throw exceptionManager.apply(e.getCause());
|
wereChildShardsPresent = true;
|
||||||
} finally {
|
}
|
||||||
alreadyCheckedForGarbageCollection = true;
|
try {
|
||||||
|
cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// Suppressing the exception here, so that we can attempt for garbage cleanup.
|
||||||
|
log.warn("Unable to cleanup lease for shard {} in {}", shardInfo.shardId(), streamIdentifier.streamName(), e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
wereChildShardsPresent = true;
|
log.info("Lease not present in lease table while cleaning the shard {} of {}", shardInfo.shardId(), streamIdentifier.streamName());
|
||||||
|
cleanedUpCompletedLease = true;
|
||||||
}
|
}
|
||||||
cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
|
if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
|
||||||
|
|
@ -256,20 +265,24 @@ public class LeaseCleanupManager {
|
||||||
// 2. Its parent shard lease(s) have already been deleted.
|
// 2. Its parent shard lease(s) have already been deleted.
|
||||||
private boolean cleanupLeaseForCompletedShard(Lease lease, ShardInfo shardInfo, Set<String> childShardKeys)
|
private boolean cleanupLeaseForCompletedShard(Lease lease, ShardInfo shardInfo, Set<String> childShardKeys)
|
||||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {
|
||||||
final Set<String> processedChildShardLeases = new HashSet<>();
|
final Set<String> processedChildShardLeaseKeys = new HashSet<>();
|
||||||
|
final Set<String> childShardLeaseKeys = childShardKeys.stream().map(ck -> ShardInfo.getLeaseKey(shardInfo, ck))
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
for (String childShardKey : childShardKeys) {
|
for (String childShardLeaseKey : childShardLeaseKeys) {
|
||||||
final Lease childShardLease = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(childShardKey)).orElseThrow(
|
final Lease childShardLease = Optional.ofNullable(
|
||||||
() -> new IllegalStateException("Child lease " + childShardKey + " for completed shard not found in " +
|
leaseCoordinator.leaseRefresher().getLease(childShardLeaseKey))
|
||||||
"lease table - not cleaning up lease " + lease));
|
.orElseThrow(() -> new IllegalStateException(
|
||||||
|
"Child lease " + childShardLeaseKey + " for completed shard not found in "
|
||||||
|
+ "lease table - not cleaning up lease " + lease));
|
||||||
|
|
||||||
if (!childShardLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)
|
if (!childShardLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON) && !childShardLease
|
||||||
&& !childShardLease.checkpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) {
|
.checkpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) {
|
||||||
processedChildShardLeases.add(childShardLease.leaseKey());
|
processedChildShardLeaseKeys.add(childShardLease.leaseKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!allParentShardLeasesDeleted(lease, shardInfo) || !Objects.equals(childShardKeys, processedChildShardLeases)) {
|
if (!allParentShardLeasesDeleted(lease, shardInfo) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -320,6 +333,8 @@ public class LeaseCleanupManager {
|
||||||
if (leaseCleanupResult.leaseCleanedUp()) {
|
if (leaseCleanupResult.leaseCleanedUp()) {
|
||||||
log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier);
|
log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier);
|
||||||
deletionSucceeded = true;
|
deletionSucceeded = true;
|
||||||
|
} else {
|
||||||
|
log.warn("Unable to clean up lease {} for {} due to {}", leaseKey, streamIdentifier, leaseCleanupResult);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " +
|
log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " +
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
||||||
import software.amazon.awssdk.utils.CollectionUtils;
|
import software.amazon.awssdk.utils.CollectionUtils;
|
||||||
|
import software.amazon.awssdk.utils.Validate;
|
||||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
|
|
@ -113,8 +114,8 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}",
|
log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}",
|
||||||
leaseKeyProvider.apply(shardInfo), shardInfo.concurrencyToken(), reason);
|
leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason);
|
||||||
|
|
||||||
final long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
if (reason == ShutdownReason.SHARD_END) {
|
if (reason == ShutdownReason.SHARD_END) {
|
||||||
|
|
@ -124,34 +125,19 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
||||||
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
||||||
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
||||||
|
Validate.validState(currentShardLease != null,
|
||||||
|
"%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.",
|
||||||
|
leaseKeyProvider.apply(shardInfo));
|
||||||
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier,
|
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier,
|
||||||
currentShardLease, shardInfo);
|
currentShardLease, shardInfo);
|
||||||
|
|
||||||
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
createLeasesForChildShardsIfNotExist();
|
createLeasesForChildShardsIfNotExist();
|
||||||
updateLeaseWithChildShards(currentShardLease);
|
updateLeaseWithChildShards(currentShardLease);
|
||||||
// Attempt to do shard checkpointing and throw on exception.
|
}
|
||||||
|
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
|
||||||
attemptShardEndCheckpointing(scope, startTime);
|
attemptShardEndCheckpointing(scope, startTime);
|
||||||
// Enqueue completed shard for deletion.
|
|
||||||
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
|
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
|
||||||
|
|
||||||
} else {
|
|
||||||
// This might be a case of ResourceNotFound from Service. Directly validate and delete lease, if required.
|
|
||||||
// If already enqueued for deletion as part of this worker, do not attempt to shard end checkpoint
|
|
||||||
// or lease cleanup. Else try to shard end checkpoint and cleanup the lease if the shard is a
|
|
||||||
// garbage shard.
|
|
||||||
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
|
|
||||||
try {
|
|
||||||
// Do a best effort shard end checkpointing, before attempting to cleanup the lease,
|
|
||||||
// in the case of RNF Exception.
|
|
||||||
attemptShardEndCheckpointing(scope, startTime);
|
|
||||||
} finally {
|
|
||||||
// Attempt to garbage collect if this shard is no longer associated with the stream.
|
|
||||||
// If we don't want to cleanup the garbage shard without successful shard end
|
|
||||||
// checkpointing, remove the try finally construct and only execute the methods.
|
|
||||||
attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(leasePendingDeletion, currentShardLease);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
|
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
|
||||||
|
|
@ -195,29 +181,6 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
 * Asks the lease cleanup manager to clean up the given lease right away and logs the outcome.
 * When the result says the lease was not cleaned up and that the resource was not found, the
 * lease is enqueued for deletion instead. Any exception raised along the way is logged and
 * not rethrown.
 *
 * NOTE(review): this method shows up only in the old column of this diff hunk, so it appears
 * to be code removed by the commit - confirm before relying on it.
 * NOTE(review): the meaning of the two boolean arguments passed to cleanupLease (false, true)
 * is not visible here - confirm against LeaseCleanupManager.
 *
 * @param leasePendingDeletion the pending-deletion entry handed to the cleanup manager
 * @param currentShardLease lease whose key is used in the log messages
 */
private void attemptGarbageCollectionOfLeaseAndEnqueueOnFailure(LeasePendingDeletion leasePendingDeletion, Lease currentShardLease) {
|
|
||||||
final LeaseCleanupManager.LeaseCleanupResult leaseCleanupResult;
|
|
||||||
try {
|
|
||||||
leaseCleanupResult = leaseCleanupManager
|
|
||||||
.cleanupLease(leasePendingDeletion, false, true);
|
|
||||||
if (leaseCleanupResult.leaseCleanedUp()) {
|
|
||||||
log.info("Cleaned up garbage lease {} for {}. Details : {}",
|
|
||||||
currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult);
|
|
||||||
} else {
|
|
||||||
log.error("Unable to cleanup potential garbage lease {} for {}. Details : {} ",
|
|
||||||
currentShardLease.leaseKey(), streamIdentifier, leaseCleanupResult);
|
|
||||||
// If we are unable to delete this lease and the reason being RNF, then enqueue it
|
|
||||||
// for deletion, so that we don't end up consuming service TPS on any bugs.
|
|
||||||
if (leaseCleanupResult.wasResourceNotFound()) {
|
|
||||||
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Unable to cleanup potential garbage lease {} for {}", currentShardLease.leaseKey(),
|
|
||||||
streamIdentifier, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void applicationCheckpointAndVerification() {
|
private void applicationCheckpointAndVerification() {
|
||||||
recordProcessorCheckpointer
|
recordProcessorCheckpointer
|
||||||
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
|
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue