From b49d8ea8cc0f1025fb4e4191dfb5d2343b1a1208 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 7 Jul 2020 16:02:12 -0700 Subject: [PATCH 1/3] ChildShard validation in Data Fetcher level and error handling in shutdownTask --- .../kinesis/lifecycle/ShutdownTask.java | 50 +++++++++++++------ .../fanout/FanOutRecordsPublisher.java | 16 +++++- .../retrieval/polling/KinesisDataFetcher.java | 15 +++++- .../kinesis/lifecycle/ShutdownTaskTest.java | 17 ++++--- 4 files changed, 72 insertions(+), 26 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 91ea125b..800aa4c7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -35,6 +35,7 @@ import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.UpdateField; import software.amazon.kinesis.leases.exceptions.CustomerApplicationException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; @@ -116,25 +117,37 @@ public class ShutdownTask implements ConsumerTask { try { log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. 
Shutdown reason: {}", leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason); + ShutdownReason localReason = reason; final long startTime = System.currentTimeMillis(); - if (reason == ShutdownReason.SHARD_END) { + final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); + if (localReason == ShutdownReason.SHARD_END) { // Create new lease for the child shards if they don't exist. // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. // This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception. // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. // This scenario could happen when customer deletes the stream while leaving the KCL application running. - final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); Validate.validState(currentShardLease != null, - "%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.", - leaseKeyProvider.apply(shardInfo)); - final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, - currentShardLease, shardInfo); - - if (!CollectionUtils.isNullOrEmpty(childShards)) { - createLeasesForChildShardsIfNotExist(); - updateLeaseWithChildShards(currentShardLease); + "%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.", + leaseKeyProvider.apply(shardInfo)); + try { + if (!CollectionUtils.isNullOrEmpty(childShards)) { + createLeasesForChildShardsIfNotExist(); + updateLeaseWithChildShards(currentShardLease); + } + } catch (InvalidStateException e) { + // If invalidStateException happens, it indicates we are missing childShard related information. 
+ // In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry getting + // childShard information in the processTask. + localReason = ShutdownReason.LEASE_LOST; + dropLease(); + log.warn("Shard " + shardInfo.shardId() + ": Exception happened while shutting down shardConsumer with LEASE_LOST reason. " + + "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e); } + } + + if (localReason == ShutdownReason.SHARD_END) { + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { boolean isSuccess = false; try { @@ -234,11 +247,7 @@ public class ShutdownTask implements ConsumerTask { final Lease updatedLease = currentLease.copy(); updatedLease.childShardIds(childShardIds); - // TODO : Make changes to use the new leaserefresher#updateLease(Lease lease, UpdateField updateField) - final boolean updateResult = leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo)); - if (!updateResult) { - throw new InvalidStateException("Failed to update parent lease with child shard information for shard " + shardInfo.shardId()); - } + leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS); log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds); } @@ -257,4 +266,15 @@ public class ShutdownTask implements ConsumerTask { return reason; } + private void dropLease() { + Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); + if (currentLease == null) { + log.warn("Shard " + shardInfo.shardId() + ": Lease already dropped. 
Will shutdown the shardConsumer directly."); + return; + } + leaseCoordinator.dropLease(currentLease); + if(currentLease != null) { + log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); + } + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 38075890..99a7e9a4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -27,6 +27,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; @@ -511,8 +512,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } private boolean isValidEvent(SubscribeToShardEvent event) { - return event.continuationSequenceNumber() == null ? 
!CollectionUtils.isNullOrEmpty(event.childShards()) - : event.childShards() != null && event.childShards().isEmpty(); + if (event.continuationSequenceNumber() == null && CollectionUtils.isNullOrEmpty(event.childShards()) || + event.continuationSequenceNumber() != null && !CollectionUtils.isNullOrEmpty(event.childShards())) { + return false; + } + + if(!CollectionUtils.isNullOrEmpty(event.childShards())) { + for (ChildShard childShard : event.childShards()) { + if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) { + return false; + } + } + } + return true; } private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 3af3dcf5..c1bb7d7e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; @@ -340,8 +341,18 @@ public class KinesisDataFetcher implements DataFetcher { } private boolean isValidResponse(GetRecordsResponse response) { - return response.nextShardIterator() == null ? 
!CollectionUtils.isNullOrEmpty(response.childShards()) - : response.childShards() != null && response.childShards().isEmpty(); + if (response.nextShardIterator() == null && CollectionUtils.isNullOrEmpty(response.childShards()) || + response.nextShardIterator() != null && !CollectionUtils.isNullOrEmpty(response.childShards())) { + return false; + } + if (!CollectionUtils.isNullOrEmpty(response.childShards())) { + for (ChildShard childShard : response.childShards()) { + if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) { + return false; + } + } + } + return true; } private AWSExceptionManager createExceptionManager() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index d5af6627..792d566a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -52,6 +52,7 @@ import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardObjectHelper; +import software.amazon.kinesis.leases.UpdateField; import software.amazon.kinesis.leases.exceptions.CustomerApplicationException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; @@ -148,16 +149,18 @@ public class ShutdownTaskTest { @Test public final void testCallWhenCreatingNewLeasesThrows() throws Exception { when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); 
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseRefresher.createLeaseIfNotExists(Matchers.any(Lease.class))).thenThrow(new KinesisClientLibIOException("KinesisClientLibIOException")); + when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class))) + .thenThrow(new InvalidStateException("InvalidStateException is thrown")); final TaskResult result = task.call(); - assertNotNull(result.getException()); - assertTrue(result.getException() instanceof IllegalStateException); - verify(recordsPublisher, never()).shutdown(); + assertNull(result.getException()); + verify(recordsPublisher).shutdown(); verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); - verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); - verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); + verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); + verify(leaseCoordinator).dropLease(Matchers.any(Lease.class)); } /** @@ -185,7 +188,7 @@ public class ShutdownTaskTest { verify(recordsPublisher).shutdown(); verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); - verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); + verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class)); verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class)); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class)); From a0094b0df8c9852eb0102c285bae48a18accc16c Mon Sep 17 00:00:00 2001 From: Chunxue Yang 
Date: Wed, 8 Jul 2020 12:33:16 -0700 Subject: [PATCH 2/3] Addressing comments --- .../kinesis/lifecycle/ShutdownTask.java | 68 ++++++++++--------- .../kinesis/retrieval/DataRetrievalUtil.java | 32 +++++++++ .../fanout/FanOutRecordsPublisher.java | 19 +----- .../retrieval/polling/KinesisDataFetcher.java | 18 +---- 4 files changed, 72 insertions(+), 65 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 800aa4c7..0c01f13f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -117,34 +117,10 @@ public class ShutdownTask implements ConsumerTask { try { log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}", leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason); - ShutdownReason localReason = reason; final long startTime = System.currentTimeMillis(); final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); - if (localReason == ShutdownReason.SHARD_END) { - // Create new lease for the child shards if they don't exist. - // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. - // This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception. - // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. - // This scenario could happen when customer deletes the stream while leaving the KCL application running. 
- Validate.validState(currentShardLease != null, - "%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.", - leaseKeyProvider.apply(shardInfo)); - try { - if (!CollectionUtils.isNullOrEmpty(childShards)) { - createLeasesForChildShardsIfNotExist(); - updateLeaseWithChildShards(currentShardLease); - } - } catch (InvalidStateException e) { - // If invalidStateException happens, it indicates we are missing childShard related information. - // In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry getting - // childShard information in the processTask. - localReason = ShutdownReason.LEASE_LOST; - dropLease(); - log.warn("Shard " + shardInfo.shardId() + ": Exception happened while shutting down shardConsumer with LEASE_LOST reason. " + - "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. Exception: ", e); - } - } + final ShutdownReason localReason = attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(reason, currentShardLease); if (localReason == ShutdownReason.SHARD_END) { final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); @@ -192,6 +168,36 @@ public class ShutdownTask implements ConsumerTask { return new TaskResult(exception); } + private ShutdownReason attemptPersistingChildShardInfoAndOverrideShutdownReasonOnFailure(ShutdownReason originalReason, Lease currentShardLease) + throws DependencyException, ProvisionedThroughputException { + ShutdownReason localReason = originalReason; + if (originalReason == ShutdownReason.SHARD_END) { + // Create new lease for the child shards if they don't exist. + // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. + // This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception. 
+ // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. + // This scenario could happen when customer deletes the stream while leaving the KCL application running. + Validate.validState(currentShardLease != null, + "%s : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.", + leaseKeyProvider.apply(shardInfo)); + try { + if (!CollectionUtils.isNullOrEmpty(childShards)) { + createLeasesForChildShardsIfNotExist(); + updateLeaseWithChildShards(currentShardLease); + } + } catch (InvalidStateException e) { + // If InvalidStateException happens, it indicates we are missing childShard related information. + // In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry getting + // childShard information in the processTask. + localReason = ShutdownReason.LEASE_LOST; + log.warn("Lease {}: Exception happened while shutting down shardConsumer with SHARD_END reason. " + + "Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. Exception: ", currentShardLease.leaseKey(), e); + dropLease(currentShardLease); + } + } + return localReason; + } + private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { @@ -266,15 +272,13 @@ public class ShutdownTask implements ConsumerTask { return reason; } - private void dropLease() { - Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); + private void dropLease(Lease currentLease) { if (currentLease == null) { - log.warn("Shard " + shardInfo.shardId() + ": Lease already dropped. Will shutdown the shardConsumer directly."); + log.warn("Shard {}: Unable to find the lease for shard. 
Will shutdown the shardConsumer directly.", leaseKeyProvider.apply(shardInfo)); return; - } - leaseCoordinator.dropLease(currentLease); - if(currentLease != null) { - log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); + } else { + leaseCoordinator.dropLease(currentLease); + log.info("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java new file mode 100644 index 00000000..35ad2de6 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java @@ -0,0 +1,32 @@ +package software.amazon.kinesis.retrieval; + +import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.awssdk.utils.CollectionUtils; + +import java.util.List; + +public class DataRetrievalUtil { + + public static boolean isValidResult(String shardEndIndicator, List<ChildShard> childShards) { + // shardEndIndicator is nextShardIterator for GetRecordsResponse, and is continuationSequenceNumber for SubscribeToShardEvent + // There are two valid scenarios for the shardEndIndicator and childShards combination. + // 1. ShardEnd scenario: shardEndIndicator should be null and childShards should be a non-empty list. + // 2. Non-ShardEnd scenario: shardEndIndicator should be non-null and childShards should be null or an empty list. + // Otherwise, the retrieval result is invalid. + if (shardEndIndicator == null && CollectionUtils.isNullOrEmpty(childShards) || + shardEndIndicator != null && !CollectionUtils.isNullOrEmpty(childShards)) { + return false; + } + + // For ShardEnd scenario, for each childShard we should validate if parentShards are available. + // Missing parentShards can cause issues with creating leases for childShards during ShardConsumer shutdown.
+ if (!CollectionUtils.isNullOrEmpty(childShards)) { + for (ChildShard childShard : childShards) { + if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) { + return false; + } + } + } + return true; + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 99a7e9a4..b6299bbf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -43,6 +43,7 @@ import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; +import software.amazon.kinesis.retrieval.DataRetrievalUtil; import software.amazon.kinesis.retrieval.IteratorBuilder; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsDeliveryAck; @@ -485,7 +486,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Since the triggeringFlow is active flow, it will then trigger the handleFlowError call. // Since the exception is not ResourceNotFoundException, it will trigger onError in the ShardConsumerSubscriber. // The ShardConsumerSubscriber will finally cancel the subscription. - if (!isValidEvent(recordBatchEvent)) { + if (!DataRetrievalUtil.isValidResult(recordBatchEvent.continuationSequenceNumber(), recordBatchEvent.childShards())) { throw new InvalidStateException("RecordBatchEvent for flow " + triggeringFlow.toString() + " is invalid." + " event.continuationSequenceNumber: " + recordBatchEvent.continuationSequenceNumber() + ". 
event.childShards: " + recordBatchEvent.childShards()); @@ -511,22 +512,6 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } - private boolean isValidEvent(SubscribeToShardEvent event) { - if (event.continuationSequenceNumber() == null && CollectionUtils.isNullOrEmpty(event.childShards()) || - event.continuationSequenceNumber() != null && !CollectionUtils.isNullOrEmpty(event.childShards())) { - return false; - } - - if(!CollectionUtils.isNullOrEmpty(event.childShards())) { - for (ChildShard childShard : event.childShards()) { - if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) { - return false; - } - } - } - return true; - } - private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFlow) { if (availableQueueSpace <= 0) { log.debug( diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index c1bb7d7e..3584dbf8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -48,6 +48,7 @@ import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; import software.amazon.kinesis.retrieval.DataFetcherResult; +import software.amazon.kinesis.retrieval.DataRetrievalUtil; import software.amazon.kinesis.retrieval.IteratorBuilder; import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RetryableRetrievalException; @@ -290,7 +291,7 @@ public class KinesisDataFetcher implements DataFetcher { public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, 
TimeoutException { final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), maxFutureWait); - if (!isValidResponse(response)) { + if (!DataRetrievalUtil.isValidResult(response.nextShardIterator(), response.childShards())) { throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId + ". nextShardIterator: " + response.nextShardIterator() + ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator."); @@ -340,21 +341,6 @@ public class KinesisDataFetcher implements DataFetcher { } } - private boolean isValidResponse(GetRecordsResponse response) { - if (response.nextShardIterator() == null && CollectionUtils.isNullOrEmpty(response.childShards()) || - response.nextShardIterator() != null && !CollectionUtils.isNullOrEmpty(response.childShards())) { - return false; - } - if (!CollectionUtils.isNullOrEmpty(response.childShards())) { - for (ChildShard childShard : response.childShards()) { - if (CollectionUtils.isNullOrEmpty(childShard.parentShards())) { - return false; - } - } - } - return true; - } - private AWSExceptionManager createExceptionManager() { final AWSExceptionManager exceptionManager = new AWSExceptionManager(); exceptionManager.add(ResourceNotFoundException.class, t -> t); From 85a5423657ef90516b798dda79b3bfc125ccbe7b Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 14 Jul 2020 15:02:04 -0700 Subject: [PATCH 3/3] Addressing more comments --- .../kinesis/retrieval/DataRetrievalUtil.java | 15 +++++++++++++++ .../retrieval/fanout/FanOutRecordsPublisher.java | 4 ++-- .../retrieval/polling/KinesisDataFetcher.java | 4 +++- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java index 35ad2de6..ba743e61 100644 --- 
a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java @@ -1,3 +1,18 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package software.amazon.kinesis.retrieval; import software.amazon.awssdk.services.kinesis.model.ChildShard; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index b6299bbf..7e8932cf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -43,7 +43,6 @@ import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; -import software.amazon.kinesis.retrieval.DataRetrievalUtil; import software.amazon.kinesis.retrieval.IteratorBuilder; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsDeliveryAck; @@ -62,6 +61,7 @@ import 
java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; +import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult; @Slf4j @KinesisClientInternalApi @@ -486,7 +486,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Since the triggeringFlow is active flow, it will then trigger the handleFlowError call. // Since the exception is not ResourceNotFoundException, it will trigger onError in the ShardConsumerSubscriber. // The ShardConsumerSubscriber will finally cancel the subscription. - if (!DataRetrievalUtil.isValidResult(recordBatchEvent.continuationSequenceNumber(), recordBatchEvent.childShards())) { + if (!isValidResult(recordBatchEvent.continuationSequenceNumber(), recordBatchEvent.childShards())) { throw new InvalidStateException("RecordBatchEvent for flow " + triggeringFlow.toString() + " is invalid." + " event.continuationSequenceNumber: " + recordBatchEvent.continuationSequenceNumber() + ". event.childShards: " + recordBatchEvent.childShards()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 3584dbf8..223ab367 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -54,6 +54,8 @@ import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult; + /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. 
*/ @@ -291,7 +293,7 @@ public class KinesisDataFetcher implements DataFetcher { public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), maxFutureWait); - if (!DataRetrievalUtil.isValidResult(response.nextShardIterator(), response.childShards())) { + if (!isValidResult(response.nextShardIterator(), response.childShards())) { throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId + ". nextShardIterator: " + response.nextShardIterator() + ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator.");