Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request (#1014)

* Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request

* Update Java docs and minor refactoring

* Fix Java doc

Co-authored-by: Chenyuan Lee <chenylee@amazon.com>
This commit is contained in:
Chenyuan Lee 2023-01-03 15:10:31 -08:00 committed by GitHub
parent 676bb86b8e
commit 17d0940f5d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 85 additions and 7 deletions

View file

@ -41,11 +41,42 @@ public class IteratorBuilder {
ShardIteratorType.AFTER_SEQUENCE_NUMBER); ShardIteratorType.AFTER_SEQUENCE_NUMBER);
} }
/**
* Creates a GetShardIteratorRequest builder that uses AT_SEQUENCE_NUMBER ShardIteratorType.
*
* @param builder An initial GetShardIteratorRequest builder to be updated.
* @param sequenceNumber The sequence number to restart the request from.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP.
* @return An updated GetShardIteratorRequest.Builder.
*/
public static GetShardIteratorRequest.Builder request(GetShardIteratorRequest.Builder builder, public static GetShardIteratorRequest.Builder request(GetShardIteratorRequest.Builder builder,
String sequenceNumber, InitialPositionInStreamExtended initialPosition) { String sequenceNumber,
InitialPositionInStreamExtended initialPosition) {
return getShardIteratorRequest(builder, sequenceNumber, initialPosition, ShardIteratorType.AT_SEQUENCE_NUMBER);
}
/**
* Creates a GetShardIteratorRequest builder that uses AFTER_SEQUENCE_NUMBER ShardIteratorType.
*
* @param builder An initial GetShardIteratorRequest builder to be updated.
* @param sequenceNumber The sequence number to restart the request from.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP.
* @return An updated GetShardIteratorRequest.Builder.
*/
public static GetShardIteratorRequest.Builder reconnectRequest(GetShardIteratorRequest.Builder builder,
String sequenceNumber,
InitialPositionInStreamExtended initialPosition) {
return getShardIteratorRequest(builder, sequenceNumber, initialPosition, ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}
private static GetShardIteratorRequest.Builder getShardIteratorRequest(GetShardIteratorRequest.Builder builder,
String sequenceNumber,
InitialPositionInStreamExtended initialPosition,
ShardIteratorType shardIteratorType) {
return apply(builder, GetShardIteratorRequest.Builder::shardIteratorType, GetShardIteratorRequest.Builder::timestamp, return apply(builder, GetShardIteratorRequest.Builder::shardIteratorType, GetShardIteratorRequest.Builder::timestamp,
GetShardIteratorRequest.Builder::startingSequenceNumber, initialPosition, sequenceNumber, GetShardIteratorRequest.Builder::startingSequenceNumber, initialPosition, sequenceNumber,
ShardIteratorType.AT_SEQUENCE_NUMBER); shardIteratorType);
} }
private final static Map<String, ShardIteratorType> SHARD_ITERATOR_MAPPING; private final static Map<String, ShardIteratorType> SHARD_ITERATOR_MAPPING;

View file

@ -223,6 +223,12 @@ public class KinesisDataFetcher implements DataFetcher {
@Override @Override
public void advanceIteratorTo(final String sequenceNumber, public void advanceIteratorTo(final String sequenceNumber,
final InitialPositionInStreamExtended initialPositionInStream) { final InitialPositionInStreamExtended initialPositionInStream) {
advanceIteratorTo(sequenceNumber, initialPositionInStream, false);
}
private void advanceIteratorTo(final String sequenceNumber,
final InitialPositionInStreamExtended initialPositionInStream,
boolean isIteratorRestart) {
if (sequenceNumber == null) { if (sequenceNumber == null) {
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId); throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
} }
@ -231,8 +237,12 @@ public class KinesisDataFetcher implements DataFetcher {
GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId); .streamName(streamIdentifier.streamName()).shardId(shardId);
GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream) GetShardIteratorRequest request;
.build(); if (isIteratorRestart) {
request = IteratorBuilder.reconnectRequest(builder, sequenceNumber, initialPositionInStream).build();
} else {
request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream).build();
}
// TODO: Check if this metric is fine to be added // TODO: Check if this metric is fine to be added
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION); final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
@ -270,8 +280,8 @@ public class KinesisDataFetcher implements DataFetcher {
} }
/** /**
* Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last * Gets a new next shard iterator from last known sequence number i.e. the sequence number of the last
* records call. * record from the last records call.
*/ */
@Override @Override
public void restartIterator() { public void restartIterator() {
@ -279,7 +289,9 @@ public class KinesisDataFetcher implements DataFetcher {
throw new IllegalStateException( throw new IllegalStateException(
"Make sure to initialize the KinesisDataFetcher before restarting the iterator."); "Make sure to initialize the KinesisDataFetcher before restarting the iterator.");
} }
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream); log.debug("Restarting iterator for sequence number {} on shard id {}",
lastKnownSequenceNumber, streamAndShardId);
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream, true);
} }
@Override @Override

View file

@ -502,6 +502,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(), calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(),
PrefetchRecordsRetrieved.generateBatchUniqueIdentifier()); PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber); publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
log.debug("Last sequence number retrieved for streamAndShardId {} is {}", streamAndShardId,
recordsRetrieved.lastBatchSequenceNumber);
addArrivedRecordsInput(recordsRetrieved); addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequests(); drainQueueForRequests();
} catch (PositionResetException pse) { } catch (PositionResetException pse) {

View file

@ -62,6 +62,12 @@ public class IteratorBuilderTest {
sequenceNumber(this::gsiBase, this::verifyGsiBase, IteratorBuilder::request, WrappedRequest::wrapped); sequenceNumber(this::gsiBase, this::verifyGsiBase, IteratorBuilder::request, WrappedRequest::wrapped);
} }
@Test
public void getShardIteratorReconnectTest() {
sequenceNumber(this::gsiBase, this::verifyGsiBase, IteratorBuilder::reconnectRequest, WrappedRequest::wrapped,
ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}
@Test @Test
public void subscribeTimestampTest() { public void subscribeTimestampTest() {
timeStampTest(this::stsBase, this::verifyStsBase, IteratorBuilder::request, WrappedRequest::wrapped); timeStampTest(this::stsBase, this::verifyStsBase, IteratorBuilder::request, WrappedRequest::wrapped);

View file

@ -435,6 +435,33 @@ public class KinesisDataFetcherTest {
assertEquals(restartGetRecordsResponse, kinesisDataFetcher.getRecords().accept()); assertEquals(restartGetRecordsResponse, kinesisDataFetcher.getRecords().accept());
} }
@Test
public void testRestartIteratorUsesAfterSequenceNumberIteratorType() throws Exception {
final String iterator = "iterator";
final String sequenceNumber = "123";
final ArgumentCaptor<GetShardIteratorRequest> shardIteratorRequestCaptor =
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture())).
thenReturn(makeGetShardIteratorResonse(iterator));
kinesisDataFetcher.initialize(sequenceNumber, INITIAL_POSITION_LATEST);
kinesisDataFetcher.restartIterator();
// The advanceIteratorTo call should not use AFTER_SEQUENCE_NUMBER iterator
// type unless called by restartIterator
kinesisDataFetcher.advanceIteratorTo(sequenceNumber, INITIAL_POSITION_LATEST);
final List<GetShardIteratorRequest> shardIteratorRequests = shardIteratorRequestCaptor.getAllValues();
assertEquals(3, shardIteratorRequests.size());
assertEquals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
shardIteratorRequests.get(0).shardIteratorTypeAsString());
assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
shardIteratorRequests.get(1).shardIteratorTypeAsString());
assertEquals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
shardIteratorRequests.get(2).shardIteratorTypeAsString());
}
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testRestartIteratorNotInitialized() { public void testRestartIteratorNotInitialized() {
kinesisDataFetcher.restartIterator(); kinesisDataFetcher.restartIterator();