Adding hash range validation for list shards with filter.

This commit is contained in:
Joshua Kim 2020-03-25 15:49:57 -04:00
parent 33cd0f52b2
commit 5351c9ddab
2 changed files with 149 additions and 7 deletions

View file

@ -68,6 +68,10 @@ public class HierarchicalShardSyncer {
// NOTE(review): assigned outside this view; the name suggests it switches multi-stream handling — confirm.
private final boolean isMultiStreamMode;
/** Lower bound of the complete hash key range, as a decimal string ("0"). */
public static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
/** Upper bound of the complete hash key range, as a decimal string (2^128 - 1). */
public static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
/**
 * Maximum number of filtered shard listings attempted while waiting for a listing whose shards
 * cover the complete hash key range.
 */
public static final int retriesForCompleteHashRange = 3;
// Starts out as the empty string; any later assignment happens outside this view.
private String streamIdentifier = "";
public HierarchicalShardSyncer() {
@ -340,12 +344,27 @@ public class HierarchicalShardSyncer {
}
private static List<Shard> getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector,
InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException {
final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended);
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShardsWithFilter(shardFilter));
InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException {
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended);
List<Shard> shards;
for (int i = 0; i < retriesForCompleteHashRange; i++) {
shards = shardDetector.listShardsWithFilter(shardFilter);
if (shards == null) {
throw new KinesisClientLibIOException(
"Stream " + shardDetector.streamIdentifier().streamName() +
" is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
}
if (hashRangeOfShardsIsComplete(shards)) {
return shards;
}
}
throw new KinesisClientLibIOException("Hash range of shards returned was incomplete after "
+ retriesForCompleteHashRange + " retries.");
}
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
@ -355,6 +374,30 @@ public class HierarchicalShardSyncer {
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
}
/**
 * Checks whether the given shards, taken together, cover the complete hash key range: the lowest
 * starting hash key is {@link #MIN_HASH_KEY}, the highest ending hash key is {@link #MAX_HASH_KEY},
 * and each shard starts exactly one key after the previous shard ends.
 *
 * <p>Side effect: the supplied list is sorted in place by starting hash key, so it must support
 * {@link List#sort}.
 *
 * @param shards shards to validate; an empty list is reported as incomplete
 * @return {@code true} if the shards span the complete hash key range without holes,
 *         {@code false} otherwise
 */
private static boolean hashRangeOfShardsIsComplete(@NonNull List<Shard> shards) {
    // An empty listing covers nothing; without this guard shards.get(0) below would throw
    // IndexOutOfBoundsException instead of letting the caller retry.
    if (shards.isEmpty()) {
        return false;
    }

    shards.sort(new ShardStartingHashKeyBasedComparator());

    final Shard firstShard = shards.get(0);
    final Shard lastShard = shards.get(shards.size() - 1);
    if (!firstShard.hashKeyRange().startingHashKey().equals(MIN_HASH_KEY) ||
            !lastShard.hashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) {
        return false;
    }

    // Walk adjacent pairs; a single-shard list has no pairs and skips the loop entirely.
    for (int i = 1; i < shards.size(); i++) {
        final BigInteger startOfPossibleHole = new BigInteger(shards.get(i - 1).hashKeyRange().endingHashKey());
        final BigInteger endOfPossibleHole = new BigInteger(shards.get(i).hashKeyRange().startingHashKey());

        // Contiguous shards differ by exactly one between one shard's end and the next one's start.
        if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) {
            return false;
        }
    }

    return true;
}
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
@ -789,6 +832,28 @@ public class HierarchicalShardSyncer {
.map(streamId -> streamId.serialize()).orElse("single_stream_mode");
}
/**
 * Comparator that orders shards by the numeric value of their starting hash key.
 */
@RequiredArgsConstructor
private static class ShardStartingHashKeyBasedComparator implements Comparator<Shard>, Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Orders two shards by their starting hash keys, parsed as {@link BigInteger} values.
     * Both shards are assumed to be non-null.
     *
     * {@inheritDoc}
     */
    @Override
    public int compare(Shard shard1, Shard shard2) {
        final BigInteger first = startingHashKeyOf(shard1);
        final BigInteger second = startingHashKeyOf(shard2);
        return first.compareTo(second);
    }

    /** Parses the starting hash key of the given shard into a {@link BigInteger}. */
    private static BigInteger startingHashKeyOf(Shard shard) {
        return new BigInteger(shard.hashKeyRange().startingHashKey());
    }
}
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
*
*/

View file

@ -1683,7 +1683,84 @@ public class HierarchicalShardSyncerTest {
verify(shardDetector, atLeast(1)).listShards();
}
// /**getShardFilterFromInitialPosition
/**
 * Tries to bootstrap an empty lease table when every filtered shard listing has an incomplete hash range.
 * Verifies that after three listing attempts we fail fast and throw an exception.
 * @throws Exception
 */
@Test(expected = KinesisClientLibIOException.class)
public void testEmptyLeaseTableThrowsExceptionWhenHashRangeIsStillIncompleteAfterRetries() throws Exception {
    // Hash ranges [0, 1] and [2, 3]: contiguous, but they stop at hash key 3.
    final Shard firstShard = ShardObjectHelper.newShard("shardId-0", null, null,
            ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1"));
    final Shard secondShard = ShardObjectHelper.newShard("shardId-1", null, null,
            ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3"));
    final List<Shard> incompleteShards = Arrays.asList(firstShard, secondShard);

    when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
    when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(incompleteShards);

    try {
        final boolean leaseTableEmpty = dynamoDBLeaseRefresher.isLeaseTableEmpty();
        hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
                INITIAL_POSITION_LATEST, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
                garbageCollectLeases, leaseTableEmpty);
    } finally {
        // Runs even though the sync throws: the listing must have been attempted three times.
        verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class));
    }
}
/**
 * Tries to bootstrap an empty lease table when the first two filtered shard listings have an incomplete
 * hash range and the third one is complete. Verifies that the leases are created from the third listing.
 * @throws Exception
 */
@Test
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception {
    // Hash ranges [0, 1] and [2, 3]: contiguous, but they stop at hash key 3.
    final Shard incompleteLow = ShardObjectHelper.newShard("shardId-0", null, null,
            ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1"));
    final Shard incompleteHigh = ShardObjectHelper.newShard("shardId-1", null, null,
            ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3"));
    final List<Shard> incompleteShards = Arrays.asList(incompleteLow, incompleteHigh);

    // Hash ranges [MIN, 420] and [421, MAX]: adjacent and spanning both helper bounds.
    final Shard completeLow = ShardObjectHelper.newShard("shardId-2", null, null,
            ShardObjectHelper.newSequenceNumberRange("1", "2"),
            ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420"));
    final Shard completeHigh = ShardObjectHelper.newShard("shardId-3", null, null,
            ShardObjectHelper.newSequenceNumberRange("1", "2"),
            ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY));
    final List<Shard> completeShards = Arrays.asList(completeLow, completeHigh);

    when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
    when(shardDetector.listShardsWithFilter(any(ShardFilter.class)))
            .thenReturn(incompleteShards)
            .thenReturn(incompleteShards)
            .thenReturn(completeShards);

    final boolean leaseTableEmpty = dynamoDBLeaseRefresher.isLeaseTableEmpty();
    hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
            INITIAL_POSITION_LATEST, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
            garbageCollectLeases, leaseTableEmpty);

    // Two incomplete listings plus the complete one.
    verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class));
    // One lease per shard in the complete listing.
    verify(dynamoDBLeaseRefresher, times(2)).createLeaseIfNotExists(any(Lease.class));
}
/**
 * Tries to bootstrap an empty lease table when the first filtered shard listing already has a complete
 * hash range. Verifies that the leases are created after a single listing call.
 * @throws Exception
 */
@Test
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception {
    // Hash ranges [MIN, 420] and [421, MAX]: adjacent and spanning both helper bounds.
    final Shard lowerShard = ShardObjectHelper.newShard("shardId-2", null, null,
            ShardObjectHelper.newSequenceNumberRange("1", "2"),
            ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420"));
    final Shard upperShard = ShardObjectHelper.newShard("shardId-3", null, null,
            ShardObjectHelper.newSequenceNumberRange("1", "2"),
            ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY));
    final List<Shard> completeShards = Arrays.asList(lowerShard, upperShard);

    when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
    when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(completeShards);

    final boolean leaseTableEmpty = dynamoDBLeaseRefresher.isLeaseTableEmpty();
    hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
            INITIAL_POSITION_LATEST, SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
            garbageCollectLeases, leaseTableEmpty);

    // The first listing is complete, so exactly one listing call is expected.
    verify(shardDetector, times(1)).listShardsWithFilter(any(ShardFilter.class));
    // One lease per shard in the listing.
    verify(dynamoDBLeaseRefresher, times(2)).createLeaseIfNotExists(any(Lease.class));
}
// /**
// * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent.
// */
// @Test