From 5351c9ddabf55a231c0bdd1dfdf4c1e1b43247bc Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Wed, 25 Mar 2020 15:49:57 -0400 Subject: [PATCH] Adding hash range validation for list shards with filter. --- .../leases/HierarchicalShardSyncer.java | 77 ++++++++++++++++-- .../leases/HierarchicalShardSyncerTest.java | 79 ++++++++++++++++++- 2 files changed, 149 insertions(+), 7 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index ecd64952..d398fb34 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -68,6 +68,10 @@ public class HierarchicalShardSyncer { private final boolean isMultiStreamMode; + public static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); + public static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); + public static final int retriesForCompleteHashRange = 3; + private String streamIdentifier = ""; public HierarchicalShardSyncer() { @@ -340,12 +344,27 @@ public class HierarchicalShardSyncer { } private static List getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector, - InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException { - final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended); - final Optional> shards = Optional.of(shardDetector.listShardsWithFilter(shardFilter)); + InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException { - return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() + - " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); + final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended); + List shards; + + for (int i = 0; i < retriesForCompleteHashRange; i++) { + shards = shardDetector.listShardsWithFilter(shardFilter); + + if (shards == null) { + throw new KinesisClientLibIOException( + "Stream " + shardDetector.streamIdentifier().streamName() + + " is not in ACTIVE OR UPDATING state - will retry getting the shard list."); + } + + if (hashRangeOfShardsIsComplete(shards)) { + return shards; + } + } + + throw new KinesisClientLibIOException("Hash range of shards returned was incomplete after " + + retriesForCompleteHashRange + " retries."); } private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { @@ -355,6 +374,30 @@ public class HierarchicalShardSyncer { " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); } + private static boolean hashRangeOfShardsIsComplete(@NonNull List shards) { + + final Comparator shardStartingHashKeyBasedComparator = new ShardStartingHashKeyBasedComparator(); + shards.sort(shardStartingHashKeyBasedComparator); + + if (!shards.get(0).hashKeyRange().startingHashKey().equals(MIN_HASH_KEY) || + !shards.get(shards.size() - 1).hashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) { + return false; + } + + if (shards.size() > 1) { + for (int i = 1; i < shards.size(); i++) { + final BigInteger startOfPossibleHole = new BigInteger(shards.get(i - 1).hashKeyRange().endingHashKey()); + final BigInteger endOfPossibleHole = new BigInteger(shards.get(i).hashKeyRange().startingHashKey()); + + if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { + return false; + } + } + } + + return true; + } + /** * Determine new leases to create and their initial checkpoint. * Note: Package level access only for testing purposes. @@ -414,7 +457,7 @@ public class HierarchicalShardSyncer { * Check if this shard is a descendant of a shard that is (or will be) processed. * Create leases for the ancestors of this shard as required. * See javadoc of determineNewLeasesToCreate() for rules and example. - * + * * @param shardId The shardId to check. * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that * location in the shard (when an application starts up for the first time - and there are no checkpoints). @@ -789,6 +832,28 @@ public class HierarchicalShardSyncer { .map(streamId -> streamId.serialize()).orElse("single_stream_mode"); } + /** + * Helper class to compare shards based on their hash range. + */ + @RequiredArgsConstructor + private static class ShardStartingHashKeyBasedComparator implements Comparator, Serializable { + private static final long serialVersionUID = 1L; + + /** + * Compares two shards based on their starting hash keys. + * We assume that the shards provided are non-null. + * + * {@inheritDoc} + */ + @Override + public int compare(Shard shard1, Shard shard2) { + BigInteger hashKey1 = new BigInteger(shard1.hashKeyRange().startingHashKey()); + BigInteger hashKey2 = new BigInteger(shard2.hashKeyRange().startingHashKey()); + + return hashKey1.compareTo(hashKey2); + } + } + /** Helper class to compare leases based on starting sequence number of the corresponding shards. * */ diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 30e4f081..963f84fd 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -1683,7 +1683,84 @@ public class HierarchicalShardSyncerTest { verify(shardDetector, atLeast(1)).listShards(); } -// /**getShardFilterFromInitialPosition + /** + * Tries to boostrap empty lease table. Verifies that if we fail to get a complete hash range of shards after three + * retries, we fast fail and throw an exception. + * @throws Exception + */ + @Test(expected = KinesisClientLibIOException.class) + public void testEmptyLeaseTableThrowsExceptionWhenHashRangeIsStillIncompleteAfterRetries() throws Exception { + final List shardsWithIncompleteHashRange = Arrays.asList( + ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1")), + ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3")) + ); + + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); + when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(shardsWithIncompleteHashRange); + + try { + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); + } finally { + verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries. + } + } + + /** + * Tries to bootstrap an empty lease table. Verifies that after getting an incomplete hash range of shards two times + * and a complete hash range the final time, we create the leases. + * @throws Exception + */ + @Test + public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception { + final List shardsWithIncompleteHashRange = Arrays.asList( + ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1")), + ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3")) + ); + final List shardsWithCompleteHashRange = Arrays.asList( + ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")), + ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY)) + ); + + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); + when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(shardsWithIncompleteHashRange) + .thenReturn(shardsWithIncompleteHashRange).thenReturn(shardsWithCompleteHashRange); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); + + verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries. + verify(dynamoDBLeaseRefresher, times(2)).createLeaseIfNotExists(any(Lease.class)); + } + + /** + * Tries to bootstrap an empty lease table. Verifies that leases are created when we have a complete hash range of shards. + * @throws Exception + */ + @Test + public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception { + final List shardsWithCompleteHashRange = Arrays.asList( + ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")), + ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY)) + ); + + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); + when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(shardsWithCompleteHashRange); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); + + verify(shardDetector, times(1)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries. + verify(dynamoDBLeaseRefresher, times(2)).createLeaseIfNotExists(any(Lease.class)); + } + +// /** // * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent. // */ // @Test