Addressing more comments

This commit is contained in:
Chunxue Yang 2020-07-14 15:02:04 -07:00
parent a0094b0df8
commit 85a5423657
3 changed files with 20 additions and 3 deletions

View file

@ -1,3 +1,18 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import software.amazon.awssdk.services.kinesis.model.ChildShard;

View file

@ -43,7 +43,6 @@ import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.DataRetrievalUtil;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
@ -62,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult;
@Slf4j
@KinesisClientInternalApi
@ -486,7 +486,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
// Since the triggeringFlow is active flow, it will then trigger the handleFlowError call.
// Since the exception is not ResourceNotFoundException, it will trigger onError in the ShardConsumerSubscriber.
// The ShardConsumerSubscriber will finally cancel the subscription.
if (!DataRetrievalUtil.isValidResult(recordBatchEvent.continuationSequenceNumber(), recordBatchEvent.childShards())) {
if (!isValidResult(recordBatchEvent.continuationSequenceNumber(), recordBatchEvent.childShards())) {
throw new InvalidStateException("RecordBatchEvent for flow " + triggeringFlow.toString() + " is invalid."
+ " event.continuationSequenceNumber: " + recordBatchEvent.continuationSequenceNumber()
+ ". event.childShards: " + recordBatchEvent.childShards());

View file

@ -54,6 +54,8 @@ import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static software.amazon.kinesis.retrieval.DataRetrievalUtil.isValidResult;
/**
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
*/
@ -291,7 +293,7 @@ public class KinesisDataFetcher implements DataFetcher {
public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException {
final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request),
maxFutureWait);
if (!DataRetrievalUtil.isValidResult(response.nextShardIterator(), response.childShards())) {
if (!isValidResult(response.nextShardIterator(), response.childShards())) {
throw new RetryableRetrievalException("GetRecords response is not valid for shard: " + streamAndShardId
+ ". nextShardIterator: " + response.nextShardIterator()
+ ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator.");