Merge pull request #68 from ychunxue/childShardValidation

ChildShard validation in Data Fetcher level and error handling in shu…
This commit is contained in:
ashwing 2020-07-14 16:33:53 -07:00 committed by GitHub
commit 5fc55e0c8c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 110 additions and 40 deletions

View file

@ -35,6 +35,7 @@ import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException; import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
@ -118,23 +119,11 @@ public class ShutdownTask implements ConsumerTask {
leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason); leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason);
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
if (reason == ShutdownReason.SHARD_END) { final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
// Create new lease for the child shards if they don't exist. final ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason, currentShardLease);
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
Validate.validState(currentShardLease != null,
"%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.",
leaseKeyProvider.apply(shardInfo));
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier,
currentShardLease, shardInfo);
if (!CollectionUtils.isNullOrEmpty(childShards)) { if (localReason == ShutdownReason.SHARD_END) {
createLeasesForChildShardsIfNotExist(); final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo);
updateLeaseWithChildShards(currentShardLease);
}
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
boolean isSuccess = false; boolean isSuccess = false;
try { try {
@ -179,6 +168,36 @@ public class ShutdownTask implements ConsumerTask {
return new TaskResult(exception); return new TaskResult(exception);
} }
private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason, Lease currentShardLease)
throws DependencyException, ProvisionedThroughputException {
ShutdownReason localReason = originalReason;
if (originalReason == ShutdownReason.SHARD_END) {
// Create new lease for the child shards if they don't exist.
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
Validate.validState(currentShardLease != null,
"%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.",
leaseKeyProvider.apply(shardInfo));
try {
if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist();
updateLeaseWithChildShards(currentShardLease);
}
} catch (InvalidStateException e) {
// If InvalidStateException happens, it indicates we are missing childShard related information.
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry getting
// childShard information in the processTask.
localReason = ShutdownReason.LEASE_LOST;
log.warn("Lease {}: Exception happened while shutting down shardConsumer with SHARD_END reason. " +
"Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. Exception: ", currentShardLease.leaseKey(), e);
dropLease(currentShardLease);
}
}
return localReason;
}
private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime) private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException, throws DependencyException, ProvisionedThroughputException, InvalidStateException,
CustomerApplicationException { CustomerApplicationException {
@ -234,11 +253,7 @@ public class ShutdownTask implements ConsumerTask {
final Lease updatedLease = currentLease.copy(); final Lease updatedLease = currentLease.copy();
updatedLease.childShardIds(childShardIds); updatedLease.childShardIds(childShardIds);
// TODO : Make changes to use the new leaserefresher#updateLease(Lease lease, UpdateField updateField) leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
final boolean updateResult = leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo));
if (!updateResult) {
throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.shardId());
}
log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds); log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds);
} }
@ -257,4 +272,13 @@ public class ShutdownTask implements ConsumerTask {
return reason; return reason;
} }
private void dropLease(Lease currentLease) {
if (currentLease == null) {
log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", leaseKeyProvider.apply(shardInfo));
return;
} else {
leaseCoordinator.dropLease(currentLease);
log.info("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
}
}
} }

View file

@ -0,0 +1,47 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.utils.CollectionUtils;
import java.util.List;
public class DataRetrievalUtil {
public static boolean isValidResult(String shardEndIndicator, List<ChildShard> childShards) {
// shardEndIndicator is nextShardIterator for GetRecordsResponse, and is continuationSequenceNumber for SubscribeToShardEvent
// There are two valid scenarios for the shardEndIndicator and childShards combination.
// 1. ShardEnd scenario: shardEndIndicator should be null and childShards should be a non-empty list.
// 2. Non-ShardEnd scenario: shardEndIndicator should be non-null and childShards should be null or an empty list.
// Otherwise, the retrieval result is invalid.
if (shardEndIndicator == null && CollectionUtils.isNullOrEmpty(childShards) ||
shardEndIndicator != null && !CollectionUtils.isNullOrEmpty(childShards)) {
return false;
}
// For ShardEnd scenario, for each childShard we should validate if parentShards are available.
// Missing parentShards can cause issues with creating leases for childShards during ShardConsumer shutdown.
if (!CollectionUtils.isNullOrEmpty(childShards)) {
for (ChildShard childShard : childShards) {
if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) {
return false;
}
}
}
return true;
}
}

View file

@ -27,6 +27,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
@ -60,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult;
@Slf4j @Slf4j
@KinesisClientInternalApi @KinesisClientInternalApi
@ -484,7 +486,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// Since the triggeringFlow is active flow, it will then trigger the handleFlowError call. // Since the triggeringFlow is active flow, it will then trigger the handleFlowError call.
// Since the exception is not ResourceNotFoundException, it will trigger onError in the ShardConsumerSubscriber. // Since the exception is not ResourceNotFoundException, it will trigger onError in the ShardConsumerSubscriber.
// The ShardConsumerSubscriber will finally cancel the subscription. // The ShardConsumerSubscriber will finally cancel the subscription.
if (!isValidEvent(recordBatchEvent)) { if (!isValidResult(recordBatchEvent.continuationSequenceNumber(), recordBatchEvent.childShards())) {
throw new InvalidStateException("RecordBatchEvent for flow " + triggeringFlow.toString() + " is invalid." throw new InvalidStateException("RecordBatchEvent for flow " + triggeringFlow.toString() + " is invalid."
+ " event.continuationSequenceNumber: " + recordBatchEvent.continuationSequenceNumber() + " event.continuationSequenceNumber: " + recordBatchEvent.continuationSequenceNumber()
+ ". event.childShards: " + recordBatchEvent.childShards()); + ". event.childShards: " + recordBatchEvent.childShards());
@ -510,11 +512,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
} }
} }
private boolean isValidEvent(SubscribeToShardEvent event) {
return event.continuationSequenceNumber() == null ? !CollectionUtils.isNullOrEmpty(event.childShards())
: event.childShards() != null && event.childShards().isEmpty();
}
private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) { private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) {
if (availableQueueSpace <= 0) { if (availableQueueSpace <= 0) {
log.debug( log.debug(

View file

@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
@ -47,11 +48,14 @@ import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.DataRetrievalUtil;
import software.amazon.kinesis.retrieval.IteratorBuilder; import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult;
/** /**
* Used to get data from Amazon Kinesis. Tracks iterator state internally. * Used to get data from Amazon Kinesis. Tracks iterator state internally.
*/ */
@ -289,7 +293,7 @@ public class KinesisDataFetcher implements DataFetcher {
public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException {
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
maxFutureWait); maxFutureWait);
if (!isValidResponse(response)) { if (!isValidResult(response.nextShardIterator(), response.childShards())) {
throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId
+ ". nextShardIterator: " + response.nextShardIterator() + ". nextShardIterator: " + response.nextShardIterator()
+ ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator."); + ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator.");
@ -339,11 +343,6 @@ public class KinesisDataFetcher implements DataFetcher {
} }
} }
private boolean isValidResponse(GetRecordsResponse response) {
return response.nextShardIterator() == null ? !CollectionUtils.isNullOrEmpty(response.childShards())
: response.childShards() != null && response.childShards().isEmpty();
}
private AWSExceptionManager createExceptionManager() { private AWSExceptionManager createExceptionManager() {
final AWSExceptionManager exceptionManager = new AWSExceptionManager(); final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t); exceptionManager.add(ResourceNotFoundException.class, t -> t);

View file

@ -52,6 +52,7 @@ import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardObjectHelper; import software.amazon.kinesis.leases.ShardObjectHelper;
import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException; import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
@ -148,16 +149,18 @@ public class ShutdownTaskTest {
@Test @Test
public final void testCallWhenCreatingNewLeasesThrows() throws Exception { public final void testCallWhenCreatingNewLeasesThrows() throws Exception {
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseRefresher.createLeaseIfNotExists(Matchers.any(Lease.class))).thenThrow(new KinesisClientLibIOException("KinesisClientLibIOException")); when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class)))
.thenThrow(new InvalidStateException("InvalidStateException is thrown"));
final TaskResult result = task.call(); final TaskResult result = task.call();
assertNotNull(result.getException()); assertNull(result.getException());
assertTrue(result.getException() instanceof IllegalStateException); verify(recordsPublisher).shutdown();
verify(recordsPublisher, never()).shutdown();
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); verify(leaseCoordinator).dropLease(Matchers.any(Lease.class));
} }
/** /**
@ -185,7 +188,7 @@ public class ShutdownTaskTest {
verify(recordsPublisher).shutdown(); verify(recordsPublisher).shutdown();
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class)); verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class)); verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));