From c76d4487392bb4f3224100f8329e08f5026afe05 Mon Sep 17 00:00:00 2001 From: Chen Yuan Lee Date: Tue, 3 Jan 2023 11:14:44 -0800 Subject: [PATCH] Update Java docs and minor refactoring --- .../kinesis/retrieval/IteratorBuilder.java | 16 ++++++++++++++++ .../retrieval/polling/KinesisDataFetcher.java | 8 +++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java index 8f1e75ac..62ddb140 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java @@ -41,6 +41,14 @@ public class IteratorBuilder { ShardIteratorType.AFTER_SEQUENCE_NUMBER); } + /** + * Creates a GetShardIteratorRequest builder that uses AT_SEQUENCE_NUMBER GetShardIterator. + * + * @param builder An initial GetShardIteratorRequest builder to be updated. + * @param sequenceNumber The sequence number to restart the request from. + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. + * @return An updated GetShardIteratorRequest.Builder + */ public static GetShardIteratorRequest.Builder request(GetShardIteratorRequest.Builder builder, String sequenceNumber, InitialPositionInStreamExtended initialPosition) { @@ -48,6 +56,14 @@ public class IteratorBuilder { } + /** + * Creates a GetShardIteratorRequest builder that uses AFTER_SEQUENCE_NUMBER GetShardIterator. + * + * @param builder An initial GetShardIteratorRequest builder to be updated. + * @param sequenceNumber The sequence number to restart the request from. + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. + * @return An updated GetShardIteratorRequest.Builder + */ public static GetShardIteratorRequest.Builder reconnectRequest(GetShardIteratorRequest.Builder builder, String sequenceNumber, InitialPositionInStreamExtended initialPosition) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index e03da503..8d36ea8a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -237,9 +237,11 @@ public class KinesisDataFetcher implements DataFetcher { GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() .streamName(streamIdentifier.streamName()).shardId(shardId); - GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream).build(); + GetShardIteratorRequest request; if (isIteratorRestart) { request = IteratorBuilder.reconnectRequest(builder, sequenceNumber, initialPositionInStream).build(); + } else { + request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream).build(); } // TODO: Check if this metric is fine to be added @@ -287,8 +289,8 @@ public class KinesisDataFetcher implements DataFetcher { throw new IllegalStateException( "Make sure to initialize the KinesisDataFetcher before restarting the iterator."); } - log.debug("Getting a new next shard iterator for sequence number {} " + - "for streamAndShardId {}", lastKnownSequenceNumber, streamAndShardId); + log.debug("Restarting iterator for sequence number {} on shard id {}", + lastKnownSequenceNumber, streamAndShardId); advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream, true); }