diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java index 35ad2de6..ba743e61 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataRetrievalUtil.java @@ -1,3 +1,18 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package software.amazon.kinesis.retrieval; import software.amazon.awssdk.services.kinesis.model.ChildShard; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index b6299bbf..7e8932cf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -43,7 +43,6 @@ import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; -import software.amazon.kinesis.retrieval.DataRetrievalUtil; import software.amazon.kinesis.retrieval.IteratorBuilder; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsDeliveryAck; @@ -62,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; +import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult; @Slf4j @KinesisClientInternalApi @@ -486,7 +486,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { // Since the triggeringFlow is active flow, it will then trigger the handleFlowError call. // Since the exception is not ResourceNotFoundException, it will trigger onError in the ShardConsumerSubscriber. // The ShardConsumerSubscriber will finally cancel the subscription. - if (!DataRetrievalUtil.isValidResult(recordBatchEvent.continuationSequenceNumber(), recordBatchEvent.childShards())) { + if (!isValidResult(recordBatchEvent.continuationSequenceNumber(), recordBatchEvent.childShards())) { throw new InvalidStateException("RecordBatchEvent for flow " + triggeringFlow.toString() + " is invalid." + " event.continuationSequenceNumber: " + recordBatchEvent.continuationSequenceNumber() + ". event.childShards: " + recordBatchEvent.childShards()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 3584dbf8..223ab367 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -54,6 +54,8 @@ import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult; + /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. */ @@ -291,7 +293,7 @@ public class KinesisDataFetcher implements DataFetcher { public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), maxFutureWait); - if (!DataRetrievalUtil.isValidResult(response.nextShardIterator(), response.childShards())) { + if (!isValidResult(response.nextShardIterator(), response.childShards())) { throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId + ". nextShardIterator: " + response.nextShardIterator() + ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator.");