Update Java docs and minor refactoring

This commit is contained in:
Chen Yuan Lee 2023-01-03 11:14:44 -08:00
parent ddf6987fdc
commit c76d448739
2 changed files with 21 additions and 3 deletions

View file

@ -41,6 +41,14 @@ public class IteratorBuilder {
ShardIteratorType.AFTER_SEQUENCE_NUMBER); ShardIteratorType.AFTER_SEQUENCE_NUMBER);
} }
/**
* Creates a GetShardIteratorRequest builder that uses AT_SEQUENCE_NUMBER GetShardIterator.
*
* @param builder An initial GetShardIteratorRequest builder to be updated.
* @param sequenceNumber The sequence number to restart the request from.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP.
* @return An updated GetShardIteratorRequest.Builder
*/
public static GetShardIteratorRequest.Builder request(GetShardIteratorRequest.Builder builder, public static GetShardIteratorRequest.Builder request(GetShardIteratorRequest.Builder builder,
String sequenceNumber, String sequenceNumber,
InitialPositionInStreamExtended initialPosition) { InitialPositionInStreamExtended initialPosition) {
@ -48,6 +56,14 @@ public class IteratorBuilder {
} }
/**
* Creates a GetShardIteratorRequest builder that uses AFTER_SEQUENCE_NUMBER GetShardIterator.
*
* @param builder An initial GetShardIteratorRequest builder to be updated.
* @param sequenceNumber The sequence number to restart the request from.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP.
* @return An updated GetShardIteratorRequest.Builder
*/
public static GetShardIteratorRequest.Builder reconnectRequest(GetShardIteratorRequest.Builder builder, public static GetShardIteratorRequest.Builder reconnectRequest(GetShardIteratorRequest.Builder builder,
String sequenceNumber, String sequenceNumber,
InitialPositionInStreamExtended initialPosition) { InitialPositionInStreamExtended initialPosition) {

View file

@ -237,9 +237,11 @@ public class KinesisDataFetcher implements DataFetcher {
GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId); .streamName(streamIdentifier.streamName()).shardId(shardId);
GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream).build(); GetShardIteratorRequest request;
if (isIteratorRestart) { if (isIteratorRestart) {
request = IteratorBuilder.reconnectRequest(builder, sequenceNumber, initialPositionInStream).build(); request = IteratorBuilder.reconnectRequest(builder, sequenceNumber, initialPositionInStream).build();
} else {
request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream).build();
} }
// TODO: Check if this metric is fine to be added // TODO: Check if this metric is fine to be added
@ -287,8 +289,8 @@ public class KinesisDataFetcher implements DataFetcher {
throw new IllegalStateException( throw new IllegalStateException(
"Make sure to initialize the KinesisDataFetcher before restarting the iterator."); "Make sure to initialize the KinesisDataFetcher before restarting the iterator.");
} }
log.debug("Getting a new next shard iterator for sequence number {} " + log.debug("Restarting iterator for sequence number {} on shard id {}",
"for streamAndShardId {}", lastKnownSequenceNumber, streamAndShardId); lastKnownSequenceNumber, streamAndShardId);
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream, true); advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream, true);
} }