PR comments
This commit is contained in:
parent
01db7753f0
commit
c7fe06dab5
2 changed files with 22 additions and 11 deletions
|
|
@ -68,12 +68,14 @@ public class HierarchicalShardSyncer {
|
||||||
|
|
||||||
private final boolean isMultiStreamMode;
|
private final boolean isMultiStreamMode;
|
||||||
|
|
||||||
public static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
|
|
||||||
public static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
|
|
||||||
public static final int retriesForCompleteHashRange = 3;
|
|
||||||
|
|
||||||
private String streamIdentifier = "";
|
private String streamIdentifier = "";
|
||||||
|
|
||||||
|
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
|
||||||
|
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
|
||||||
|
private static final int retriesForCompleteHashRange = 3;
|
||||||
|
|
||||||
|
private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;
|
||||||
|
|
||||||
public HierarchicalShardSyncer() {
|
public HierarchicalShardSyncer() {
|
||||||
isMultiStreamMode = false;
|
isMultiStreamMode = false;
|
||||||
}
|
}
|
||||||
|
|
@ -109,7 +111,7 @@ public class HierarchicalShardSyncer {
|
||||||
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
|
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
|
||||||
final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
|
final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
|
||||||
final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
|
final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException {
|
||||||
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
|
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
|
||||||
final List<Shard> latestShards = isLeaseTableEmpty ?
|
final List<Shard> latestShards = isLeaseTableEmpty ?
|
||||||
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
|
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
|
||||||
|
|
@ -344,7 +346,7 @@ public class HierarchicalShardSyncer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Shard> getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector,
|
private static List<Shard> getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector,
|
||||||
InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException {
|
InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException, InterruptedException {
|
||||||
|
|
||||||
final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended);
|
final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended);
|
||||||
final String streamName = shardDetector.streamIdentifier().streamName();
|
final String streamName = shardDetector.streamIdentifier().streamName();
|
||||||
|
|
@ -362,6 +364,8 @@ public class HierarchicalShardSyncer {
|
||||||
if (hashRangeOfShardsIsComplete(shards)) {
|
if (hashRangeOfShardsIsComplete(shards)) {
|
||||||
return shards;
|
return shards;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Thread.sleep(DELAY_BETWEEN_LIST_SHARDS_MILLIS);
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new KinesisClientLibIOException("Hash range of shards returned for " + streamName + " was incomplete after "
|
throw new KinesisClientLibIOException("Hash range of shards returned for " + streamName + " was incomplete after "
|
||||||
|
|
@ -377,6 +381,10 @@ public class HierarchicalShardSyncer {
|
||||||
|
|
||||||
private static boolean hashRangeOfShardsIsComplete(@NonNull List<Shard> shards) {
|
private static boolean hashRangeOfShardsIsComplete(@NonNull List<Shard> shards) {
|
||||||
|
|
||||||
|
if (shards.isEmpty()) {
|
||||||
|
throw new IllegalStateException("No shards found when attempting to validate complete hash range.");
|
||||||
|
}
|
||||||
|
|
||||||
final Comparator<Shard> shardStartingHashKeyBasedComparator = new ShardStartingHashKeyBasedComparator();
|
final Comparator<Shard> shardStartingHashKeyBasedComparator = new ShardStartingHashKeyBasedComparator();
|
||||||
shards.sort(shardStartingHashKeyBasedComparator);
|
shards.sort(shardStartingHashKeyBasedComparator);
|
||||||
|
|
||||||
|
|
@ -387,10 +395,13 @@ public class HierarchicalShardSyncer {
|
||||||
|
|
||||||
if (shards.size() > 1) {
|
if (shards.size() > 1) {
|
||||||
for (int i = 1; i < shards.size(); i++) {
|
for (int i = 1; i < shards.size(); i++) {
|
||||||
final BigInteger startOfPossibleHole = new BigInteger(shards.get(i - 1).hashKeyRange().endingHashKey());
|
final Shard shardAtStartOfPossibleHole = shards.get(i - 1);
|
||||||
final BigInteger endOfPossibleHole = new BigInteger(shards.get(i).hashKeyRange().startingHashKey());
|
final Shard shardAtEndOfPossibleHole = shards.get(i);
|
||||||
|
final BigInteger startOfPossibleHole = new BigInteger(shardAtStartOfPossibleHole.hashKeyRange().endingHashKey());
|
||||||
|
final BigInteger endOfPossibleHole = new BigInteger(shardAtEndOfPossibleHole.hashKeyRange().startingHashKey());
|
||||||
|
|
||||||
if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) {
|
if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) {
|
||||||
|
log.error("Incomplete hash range found between {} and {}.", shardAtStartOfPossibleHole, shardAtEndOfPossibleHole);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ public class ShardSyncer {
|
||||||
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
|
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
|
||||||
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
|
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
|
||||||
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
|
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||||
KinesisClientLibIOException {
|
KinesisClientLibIOException, InterruptedException {
|
||||||
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
|
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
|
||||||
scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty());
|
scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue