Added a sequence number validator to ensure safer checkpoints (#432)

Added the SequenceNumberValidator that will be used in the checkpoint
process to ensure that the sequence number is valid for the shard
being checkpointed.
This commit is contained in:
Justin Pfifer 2018-10-10 13:02:15 -07:00 committed by GitHub
parent 2609e1ce46
commit f2fb9ead0d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 249 additions and 86 deletions

View file

@ -0,0 +1,188 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.checkpoint;
import java.math.BigInteger;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* This supports extracting the shardId from a sequence number.
*
* <h2>Warning</h2>
* <strong>Sequence numbers are an opaque value used by Kinesis, and maybe changed at any time. Should validation stop
* working you may need to update your version of the KCL</strong>
*
*/
public class SequenceNumberValidator {
@Data
@Accessors(fluent = true)
private static class SequenceNumberComponents {
final int version;
final int shardId;
}
private interface SequenceNumberReader {
Optional<SequenceNumberComponents> read(String sequenceNumber);
}
/**
* Reader for the v2 sequence number format. v1 sequence numbers are no longer used or available.
*/
private static class V2SequenceNumberReader implements SequenceNumberReader {
private static final int VERSION = 2;
private static final int EXPECTED_BIT_LENGTH = 186;
private static final int VERSION_OFFSET = 184;
private static final long VERSION_MASK = (1 << 4) - 1;
private static final int SHARD_ID_OFFSET = 4;
private static final long SHARD_ID_MASK = (1L << 32) - 1;
@Override
public Optional<SequenceNumberComponents> read(String sequenceNumberString) {
BigInteger sequenceNumber = new BigInteger(sequenceNumberString, 10);
//
// If the bit length of the sequence number isn't 186 it's impossible for the version numbers
// to be where we expect them. We treat this the same as an unknown version of the sequence number
//
// If the sequence number length isn't what we expect it's due to a new version of the sequence number or
// an invalid sequence number. This
//
if (sequenceNumber.bitLength() != EXPECTED_BIT_LENGTH) {
return Optional.empty();
}
//
// Read the 4 most significant bits of the sequence number, the 2 most significant bits are implicitly 0
// (2 == 0b0011). If the version number doesn't match we give up and say we can't parse the sequence number
//
int version = readOffset(sequenceNumber, VERSION_OFFSET, VERSION_MASK);
if (version != VERSION) {
return Optional.empty();
}
//
// If we get here the sequence number is big enough, and the version matches so the shardId should be valid.
//
int shardId = readOffset(sequenceNumber, SHARD_ID_OFFSET, SHARD_ID_MASK);
return Optional.of(new SequenceNumberComponents(version, shardId));
}
private int readOffset(BigInteger sequenceNumber, int offset, long mask) {
long value = sequenceNumber.shiftRight(offset).longValue() & mask;
return (int) value;
}
}
private static final List<SequenceNumberReader> SEQUENCE_NUMBER_READERS = Collections
.singletonList(new V2SequenceNumberReader());
private Optional<SequenceNumberComponents> retrieveComponentsFor(String sequenceNumber) {
return SEQUENCE_NUMBER_READERS.stream().map(r -> r.read(sequenceNumber)).filter(Optional::isPresent).map(Optional::get).findFirst();
}
/**
* Attempts to retrieve the version for a sequence number. If no reader can be found for the sequence number this
* will return an empty Optional.
*
* <p>
* <strong>This will return an empty Optional if the it's unable to extract the version number. This can occur for
* multiple reasons including:
* <ul>
* <li>Kinesis has started using a new version of sequence numbers</li>
* <li>The provided sequence number isn't a valid Kinesis sequence number.</li>
* </ul>
* </strong>
* </p>
*
* @param sequenceNumber
* the sequence number to extract the version from
* @return an Optional containing the version if a compatible sequence number reader can be found, an empty Optional
* otherwise.
*/
public Optional<Integer> versionFor(String sequenceNumber) {
return retrieveComponentsFor(sequenceNumber).map(SequenceNumberComponents::version);
}
/**
* Attempts to retrieve the shardId from a sequence number. If the version of the sequence number is unsupported
* this will return an empty optional.
*
* <strong>This will return an empty Optional if the sequence number isn't recognized. This can occur for multiple
* reasons including:
* <ul>
* <li>Kinesis has started using a new version of sequence numbers</li>
* <li>The provided sequence number isn't a valid Kinesis sequence number.</li>
* </ul>
* </strong>
* <p>
* This should always return a value if {@link #versionFor(String)} returns a value
* </p>
*
* @param sequenceNumber
* the sequence number to extract the shardId from
* @return an Optional containing the shardId if the version is supported, an empty Optional otherwise.
*/
public Optional<String> shardIdFor(String sequenceNumber) {
return retrieveComponentsFor(sequenceNumber).map(s -> String.format("shardId-%012d", s.shardId()));
}
/**
* Validates that the sequence number provided contains the given shardId. If the sequence number is unsupported
* this will return an empty Optional.
*
* <p>
* Validation of a sequence number will only occur if the sequence number can be parsed. It's possible to use
* {@link #versionFor(String)} to verify that the given sequence number is supported by this class. There are 3
* possible validation states:
* <dl>
* <dt>Some(True)</dt>
* <dd>The sequence number can be parsed, and the shardId matches the one in the sequence number</dd>
* <dt>Some(False)</dt>
* <dd>THe sequence number can be parsed, and the shardId doesn't match the one in the sequence number</dd>
* <dt>None</dt>
* <dd>It wasn't possible to parse the sequence number so the validity of the sequence number is unknown</dd>
* </dl>
* </p>
*
* <p>
* <strong>Handling unknown validation causes is application specific, and not specific handling is
* provided.</strong>
* </p>
*
* @param sequenceNumber
* the sequence number to verify the shardId
* @param shardId
* the shardId that the sequence is expected to contain
* @return true if the sequence number contains the shardId, false if it doesn't. If the sequence number version is
* unsupported this will return an empty Optional
*/
public Optional<Boolean> validateSequenceNumberForShard(String sequenceNumber, String shardId) {
return shardIdFor(sequenceNumber).map(s -> StringUtils.equalsIgnoreCase(s, shardId));
}
}

View file

@ -14,113 +14,88 @@
*/ */
package software.amazon.kinesis.checkpoint; package software.amazon.kinesis.checkpoint;
//@RunWith(MockitoJUnitRunner.class) import org.junit.Before;
import org.junit.Test;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
public class SequenceNumberValidatorTest { public class SequenceNumberValidatorTest {
/*private final String streamName = "testStream";
private final boolean validateWithGetIterator = true;
private final String shardId = "shardid-123";
@Mock private SequenceNumberValidator validator;
private AmazonKinesis amazonKinesis;
@Test (expected = IllegalArgumentException.class) @Before
public final void testSequenceNumberValidator() { public void begin() {
Checkpoint.SequenceNumberValidator validator = new Checkpoint.SequenceNumberValidator(amazonKinesis, streamName, validator = new SequenceNumberValidator();
shardId, validateWithGetIterator);
String goodSequence = "456";
String iterator = "happyiterator";
String badSequence = "789";
ArgumentCaptor<GetShardIteratorRequest> requestCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
when(amazonKinesis.getShardIterator(requestCaptor.capture()))
.thenReturn(new GetShardIteratorResult().withShardIterator(iterator))
.thenThrow(new InvalidArgumentException(""));
validator.validateSequenceNumber(goodSequence);
try {
validator.validateSequenceNumber(badSequence);
} finally {
final List<GetShardIteratorRequest> requests = requestCaptor.getAllValues();
assertEquals(2, requests.size());
final GetShardIteratorRequest goodRequest = requests.get(0);
final GetShardIteratorRequest badRequest = requests.get(0);
assertEquals(streamName, goodRequest.getStreamName());
assertEquals(shardId, goodRequest.shardId());
assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), goodRequest.getShardIteratorType());
assertEquals(goodSequence, goodRequest.getStartingSequenceNumber());
assertEquals(streamName, badRequest.getStreamName());
assertEquals(shardId, badRequest.shardId());
assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), badRequest.getShardIteratorType());
assertEquals(goodSequence, badRequest.getStartingSequenceNumber());
} }
@Test
public void matchingSequenceNumberTest() {
String sequenceNumber = "49587497311274533994574834252742144236107130636007899138";
String expectedShardId = "shardId-000000000000";
Optional<Integer> version = validator.versionFor(sequenceNumber);
assertThat(version, equalTo(Optional.of(2)));
Optional<String> shardId = validator.shardIdFor(sequenceNumber);
assertThat(shardId, equalTo(Optional.of(expectedShardId)));
assertThat(validator.validateSequenceNumberForShard(sequenceNumber, expectedShardId), equalTo(Optional.of(true)));
} }
@Test @Test
public final void testNoValidation() { public void shardMismatchTest() {
Checkpoint.SequenceNumberValidator validator = new Checkpoint.SequenceNumberValidator(amazonKinesis, streamName, String sequenceNumber = "49585389983312162443796657944872008114154899568972529698";
shardId, !validateWithGetIterator); String invalidShardId = "shardId-000000000001";
String sequenceNumber = "456";
// Just checking that the false flag for validating against getIterator is honored Optional<Integer> version = validator.versionFor(sequenceNumber);
validator.validateSequenceNumber(sequenceNumber); assertThat(version, equalTo(Optional.of(2)));
verify(amazonKinesis, never()).getShardIterator(any(GetShardIteratorRequest.class)); Optional<String> shardId = validator.shardIdFor(sequenceNumber);
assertThat(shardId, not(equalTo(invalidShardId)));
assertThat(validator.validateSequenceNumberForShard(sequenceNumber, invalidShardId), equalTo(Optional.of(false)));
} }
@Test @Test
public void nonNumericValueValidationTest() { public void versionMismatchTest() {
Checkpoint.SequenceNumberValidator validator = new Checkpoint.SequenceNumberValidator(amazonKinesis, streamName, String sequenceNumber = "74107425965128755728308386687147091174006956590945533954";
shardId, validateWithGetIterator); String expectedShardId = "shardId-000000000000";
String[] nonNumericStrings = {null, Optional<Integer> version = validator.versionFor(sequenceNumber);
"bogus-sequence-number", assertThat(version, equalTo(Optional.empty()));
SentinelCheckpoint.LATEST.toString(),
SentinelCheckpoint.TRIM_HORIZON.toString(),
SentinelCheckpoint.AT_TIMESTAMP.toString()};
Arrays.stream(nonNumericStrings).forEach(sequenceNumber -> { Optional<String> shardId = validator.shardIdFor(sequenceNumber);
try { assertThat(shardId, equalTo(Optional.empty()));
validator.validateSequenceNumber(sequenceNumber);
fail("Validator should not consider " + sequenceNumber + " a valid sequence number");
} catch (IllegalArgumentException e) {
// Do nothing
}
});
verify(amazonKinesis, never()).getShardIterator(any(GetShardIteratorRequest.class)); assertThat(validator.validateSequenceNumberForShard(sequenceNumber, expectedShardId), equalTo(Optional.empty()));
} }
@Test @Test
public final void testIsDigits() { public void sequenceNumberToShortTest() {
// Check things that are all digits String sequenceNumber = "4958538998331216244379665794487200811415489956897252969";
String[] stringsOfDigits = {"0", "12", "07897803434", "12324456576788"}; String expectedShardId = "shardId-000000000000";
for (String digits : stringsOfDigits) { assertThat(validator.versionFor(sequenceNumber), equalTo(Optional.empty()));
assertTrue("Expected that " + digits + " would be considered a string of digits.", assertThat(validator.shardIdFor(sequenceNumber), equalTo(Optional.empty()));
Checkpoint.SequenceNumberValidator.isDigits(digits));
assertThat(validator.validateSequenceNumberForShard(sequenceNumber, expectedShardId), equalTo(Optional.empty()));
} }
// Check things that are not all digits
String[] stringsWithNonDigits = { @Test
null, public void sequenceNumberToLongTest() {
"", String sequenceNumber = "495874973112745339945748342527421442361071306360078991381";
" ", // white spaces String expectedShardId = "shardId-000000000000";
"6 4",
"\t45", assertThat(validator.versionFor(sequenceNumber), equalTo(Optional.empty()));
"5242354235234\n", assertThat(validator.shardIdFor(sequenceNumber), equalTo(Optional.empty()));
"7\n6\n5\n",
"12s", // last character assertThat(validator.validateSequenceNumberForShard(sequenceNumber, expectedShardId), equalTo(Optional.empty()));
"c07897803434", // first character
"1232445wef6576788", // interior
"no-digits",
};
for (String notAllDigits : stringsWithNonDigits) {
assertFalse("Expected that " + notAllDigits + " would not be considered a string of digits.",
Checkpoint.SequenceNumberValidator.isDigits(notAllDigits));
} }
}*/
} }