ChildShard validation in Data Fetcher level and error handling in shutdownTask
This commit is contained in:
parent
74ffd4060c
commit
b49d8ea8cc
4 changed files with 72 additions and 26 deletions
|
|
@ -35,6 +35,7 @@ import software.amazon.kinesis.leases.LeaseCleanupManager;
|
||||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||||
import software.amazon.kinesis.leases.ShardDetector;
|
import software.amazon.kinesis.leases.ShardDetector;
|
||||||
import software.amazon.kinesis.leases.ShardInfo;
|
import software.amazon.kinesis.leases.ShardInfo;
|
||||||
|
import software.amazon.kinesis.leases.UpdateField;
|
||||||
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
|
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
|
||||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||||
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
|
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
|
||||||
|
|
@ -116,25 +117,37 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
try {
|
try {
|
||||||
log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}",
|
log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}",
|
||||||
leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason);
|
leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason);
|
||||||
|
ShutdownReason localReason = reason;
|
||||||
|
|
||||||
final long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
if (reason == ShutdownReason.SHARD_END) {
|
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
||||||
|
if (localReason == ShutdownReason.SHARD_END) {
|
||||||
// Create new lease for the child shards if they don't exist.
|
// Create new lease for the child shards if they don't exist.
|
||||||
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
|
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
|
||||||
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
|
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
|
||||||
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
||||||
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
||||||
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
|
||||||
Validate.validState(currentShardLease != null,
|
Validate.validState(currentShardLease != null,
|
||||||
"%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.",
|
"%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.",
|
||||||
leaseKeyProvider.apply(shardInfo));
|
leaseKeyProvider.apply(shardInfo));
|
||||||
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier,
|
try {
|
||||||
currentShardLease, shardInfo);
|
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
|
createLeasesForChildShardsIfNotExist();
|
||||||
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
updateLeaseWithChildShards(currentShardLease);
|
||||||
createLeasesForChildShardsIfNotExist();
|
}
|
||||||
updateLeaseWithChildShards(currentShardLease);
|
} catch (InvalidStateException e) {
|
||||||
|
// If invalidStateException happens, it indicates we are missing childShard related information.
|
||||||
|
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry getting
|
||||||
|
// childShard information in the processTask.
|
||||||
|
localReason = ShutdownReason.LEASE_LOST;
|
||||||
|
dropLease();
|
||||||
|
log.warn("Shard " + shardInfo.shardId() + ": Exception happened while shutting down shardConsumer with LEASE_LOST reason. " +
|
||||||
|
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (localReason == ShutdownReason.SHARD_END) {
|
||||||
|
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo);
|
||||||
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
|
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
|
||||||
boolean isSuccess = false;
|
boolean isSuccess = false;
|
||||||
try {
|
try {
|
||||||
|
|
@ -234,11 +247,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
|
|
||||||
final Lease updatedLease = currentLease.copy();
|
final Lease updatedLease = currentLease.copy();
|
||||||
updatedLease.childShardIds(childShardIds);
|
updatedLease.childShardIds(childShardIds);
|
||||||
// TODO : Make changes to use the new leaserefresher#updateLease(Lease lease, UpdateField updateField)
|
leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
|
||||||
final boolean updateResult = leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
|
|
||||||
if (!updateResult) {
|
|
||||||
throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.shardId());
|
|
||||||
}
|
|
||||||
log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds);
|
log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -257,4 +266,15 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
return reason;
|
return reason;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void dropLease() {
|
||||||
|
Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
||||||
|
if (currentLease == null) {
|
||||||
|
log.warn("Shard " + shardInfo.shardId() + ": Lease already dropped. Will shutdown the shardConsumer directly.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
leaseCoordinator.dropLease(currentLease);
|
||||||
|
if(currentLease != null) {
|
||||||
|
log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ import org.reactivestreams.Subscriber;
|
||||||
import org.reactivestreams.Subscription;
|
import org.reactivestreams.Subscription;
|
||||||
import software.amazon.awssdk.core.async.SdkPublisher;
|
import software.amazon.awssdk.core.async.SdkPublisher;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
|
||||||
|
|
@ -511,8 +512,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isValidEvent(SubscribeToShardEvent event) {
|
private boolean isValidEvent(SubscribeToShardEvent event) {
|
||||||
return event.continuationSequenceNumber() == null ? !CollectionUtils.isNullOrEmpty(event.childShards())
|
if (event.continuationSequenceNumber() == null && CollectionUtils.isNullOrEmpty(event.childShards()) ||
|
||||||
: event.childShards() != null && event.childShards().isEmpty();
|
event.continuationSequenceNumber() != null && !CollectionUtils.isNullOrEmpty(event.childShards())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!CollectionUtils.isNullOrEmpty(event.childShards())) {
|
||||||
|
for (ChildShard childShard : event.childShards()) {
|
||||||
|
if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) {
|
private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) {
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import software.amazon.awssdk.core.exception.SdkException;
|
import software.amazon.awssdk.core.exception.SdkException;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
||||||
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
|
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
|
||||||
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
||||||
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
|
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
|
||||||
|
|
@ -340,8 +341,18 @@ public class KinesisDataFetcher implements DataFetcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isValidResponse(GetRecordsResponse response) {
|
private boolean isValidResponse(GetRecordsResponse response) {
|
||||||
return response.nextShardIterator() == null ? !CollectionUtils.isNullOrEmpty(response.childShards())
|
if (response.nextShardIterator() == null && CollectionUtils.isNullOrEmpty(response.childShards()) ||
|
||||||
: response.childShards() != null && response.childShards().isEmpty();
|
response.nextShardIterator() != null && !CollectionUtils.isNullOrEmpty(response.childShards())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!CollectionUtils.isNullOrEmpty(response.childShards())) {
|
||||||
|
for (ChildShard childShard : response.childShards()) {
|
||||||
|
if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private AWSExceptionManager createExceptionManager() {
|
private AWSExceptionManager createExceptionManager() {
|
||||||
|
|
|
||||||
|
|
@ -52,6 +52,7 @@ import software.amazon.kinesis.leases.LeaseRefresher;
|
||||||
import software.amazon.kinesis.leases.ShardDetector;
|
import software.amazon.kinesis.leases.ShardDetector;
|
||||||
import software.amazon.kinesis.leases.ShardInfo;
|
import software.amazon.kinesis.leases.ShardInfo;
|
||||||
import software.amazon.kinesis.leases.ShardObjectHelper;
|
import software.amazon.kinesis.leases.ShardObjectHelper;
|
||||||
|
import software.amazon.kinesis.leases.UpdateField;
|
||||||
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
|
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
|
||||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||||
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
|
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
|
||||||
|
|
@ -148,16 +149,18 @@ public class ShutdownTaskTest {
|
||||||
@Test
|
@Test
|
||||||
public final void testCallWhenCreatingNewLeasesThrows() throws Exception {
|
public final void testCallWhenCreatingNewLeasesThrows() throws Exception {
|
||||||
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||||
|
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
|
||||||
|
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
|
||||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||||
when(leaseRefresher.createLeaseIfNotExists(Matchers.any(Lease.class))).thenThrow(new KinesisClientLibIOException("KinesisClientLibIOException"));
|
when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class)))
|
||||||
|
.thenThrow(new InvalidStateException("InvalidStateException is thrown"));
|
||||||
|
|
||||||
final TaskResult result = task.call();
|
final TaskResult result = task.call();
|
||||||
assertNotNull(result.getException());
|
assertNull(result.getException());
|
||||||
assertTrue(result.getException() instanceof IllegalStateException);
|
verify(recordsPublisher).shutdown();
|
||||||
verify(recordsPublisher, never()).shutdown();
|
|
||||||
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||||
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
|
||||||
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
verify(leaseCoordinator).dropLease(Matchers.any(Lease.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -185,7 +188,7 @@ public class ShutdownTaskTest {
|
||||||
verify(recordsPublisher).shutdown();
|
verify(recordsPublisher).shutdown();
|
||||||
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||||
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
||||||
verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
|
verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
|
||||||
verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class));
|
verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||||
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||||
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
|
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue