From 5351c9ddabf55a231c0bdd1dfdf4c1e1b43247bc Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Wed, 25 Mar 2020 15:49:57 -0400 Subject: [PATCH 1/4] Adding hash range validation for list shards with filter. --- .../leases/HierarchicalShardSyncer.java | 77 ++++++++++++++++-- .../leases/HierarchicalShardSyncerTest.java | 79 ++++++++++++++++++- 2 files changed, 149 insertions(+), 7 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index ecd64952..d398fb34 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -68,6 +68,10 @@ public class HierarchicalShardSyncer { private final boolean isMultiStreamMode; + public static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); + public static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); + public static final int retriesForCompleteHashRange = 3; + private String streamIdentifier = ""; public HierarchicalShardSyncer() { @@ -340,12 +344,27 @@ public class HierarchicalShardSyncer { } private static List getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector, - InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException { - final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended); - final Optional> shards = Optional.of(shardDetector.listShardsWithFilter(shardFilter)); + InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException { - return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() + - " is not in ACTIVE OR UPDATING state - will retry getting the 
shard list.")); + final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended); + List shards; + + for (int i = 0; i < retriesForCompleteHashRange; i++) { + shards = shardDetector.listShardsWithFilter(shardFilter); + + if (shards == null) { + throw new KinesisClientLibIOException( + "Stream " + shardDetector.streamIdentifier().streamName() + + " is not in ACTIVE OR UPDATING state - will retry getting the shard list."); + } + + if (hashRangeOfShardsIsComplete(shards)) { + return shards; + } + } + + throw new KinesisClientLibIOException("Hash range of shards returned was incomplete after " + + retriesForCompleteHashRange + " retries."); } private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { @@ -355,6 +374,30 @@ public class HierarchicalShardSyncer { " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); } + private static boolean hashRangeOfShardsIsComplete(@NonNull List shards) { + + final Comparator shardStartingHashKeyBasedComparator = new ShardStartingHashKeyBasedComparator(); + shards.sort(shardStartingHashKeyBasedComparator); + + if (!shards.get(0).hashKeyRange().startingHashKey().equals(MIN_HASH_KEY) || + !shards.get(shards.size() - 1).hashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) { + return false; + } + + if (shards.size() > 1) { + for (int i = 1; i < shards.size(); i++) { + final BigInteger startOfPossibleHole = new BigInteger(shards.get(i - 1).hashKeyRange().endingHashKey()); + final BigInteger endOfPossibleHole = new BigInteger(shards.get(i).hashKeyRange().startingHashKey()); + + if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { + return false; + } + } + } + + return true; + } + /** * Determine new leases to create and their initial checkpoint. * Note: Package level access only for testing purposes. 
@@ -414,7 +457,7 @@ public class HierarchicalShardSyncer { * Check if this shard is a descendant of a shard that is (or will be) processed. * Create leases for the ancestors of this shard as required. * See javadoc of determineNewLeasesToCreate() for rules and example. - * + * * @param shardId The shardId to check. * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that * location in the shard (when an application starts up for the first time - and there are no checkpoints). @@ -789,6 +832,28 @@ public class HierarchicalShardSyncer { .map(streamId -> streamId.serialize()).orElse("single_stream_mode"); } + /** + * Helper class to compare shards based on their hash range. + */ + @RequiredArgsConstructor + private static class ShardStartingHashKeyBasedComparator implements Comparator, Serializable { + private static final long serialVersionUID = 1L; + + /** + * Compares two shards based on their starting hash keys. + * We assume that the shards provided are non-null. + * + * {@inheritDoc} + */ + @Override + public int compare(Shard shard1, Shard shard2) { + BigInteger hashKey1 = new BigInteger(shard1.hashKeyRange().startingHashKey()); + BigInteger hashKey2 = new BigInteger(shard2.hashKeyRange().startingHashKey()); + + return hashKey1.compareTo(hashKey2); + } + } + /** Helper class to compare leases based on starting sequence number of the corresponding shards. 
* */ diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 30e4f081..963f84fd 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -1683,7 +1683,84 @@ public class HierarchicalShardSyncerTest { verify(shardDetector, atLeast(1)).listShards(); } -// /**getShardFilterFromInitialPosition + /** + * Tries to bootstrap an empty lease table. Verifies that if we fail to get a complete hash range of shards after three + * retries, we fast fail and throw an exception. + * @throws Exception + */ + @Test(expected = KinesisClientLibIOException.class) + public void testEmptyLeaseTableThrowsExceptionWhenHashRangeIsStillIncompleteAfterRetries() throws Exception { + final List shardsWithIncompleteHashRange = Arrays.asList( + ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1")), + ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3")) + ); + + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); + when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(shardsWithIncompleteHashRange); + + try { + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); + } finally { + verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries. + } + } + + /** + * Tries to bootstrap an empty lease table. 
Verifies that after getting an incomplete hash range of shards two times + * and a complete hash range the final time, we create the leases. + * @throws Exception + */ + @Test + public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception { + final List shardsWithIncompleteHashRange = Arrays.asList( + ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1")), + ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3")) + ); + final List shardsWithCompleteHashRange = Arrays.asList( + ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")), + ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY)) + ); + + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); + when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(shardsWithIncompleteHashRange) + .thenReturn(shardsWithIncompleteHashRange).thenReturn(shardsWithCompleteHashRange); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); + + verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries. + verify(dynamoDBLeaseRefresher, times(2)).createLeaseIfNotExists(any(Lease.class)); + } + + /** + * Tries to bootstrap an empty lease table. Verifies that leases are created when we have a complete hash range of shards. 
+ * @throws Exception + */ + @Test + public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception { + final List shardsWithCompleteHashRange = Arrays.asList( + ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")), + ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY)) + ); + + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); + when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(shardsWithCompleteHashRange); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); + + verify(shardDetector, times(1)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries. + verify(dynamoDBLeaseRefresher, times(2)).createLeaseIfNotExists(any(Lease.class)); + } + +// /** // * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent. 
// */ // @Test From ea093a44660729e576b745591ebcde552ef3c98d Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Mon, 30 Mar 2020 19:30:11 -0400 Subject: [PATCH 2/4] Adding stream id to exception --- .../amazon/kinesis/leases/HierarchicalShardSyncer.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index d398fb34..07521335 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -347,6 +347,8 @@ public class HierarchicalShardSyncer { InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException { final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended); + final String streamName = shardDetector.streamIdentifier().streamName(); + List shards; for (int i = 0; i < retriesForCompleteHashRange; i++) { @@ -354,8 +356,7 @@ public class HierarchicalShardSyncer { if (shards == null) { throw new KinesisClientLibIOException( - "Stream " + shardDetector.streamIdentifier().streamName() + - " is not in ACTIVE OR UPDATING state - will retry getting the shard list."); + "Stream " + streamName + " is not in ACTIVE OR UPDATING state - will retry getting the shard list."); } if (hashRangeOfShardsIsComplete(shards)) { @@ -363,7 +364,7 @@ public class HierarchicalShardSyncer { } } - throw new KinesisClientLibIOException("Hash range of shards returned was incomplete after " + throw new KinesisClientLibIOException("Hash range of shards returned for " + streamName + " was incomplete after " + retriesForCompleteHashRange + " retries."); } From 01db7753f0e37a62c4c587d56415728812c74866 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Wed, 8 Apr 
2020 00:31:49 -0400 Subject: [PATCH 3/4] Making test cases actually go through validation steps. --- .../amazon/kinesis/leases/HierarchicalShardSyncerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 963f84fd..e1dfc52a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -1716,8 +1716,8 @@ public class HierarchicalShardSyncerTest { @Test public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception { final List shardsWithIncompleteHashRange = Arrays.asList( - ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1")), - ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3")) + ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "69")), + ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("71", ShardObjectHelper.MAX_HASH_KEY)) ); final List shardsWithCompleteHashRange = Arrays.asList( ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")), From c7fe06dab50112f9223e179020fd2e538965df62 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Wed, 8 Apr 2020 00:31:55 -0400 Subject: [PATCH 4/4] PR comments --- 
.../leases/HierarchicalShardSyncer.java | 27 +++++++++++++------ .../leases/exceptions/ShardSyncer.java | 6 ++--- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 07521335..e0acf862 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -68,12 +68,14 @@ public class HierarchicalShardSyncer { private final boolean isMultiStreamMode; - public static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); - public static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); - public static final int retriesForCompleteHashRange = 3; - private String streamIdentifier = ""; + private static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); + private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); + private static final int retriesForCompleteHashRange = 3; + + private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000; + public HierarchicalShardSyncer() { isMultiStreamMode = false; } @@ -109,7 +111,7 @@ public class HierarchicalShardSyncer { final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException { this.streamIdentifier = 
shardDetector.streamIdentifier().serialize(); final List latestShards = isLeaseTableEmpty ? getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); @@ -344,7 +346,7 @@ public class HierarchicalShardSyncer { } private static List getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector, - InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException { + InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException, InterruptedException { final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended); final String streamName = shardDetector.streamIdentifier().streamName(); @@ -362,6 +364,8 @@ public class HierarchicalShardSyncer { if (hashRangeOfShardsIsComplete(shards)) { return shards; } + + Thread.sleep(DELAY_BETWEEN_LIST_SHARDS_MILLIS); } throw new KinesisClientLibIOException("Hash range of shards returned for " + streamName + " was incomplete after " @@ -377,6 +381,10 @@ public class HierarchicalShardSyncer { private static boolean hashRangeOfShardsIsComplete(@NonNull List shards) { + if (shards.isEmpty()) { + throw new IllegalStateException("No shards found when attempting to validate complete hash range."); + } + final Comparator shardStartingHashKeyBasedComparator = new ShardStartingHashKeyBasedComparator(); shards.sort(shardStartingHashKeyBasedComparator); @@ -387,10 +395,13 @@ public class HierarchicalShardSyncer { if (shards.size() > 1) { for (int i = 1; i < shards.size(); i++) { - final BigInteger startOfPossibleHole = new BigInteger(shards.get(i - 1).hashKeyRange().endingHashKey()); - final BigInteger endOfPossibleHole = new BigInteger(shards.get(i).hashKeyRange().startingHashKey()); + final Shard shardAtStartOfPossibleHole = shards.get(i - 1); + final Shard shardAtEndOfPossibleHole = shards.get(i); + final BigInteger startOfPossibleHole = new 
BigInteger(shardAtStartOfPossibleHole.hashKeyRange().endingHashKey()); + final BigInteger endOfPossibleHole = new BigInteger(shardAtEndOfPossibleHole.hashKeyRange().startingHashKey()); if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { + log.error("Incomplete hash range found between {} and {}.", shardAtStartOfPossibleHole, shardAtEndOfPossibleHole); return false; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java index c0ed2d2a..182854ff 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java @@ -40,8 +40,8 @@ public class ShardSyncer { final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, - KinesisClientLibIOException { - HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty()); + KinesisClientLibIOException, InterruptedException { + HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, + scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty()); } }