Addressing comments
This commit is contained in:
parent
b49d8ea8cc
commit
a0094b0df8
4 changed files with 72 additions and 65 deletions
|
|
@ -117,34 +117,10 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
try {
|
try {
|
||||||
log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}",
|
log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}",
|
||||||
leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason);
|
leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason);
|
||||||
ShutdownReason localReason = reason;
|
|
||||||
|
|
||||||
final long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
||||||
if (localReason == ShutdownReason.SHARD_END) {
|
final ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason, currentShardLease);
|
||||||
// Create new lease for the child shards if they don't exist.
|
|
||||||
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
|
|
||||||
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
|
|
||||||
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
|
||||||
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
|
||||||
Validate.validState(currentShardLease != null,
|
|
||||||
"%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.",
|
|
||||||
leaseKeyProvider.apply(shardInfo));
|
|
||||||
try {
|
|
||||||
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
|
||||||
createLeasesForChildShardsIfNotExist();
|
|
||||||
updateLeaseWithChildShards(currentShardLease);
|
|
||||||
}
|
|
||||||
} catch (InvalidStateException e) {
|
|
||||||
// If invalidStateException happens, it indicates we are missing childShard related information.
|
|
||||||
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry getting
|
|
||||||
// childShard information in the processTask.
|
|
||||||
localReason = ShutdownReason.LEASE_LOST;
|
|
||||||
dropLease();
|
|
||||||
log.warn("Shard " + shardInfo.shardId() + ": Exception happened while shutting down shardConsumer with LEASE_LOST reason. " +
|
|
||||||
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (localReason == ShutdownReason.SHARD_END) {
|
if (localReason == ShutdownReason.SHARD_END) {
|
||||||
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo);
|
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo);
|
||||||
|
|
@ -192,6 +168,36 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
return new TaskResult(exception);
|
return new TaskResult(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason, Lease currentShardLease)
|
||||||
|
throws DependencyException, ProvisionedThroughputException {
|
||||||
|
ShutdownReason localReason = originalReason;
|
||||||
|
if (originalReason == ShutdownReason.SHARD_END) {
|
||||||
|
// Create new lease for the child shards if they don't exist.
|
||||||
|
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
|
||||||
|
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
|
||||||
|
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
||||||
|
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
||||||
|
Validate.validState(currentShardLease != null,
|
||||||
|
"%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.",
|
||||||
|
leaseKeyProvider.apply(shardInfo));
|
||||||
|
try {
|
||||||
|
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
|
createLeasesForChildShardsIfNotExist();
|
||||||
|
updateLeaseWithChildShards(currentShardLease);
|
||||||
|
}
|
||||||
|
} catch (InvalidStateException e) {
|
||||||
|
// If InvalidStateException happens, it indicates we are missing childShard related information.
|
||||||
|
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry getting
|
||||||
|
// childShard information in the processTask.
|
||||||
|
localReason = ShutdownReason.LEASE_LOST;
|
||||||
|
log.warn("Lease {}: Exception happened while shutting down shardConsumer with SHARD_END reason. " +
|
||||||
|
"Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. Exception: ", currentShardLease.leaseKey(), e);
|
||||||
|
dropLease(currentShardLease);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return localReason;
|
||||||
|
}
|
||||||
|
|
||||||
private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime)
|
private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime)
|
||||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
|
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
|
||||||
CustomerApplicationException {
|
CustomerApplicationException {
|
||||||
|
|
@ -266,15 +272,13 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
return reason;
|
return reason;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void dropLease() {
|
private void dropLease(Lease currentLease) {
|
||||||
Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
|
||||||
if (currentLease == null) {
|
if (currentLease == null) {
|
||||||
log.warn("Shard " + shardInfo.shardId() + ": Lease already dropped. Will shutdown the shardConsumer directly.");
|
log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", leaseKeyProvider.apply(shardInfo));
|
||||||
return;
|
return;
|
||||||
}
|
} else {
|
||||||
leaseCoordinator.dropLease(currentLease);
|
leaseCoordinator.dropLease(currentLease);
|
||||||
if(currentLease != null) {
|
log.info("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
|
||||||
log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
package software.amazon.kinesis.retrieval;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
||||||
|
import software.amazon.awssdk.utils.CollectionUtils;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class DataRetrievalUtil {
|
||||||
|
|
||||||
|
public static boolean isValidResult(String shardEndIndicator, List<ChildShard> childShards) {
|
||||||
|
// shardEndIndicator is nextShardIterator for GetRecordsResponse, and is continuationSequenceNumber for SubscribeToShardEvent
|
||||||
|
// There are two valid scenarios for the shardEndIndicator and childShards combination.
|
||||||
|
// 1. ShardEnd scenario: shardEndIndicator should be null and childShards should be a non-empty list.
|
||||||
|
// 2. Non-ShardEnd scenario: shardEndIndicator should be non-null and childShards should be null or an empty list.
|
||||||
|
// Otherwise, the retrieval result is invalid.
|
||||||
|
if (shardEndIndicator == null && CollectionUtils.isNullOrEmpty(childShards) ||
|
||||||
|
shardEndIndicator != null && !CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// For ShardEnd scenario, for each childShard we should validate if parentShards are available.
|
||||||
|
// Missing parentShards can cause issues with creating leases for childShards during ShardConsumer shutdown.
|
||||||
|
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
|
for (ChildShard childShard : childShards) {
|
||||||
|
if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -43,6 +43,7 @@ import software.amazon.kinesis.common.RequestDetails;
|
||||||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
||||||
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
|
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
|
||||||
|
import software.amazon.kinesis.retrieval.DataRetrievalUtil;
|
||||||
import software.amazon.kinesis.retrieval.IteratorBuilder;
|
import software.amazon.kinesis.retrieval.IteratorBuilder;
|
||||||
import software.amazon.kinesis.retrieval.KinesisClientRecord;
|
import software.amazon.kinesis.retrieval.KinesisClientRecord;
|
||||||
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
|
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
|
||||||
|
|
@ -485,7 +486,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
// Since the triggeringFlow is active flow, it will then trigger the handleFlowError call.
|
// Since the triggeringFlow is active flow, it will then trigger the handleFlowError call.
|
||||||
// Since the exception is not ResourceNotFoundException, it will trigger onError in the ShardConsumerSubscriber.
|
// Since the exception is not ResourceNotFoundException, it will trigger onError in the ShardConsumerSubscriber.
|
||||||
// The ShardConsumerSubscriber will finally cancel the subscription.
|
// The ShardConsumerSubscriber will finally cancel the subscription.
|
||||||
if (!isValidEvent(recordBatchEvent)) {
|
if (!DataRetrievalUtil.isValidResult(recordBatchEvent.continuationSequenceNumber(), recordBatchEvent.childShards())) {
|
||||||
throw new InvalidStateException("RecordBatchEvent for flow " + triggeringFlow.toString() + " is invalid."
|
throw new InvalidStateException("RecordBatchEvent for flow " + triggeringFlow.toString() + " is invalid."
|
||||||
+ " event.continuationSequenceNumber: " + recordBatchEvent.continuationSequenceNumber()
|
+ " event.continuationSequenceNumber: " + recordBatchEvent.continuationSequenceNumber()
|
||||||
+ ". event.childShards: " + recordBatchEvent.childShards());
|
+ ". event.childShards: " + recordBatchEvent.childShards());
|
||||||
|
|
@ -511,22 +512,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isValidEvent(SubscribeToShardEvent event) {
|
|
||||||
if (event.continuationSequenceNumber() == null && CollectionUtils.isNullOrEmpty(event.childShards()) ||
|
|
||||||
event.continuationSequenceNumber() != null && !CollectionUtils.isNullOrEmpty(event.childShards())) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(!CollectionUtils.isNullOrEmpty(event.childShards())) {
|
|
||||||
for (ChildShard childShard : event.childShards()) {
|
|
||||||
if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) {
|
private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) {
|
||||||
if (availableQueueSpace <= 0) {
|
if (availableQueueSpace <= 0) {
|
||||||
log.debug(
|
log.debug(
|
||||||
|
|
|
||||||
|
|
@ -48,6 +48,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
|
||||||
import software.amazon.kinesis.retrieval.AWSExceptionManager;
|
import software.amazon.kinesis.retrieval.AWSExceptionManager;
|
||||||
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
|
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
|
||||||
import software.amazon.kinesis.retrieval.DataFetcherResult;
|
import software.amazon.kinesis.retrieval.DataFetcherResult;
|
||||||
|
import software.amazon.kinesis.retrieval.DataRetrievalUtil;
|
||||||
import software.amazon.kinesis.retrieval.IteratorBuilder;
|
import software.amazon.kinesis.retrieval.IteratorBuilder;
|
||||||
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
|
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
|
||||||
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
|
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
|
||||||
|
|
@ -290,7 +291,7 @@ public class KinesisDataFetcher implements DataFetcher {
|
||||||
public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException {
|
public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
|
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
|
||||||
maxFutureWait);
|
maxFutureWait);
|
||||||
if (!isValidResponse(response)) {
|
if (!DataRetrievalUtil.isValidResult(response.nextShardIterator(), response.childShards())) {
|
||||||
throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId
|
throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId
|
||||||
+ ". nextShardIterator: " + response.nextShardIterator()
|
+ ". nextShardIterator: " + response.nextShardIterator()
|
||||||
+ ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator.");
|
+ ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator.");
|
||||||
|
|
@ -340,21 +341,6 @@ public class KinesisDataFetcher implements DataFetcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isValidResponse(GetRecordsResponse response) {
|
|
||||||
if (response.nextShardIterator() == null && CollectionUtils.isNullOrEmpty(response.childShards()) ||
|
|
||||||
response.nextShardIterator() != null && !CollectionUtils.isNullOrEmpty(response.childShards())) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (!CollectionUtils.isNullOrEmpty(response.childShards())) {
|
|
||||||
for (ChildShard childShard : response.childShards()) {
|
|
||||||
if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private AWSExceptionManager createExceptionManager() {
|
private AWSExceptionManager createExceptionManager() {
|
||||||
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
|
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
|
||||||
exceptionManager.add(ResourceNotFoundException.class, t -> t);
|
exceptionManager.add(ResourceNotFoundException.class, t -> t);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue