Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request

This commit is contained in:
Chenyuan Lee 2022-12-10 01:03:18 +08:00
parent 05ed537572
commit ddf6987fdc
5 changed files with 67 additions and 7 deletions

View file

@ -42,10 +42,25 @@ public class IteratorBuilder {
}
public static GetShardIteratorRequest.Builder request(GetShardIteratorRequest.Builder builder,
String sequenceNumber, InitialPositionInStreamExtended initialPosition) {
String sequenceNumber,
InitialPositionInStreamExtended initialPosition) {
return getShardIteratorRequest(builder, sequenceNumber, initialPosition, ShardIteratorType.AT_SEQUENCE_NUMBER);
}
public static GetShardIteratorRequest.Builder reconnectRequest(GetShardIteratorRequest.Builder builder,
String sequenceNumber,
InitialPositionInStreamExtended initialPosition) {
return getShardIteratorRequest(builder, sequenceNumber, initialPosition, ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}
private static GetShardIteratorRequest.Builder getShardIteratorRequest(GetShardIteratorRequest.Builder builder,
String sequenceNumber,
InitialPositionInStreamExtended initialPosition,
ShardIteratorType shardIteratorType) {
return apply(builder, GetShardIteratorRequest.Builder::shardIteratorType, GetShardIteratorRequest.Builder::timestamp,
GetShardIteratorRequest.Builder::startingSequenceNumber, initialPosition, sequenceNumber,
ShardIteratorType.AT_SEQUENCE_NUMBER);
shardIteratorType);
}
private final static Map<String, ShardIteratorType> SHARD_ITERATOR_MAPPING;

View file

@ -223,6 +223,12 @@ public class KinesisDataFetcher implements DataFetcher {
@Override
public void advanceIteratorTo(final String sequenceNumber,
final InitialPositionInStreamExtended initialPositionInStream) {
advanceIteratorTo(sequenceNumber, initialPositionInStream, false);
}
private void advanceIteratorTo(final String sequenceNumber,
final InitialPositionInStreamExtended initialPositionInStream,
boolean isIteratorRestart) {
if (sequenceNumber == null) {
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
}
@ -231,8 +237,10 @@ public class KinesisDataFetcher implements DataFetcher {
GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId);
GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream)
.build();
GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream).build();
if (isIteratorRestart) {
request = IteratorBuilder.reconnectRequest(builder, sequenceNumber, initialPositionInStream).build();
}
// TODO: Check if this metric is fine to be added
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
@ -270,8 +278,8 @@ public class KinesisDataFetcher implements DataFetcher {
}
/**
* Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last
* records call.
* Gets a new next shard iterator from last known sequence number i.e. the sequence number of the last
* record from the last records call.
*/
@Override
public void restartIterator() {
@ -279,7 +287,9 @@ public class KinesisDataFetcher implements DataFetcher {
throw new IllegalStateException(
"Make sure to initialize the KinesisDataFetcher before restarting the iterator.");
}
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream);
log.debug("Getting a new next shard iterator for sequence number {} " +
"for streamAndShardId {}", lastKnownSequenceNumber, streamAndShardId);
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream, true);
}
@Override

View file

@ -502,6 +502,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(),
PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
log.debug("Last sequence number retrieved for streamAndShardId {} is {}", streamAndShardId,
recordsRetrieved.lastBatchSequenceNumber);
addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequests();
} catch (PositionResetException pse) {

View file

@ -62,6 +62,12 @@ public class IteratorBuilderTest {
sequenceNumber(this::gsiBase, this::verifyGsiBase, IteratorBuilder::request, WrappedRequest::wrapped);
}
@Test
public void getShardIteratorReconnectTest() {
sequenceNumber(this::gsiBase, this::verifyGsiBase, IteratorBuilder::reconnectRequest, WrappedRequest::wrapped,
ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}
@Test
public void subscribeTimestampTest() {
timeStampTest(this::stsBase, this::verifyStsBase, IteratorBuilder::request, WrappedRequest::wrapped);

View file

@ -435,6 +435,33 @@ public class KinesisDataFetcherTest {
assertEquals(restartGetRecordsResponse, kinesisDataFetcher.getRecords().accept());
}
@Test
public void testRestartIteratorUsesAfterSequenceNumberIteratorType() throws Exception {
final String iterator = "iterator";
final String sequenceNumber = "123";
final ArgumentCaptor<GetShardIteratorRequest> shardIteratorRequestCaptor =
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture())).
thenReturn(makeGetShardIteratorResonse(iterator));
kinesisDataFetcher.initialize(sequenceNumber, INITIAL_POSITION_LATEST);
kinesisDataFetcher.restartIterator();
// The advanceIteratorTo call should not use AFTER_SEQUENCE_NUMBER iterator
// type unless called by restartIterator
kinesisDataFetcher.advanceIteratorTo(sequenceNumber, INITIAL_POSITION_LATEST);
final List<GetShardIteratorRequest> shardIteratorRequests = shardIteratorRequestCaptor.getAllValues();
assertEquals(3, shardIteratorRequests.size());
assertEquals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
shardIteratorRequests.get(0).shardIteratorTypeAsString());
assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
shardIteratorRequests.get(1).shardIteratorTypeAsString());
assertEquals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
shardIteratorRequests.get(2).shardIteratorTypeAsString());
}
@Test(expected = IllegalStateException.class)
public void testRestartIteratorNotInitialized() {
kinesisDataFetcher.restartIterator();