Fix preparing a checkpoint at SHARD_END (#301)

Fix IllegalArgumentException: Sequence number must be numeric, when preparing a checkpoint at SHARD_END
This commit is contained in:
Walid Baruni 2018-02-27 08:49:20 -08:00 committed by Justin Pfifer
parent 24916ba552
commit 523cc0e2cc
3 changed files with 44 additions and 3 deletions

View file

@ -70,12 +70,14 @@ public class SequenceNumberValidator {
*/ */
void validateSequenceNumber(String sequenceNumber) void validateSequenceNumber(String sequenceNumber)
throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException { throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException {
if (!isDigits(sequenceNumber)) { boolean atShardEnd = ExtendedSequenceNumber.SHARD_END.getSequenceNumber().equals(sequenceNumber);
if (!atShardEnd && !isDigits(sequenceNumber)) {
LOG.info("Sequence number must be numeric, but was " + sequenceNumber); LOG.info("Sequence number must be numeric, but was " + sequenceNumber);
throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber); throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber);
} }
try { try {
if (validateWithGetIterator) { if (!atShardEnd &&validateWithGetIterator) {
proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber); proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber);
LOG.info("Validated sequence number " + sequenceNumber + " with shard id " + shardId); LOG.info("Validated sequence number " + sequenceNumber + " with shard id " + shardId);
} }

View file

@ -168,6 +168,21 @@ public class RecordProcessorCheckpointerTest {
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId)); Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
} }
/**
* Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}.
*/
@Test
public final void testCheckpointAtShardEnd() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END;
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
processingCheckpointer.checkpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
}
/** /**
* Test method for * Test method for
* {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint()}. * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer#prepareCheckpoint()}.
@ -299,6 +314,30 @@ public class RecordProcessorCheckpointerTest {
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint()); Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
} }
/**
* Test method for {@link RecordProcessorCheckpointer#checkpoint(String SHARD_END)}.
*/
@Test
public final void testPrepareCheckpointAtShardEnd() throws Exception {
RecordProcessorCheckpointer processingCheckpointer =
new RecordProcessorCheckpointer(shardInfo, checkpoint, sequenceNumberValidator, metricsFactory);
processingCheckpointer.setInitialCheckpointValue(startingExtendedSequenceNumber);
ExtendedSequenceNumber extendedSequenceNumber = ExtendedSequenceNumber.SHARD_END;
processingCheckpointer.setLargestPermittedCheckpointValue(extendedSequenceNumber);
IPreparedCheckpointer preparedCheckpoint = processingCheckpointer.prepareCheckpoint(ExtendedSequenceNumber.SHARD_END.getSequenceNumber());
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(startingExtendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(extendedSequenceNumber, preparedCheckpoint.getPendingCheckpoint());
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
// Checkpoint using preparedCheckpoint
preparedCheckpoint.checkpoint();
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpoint(shardId));
Assert.assertEquals(extendedSequenceNumber, checkpoint.getCheckpointObject(shardId).getCheckpoint());
Assert.assertEquals(null, checkpoint.getCheckpointObject(shardId).getPendingCheckpoint());
}
/** /**
* Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order. * Test that having multiple outstanding prepared checkpointers works if they are redeemed in the right order.
*/ */

View file

@ -87,7 +87,7 @@ public class SequenceNumberValidatorTest {
boolean validateWithGetIterator) { boolean validateWithGetIterator) {
String[] nonNumericStrings = { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(), String[] nonNumericStrings = { null, "bogus-sequence-number", SentinelCheckpoint.LATEST.toString(),
SentinelCheckpoint.SHARD_END.toString(), SentinelCheckpoint.TRIM_HORIZON.toString(), SentinelCheckpoint.TRIM_HORIZON.toString(),
SentinelCheckpoint.AT_TIMESTAMP.toString() }; SentinelCheckpoint.AT_TIMESTAMP.toString() };
for (String nonNumericString : nonNumericStrings) { for (String nonNumericString : nonNumericStrings) {