From c7fe06dab50112f9223e179020fd2e538965df62 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Wed, 8 Apr 2020 00:31:55 -0400 Subject: [PATCH] PR comments --- .../leases/HierarchicalShardSyncer.java | 27 +++++++++++++------ .../leases/exceptions/ShardSyncer.java | 6 ++--- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 07521335..e0acf862 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -68,12 +68,14 @@ public class HierarchicalShardSyncer { private final boolean isMultiStreamMode; - public static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); - public static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); - public static final int retriesForCompleteHashRange = 3; - private String streamIdentifier = ""; + private static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); + private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); + private static final int retriesForCompleteHashRange = 3; + + private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000; + public HierarchicalShardSyncer() { isMultiStreamMode = false; } @@ -109,7 +111,7 @@ public class HierarchicalShardSyncer { final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final boolean garbageCollectLeases, final boolean isLeaseTableEmpty) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException { this.streamIdentifier = shardDetector.streamIdentifier().serialize(); final List latestShards = isLeaseTableEmpty ? getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); @@ -344,7 +346,7 @@ public class HierarchicalShardSyncer { } private static List getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector, - InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException { + InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException, InterruptedException { final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended); final String streamName = shardDetector.streamIdentifier().streamName(); @@ -362,6 +364,8 @@ public class HierarchicalShardSyncer { if (hashRangeOfShardsIsComplete(shards)) { return shards; } + + Thread.sleep(DELAY_BETWEEN_LIST_SHARDS_MILLIS); } throw new KinesisClientLibIOException("Hash range of shards returned for " + streamName + " was incomplete after " @@ -377,6 +381,10 @@ public class HierarchicalShardSyncer { private static boolean hashRangeOfShardsIsComplete(@NonNull List shards) { + if (shards.isEmpty()) { + throw new IllegalStateException("No shards found when attempting to validate complete hash range."); + } + final Comparator shardStartingHashKeyBasedComparator = new ShardStartingHashKeyBasedComparator(); shards.sort(shardStartingHashKeyBasedComparator); @@ -387,10 +395,13 @@ public class HierarchicalShardSyncer { if (shards.size() > 1) { for (int i = 1; i < shards.size(); i++) { - final BigInteger startOfPossibleHole = new BigInteger(shards.get(i - 1).hashKeyRange().endingHashKey()); - final BigInteger endOfPossibleHole = new BigInteger(shards.get(i).hashKeyRange().startingHashKey()); + final Shard shardAtStartOfPossibleHole = shards.get(i - 1); + final Shard shardAtEndOfPossibleHole = shards.get(i); + final BigInteger startOfPossibleHole = new BigInteger(shardAtStartOfPossibleHole.hashKeyRange().endingHashKey()); + final BigInteger endOfPossibleHole = new BigInteger(shardAtEndOfPossibleHole.hashKeyRange().startingHashKey()); if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) { + log.error("Incomplete hash range found between {} and {}.", shardAtStartOfPossibleHole, shardAtEndOfPossibleHole); return false; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java index c0ed2d2a..182854ff 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java @@ -40,8 +40,8 @@ public class ShardSyncer { final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, - KinesisClientLibIOException { - HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty()); + KinesisClientLibIOException, InterruptedException { + HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, + scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty()); } }