Make the shutdown task throw a retryable exception for a bounded number of attempts, giving the second parent shard lease time to be detected in merge cases.

This commit is contained in:
Ashwin Giridharan 2020-07-15 10:08:50 -07:00
parent d9f5557ff5
commit 264e5bd4aa
2 changed files with 128 additions and 63 deletions

View file

@ -25,11 +25,11 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
@ -53,7 +53,6 @@ import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -100,6 +99,7 @@ public class ShutdownTask implements ConsumerTask {
private final LeaseCleanupManager leaseCleanupManager;
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);
private int retryLeftForValidParentState = 10;
/*
* Invokes ShardRecordProcessor shutdown() API.
@ -121,26 +121,21 @@ public class ShutdownTask implements ConsumerTask {
final long startTime = System.currentTimeMillis();
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
final ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason, currentShardLease);
final Runnable leaseLostAction = () -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build());
if (localReason == ShutdownReason.SHARD_END) {
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo);
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
boolean isSuccess = false;
try {
isSuccess = attemptShardEndCheckpointing(scope, startTime);
} finally {
// Check if either the shard end ddb persist is successful or
// if childshards is empty. When child shards is empty then either it is due to
// completed shard being reprocessed or we got RNF from service.
// For these cases enqueue the lease for deletion.
if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
}
if (reason == ShutdownReason.SHARD_END) {
try {
takeShardEndAction(currentShardLease, scope, startTime);
} catch (InvalidStateException e) {
// If InvalidStateException happens, it indicates we have a non recoverable error in short term.
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry shutting down.
log.warn("Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. " +
"Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. ", currentShardLease.leaseKey(), e);
dropLease(currentShardLease);
throwOnApplicationException(leaseLostAction, scope, startTime);
}
} else {
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
throwOnApplicationException(leaseLostAction, scope, startTime);
}
log.debug("Shutting down retrieval strategy for shard {}.", leaseKeyProvider.apply(shardInfo));
@ -169,41 +164,47 @@ public class ShutdownTask implements ConsumerTask {
return new TaskResult(exception);
}
private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason, Lease currentShardLease)
throws DependencyException, ProvisionedThroughputException {
ShutdownReason localReason = originalReason;
if (originalReason == ShutdownReason.SHARD_END) {
// Create new lease for the child shards if they don't exist.
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
Validate.validState(currentShardLease != null,
"%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.",
leaseKeyProvider.apply(shardInfo));
// Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup.
private void takeShardEndAction(Lease currentShardLease,
MetricsScope scope, long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
CustomerApplicationException {
// Create new lease for the child shards if they don't exist.
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
if (currentShardLease == null) {
throw new InvalidStateException(leaseKeyProvider.apply(shardInfo)
+ " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
}
if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist();
updateLeaseWithChildShards(currentShardLease);
}
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease,
shardInfo);
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
boolean isSuccess = false;
try {
if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist();
updateLeaseWithChildShards(currentShardLease);
isSuccess = attemptShardEndCheckpointing(scope, startTime);
} finally {
// Check if either the shard end ddb persist is successful or
// if childshards is empty. When child shards is empty then either it is due to
// completed shard being reprocessed or we got RNF from service.
// For these cases enqueue the lease for deletion.
if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
} catch (InvalidStateException e) {
// If InvalidStateException happens, it indicates we are missing childShard related information.
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry getting
// childShard information in the processTask.
localReason = ShutdownReason.LEASE_LOST;
log.warn("Lease {}: Exception happened while shutting down shardConsumer with SHARD_END reason. " +
"Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. Exception: ", currentShardLease.leaseKey(), e);
dropLease(currentShardLease);
}
}
return localReason;
}
private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
CustomerApplicationException {
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
.orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist."));
.orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist."));
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
@ -252,8 +253,15 @@ public class ShutdownTask implements ConsumerTask {
Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects
.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1)));
if (!isValidLeaseTableState) {
throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard
+ " has partial parent information in lease table.");
if (--retryLeftForValidParentState >= 0) {
throw new BlockedOnParentShardException(
"Shard " + shardInfo.shardId() + "'s only child shard " + childShard
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
} else {
throw new InvalidStateException(
"Shard " + shardInfo.shardId() + "'s only child shard " + childShard
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
}
}
}
}

View file

@ -44,6 +44,7 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
@ -197,7 +198,7 @@ public class ShutdownTaskTest {
}
@Test
public final void testCallThrowsWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
public final void testCallThrowsUntilParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
@ -209,37 +210,93 @@ public class ShutdownTaskTest {
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-1")).thenReturn(null, parentLease);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-1"))
.thenReturn(null, null, null, null, null, parentLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
// Return null lease first time to simulate partial parent lease info
when(leaseRefresher.getLease("shardId-1")).thenReturn(null, parentLease);
when(leaseRefresher.getLease("shardId-1"))
.thenReturn(null, null, null, null, null, parentLease);
// Make first attempt with partial parent info in lease table
TaskResult result = task.call();
assertNotNull(result.getException());
assertTrue(result.getException() instanceof InvalidStateException);
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
verify(recordsPublisher, never()).shutdown();
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator, never()).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
// Make first 5 attempts with partial parent info in lease table
for (int i = 0; i < 5; i++) {
TaskResult result = task.call();
assertNotNull(result.getException());
assertTrue(result.getException() instanceof BlockedOnParentShardException);
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
verify(recordsPublisher, never()).shutdown();
verify(shardRecordProcessor, never())
.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator, never())
.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
}
// make next attempt with complete parent info in lease table
result = task.call();
TaskResult result = task.call();
assertNull(result.getException());
verify(recordsPublisher).shutdown();
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
verify(leaseRefresher, times(1)).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
}
/**
 * Verifies the fallback path taken when the lease table keeps reporting partial parent
 * information for the only child shard of a shard that reached SHARD_END.
 *
 * The lookup for the second parent lease ("shardId-1") is stubbed to return null on every
 * call, so the parent state never becomes valid. The first 10 invocations of
 * {@code task.call()} are expected to surface a retryable
 * {@link BlockedOnParentShardException} without any side effects (no publisher shutdown, no
 * processor callbacks, no lease updates, drops or cleanup enqueueing). The 11th invocation
 * is expected to complete without an exception, having dropped the lease and notified the
 * record processor through {@code leaseLost} instead of {@code shardEnded}.
 *
 * NOTE(review): the loop bound of 10 presumably mirrors the initial value of
 * ShutdownTask's retryLeftForValidParentState counter - keep the two in sync.
 */
@Test
public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
// Task under test: a SHARD_END shutdown for "shardId-0" with the child shard built by constructChildShard().
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
// parentLease is created but never returned by any stub in this test: the second parent stays missing throughout.
Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-1"))
.thenReturn(null, null, null, null, null, null, null, null, null, null, null);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
// Return a null lease on every lookup so the partial parent lease info never resolves
when(leaseRefresher.getLease("shardId-1"))
.thenReturn(null, null, null, null, null, null, null, null, null, null, null);
// Make first 10 attempts with partial parent info in lease table
for (int i = 0; i < 10; i++) {
TaskResult result = task.call();
// Each of these attempts must report the retryable exception rather than completing the shutdown.
assertNotNull(result.getException());
assertTrue(result.getException() instanceof BlockedOnParentShardException);
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
// No shutdown side effects may have happened while the task is still retrying.
verify(recordsPublisher, never()).shutdown();
verify(shardRecordProcessor, never())
.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator, never())
.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
}
// make final attempt with incomplete parent info in lease table
TaskResult result = task.call();
// The 11th attempt completes without an exception: the lease is dropped and the
// processor is told the lease was lost instead of being told the shard ended.
assertNull(result.getException());
verify(recordsPublisher).shutdown();
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(leaseRefresher, never()).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
}
/**