Merge pull request #5 from ashwing/ValidateHashRange
Adding hash range validation for empty lease table
This commit is contained in:
commit
6c73be5d92
3 changed files with 165 additions and 11 deletions
|
|
@ -70,6 +70,12 @@ public class HierarchicalShardSyncer {
|
|||
|
||||
private String streamIdentifier = "";
|
||||
|
||||
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
|
||||
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
|
||||
private static final int retriesForCompleteHashRange = 3;
|
||||
|
||||
private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;
|
||||
|
||||
public HierarchicalShardSyncer() {
|
||||
isMultiStreamMode = false;
|
||||
}
|
||||
|
|
@ -105,7 +111,7 @@ public class HierarchicalShardSyncer {
|
|||
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
|
||||
final MetricsScope scope, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
|
||||
final boolean garbageCollectLeases, final boolean isLeaseTableEmpty)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException {
|
||||
this.streamIdentifier = shardDetector.streamIdentifier().serialize();
|
||||
final List<Shard> latestShards = isLeaseTableEmpty ?
|
||||
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
|
||||
|
|
@ -340,12 +346,30 @@ public class HierarchicalShardSyncer {
|
|||
}
|
||||
|
||||
private static List<Shard> getShardListAtInitialPosition(@NonNull final ShardDetector shardDetector,
|
||||
InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException {
|
||||
final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended);
|
||||
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShardsWithFilter(shardFilter));
|
||||
InitialPositionInStreamExtended initialPositionInStreamExtended) throws KinesisClientLibIOException, InterruptedException {
|
||||
|
||||
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
|
||||
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
|
||||
final ShardFilter shardFilter = getShardFilterFromInitialPosition(initialPositionInStreamExtended);
|
||||
final String streamName = shardDetector.streamIdentifier().streamName();
|
||||
|
||||
List<Shard> shards;
|
||||
|
||||
for (int i = 0; i < retriesForCompleteHashRange; i++) {
|
||||
shards = shardDetector.listShardsWithFilter(shardFilter);
|
||||
|
||||
if (shards == null) {
|
||||
throw new KinesisClientLibIOException(
|
||||
"Stream " + streamName + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
|
||||
}
|
||||
|
||||
if (hashRangeOfShardsIsComplete(shards)) {
|
||||
return shards;
|
||||
}
|
||||
|
||||
Thread.sleep(DELAY_BETWEEN_LIST_SHARDS_MILLIS);
|
||||
}
|
||||
|
||||
throw new KinesisClientLibIOException("Hash range of shards returned for " + streamName + " was incomplete after "
|
||||
+ retriesForCompleteHashRange + " retries.");
|
||||
}
|
||||
|
||||
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
|
||||
|
|
@ -355,6 +379,37 @@ public class HierarchicalShardSyncer {
|
|||
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
|
||||
}
|
||||
|
||||
private static boolean hashRangeOfShardsIsComplete(@NonNull List<Shard> shards) {
|
||||
|
||||
if (shards.isEmpty()) {
|
||||
throw new IllegalStateException("No shards found when attempting to validate complete hash range.");
|
||||
}
|
||||
|
||||
final Comparator<Shard> shardStartingHashKeyBasedComparator = new ShardStartingHashKeyBasedComparator();
|
||||
shards.sort(shardStartingHashKeyBasedComparator);
|
||||
|
||||
if (!shards.get(0).hashKeyRange().startingHashKey().equals(MIN_HASH_KEY) ||
|
||||
!shards.get(shards.size() - 1).hashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (shards.size() > 1) {
|
||||
for (int i = 1; i < shards.size(); i++) {
|
||||
final Shard shardAtStartOfPossibleHole = shards.get(i - 1);
|
||||
final Shard shardAtEndOfPossibleHole = shards.get(i);
|
||||
final BigInteger startOfPossibleHole = new BigInteger(shardAtStartOfPossibleHole.hashKeyRange().endingHashKey());
|
||||
final BigInteger endOfPossibleHole = new BigInteger(shardAtEndOfPossibleHole.hashKeyRange().startingHashKey());
|
||||
|
||||
if (!endOfPossibleHole.subtract(startOfPossibleHole).equals(BigInteger.ONE)) {
|
||||
log.error("Incomplete hash range found between {} and {}.", shardAtStartOfPossibleHole, shardAtEndOfPossibleHole);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine new leases to create and their initial checkpoint.
|
||||
* Note: Package level access only for testing purposes.
|
||||
|
|
@ -414,7 +469,7 @@ public class HierarchicalShardSyncer {
|
|||
* Check if this shard is a descendant of a shard that is (or will be) processed.
|
||||
* Create leases for the ancestors of this shard as required.
|
||||
* See javadoc of determineNewLeasesToCreate() for rules and example.
|
||||
*
|
||||
*
|
||||
* @param shardId The shardId to check.
|
||||
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
|
||||
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
|
||||
|
|
@ -789,6 +844,28 @@ public class HierarchicalShardSyncer {
|
|||
.map(streamId -> streamId.serialize()).orElse("single_stream_mode");
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class to compare shards based on their hash range.
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
private static class ShardStartingHashKeyBasedComparator implements Comparator<Shard>, Serializable {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/**
|
||||
* Compares two shards based on their starting hash keys.
|
||||
* We assume that the shards provided are non-null.
|
||||
*
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public int compare(Shard shard1, Shard shard2) {
|
||||
BigInteger hashKey1 = new BigInteger(shard1.hashKeyRange().startingHashKey());
|
||||
BigInteger hashKey2 = new BigInteger(shard2.hashKeyRange().startingHashKey());
|
||||
|
||||
return hashKey1.compareTo(hashKey2);
|
||||
}
|
||||
}
|
||||
|
||||
/** Helper class to compare leases based on starting sequence number of the corresponding shards.
|
||||
*
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -40,8 +40,8 @@ public class ShardSyncer {
|
|||
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
|
||||
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
|
||||
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||
KinesisClientLibIOException {
|
||||
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
|
||||
scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty());
|
||||
KinesisClientLibIOException, InterruptedException {
|
||||
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
|
||||
scope, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases, leaseRefresher.isLeaseTableEmpty());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1683,7 +1683,84 @@ public class HierarchicalShardSyncerTest {
|
|||
verify(shardDetector, atLeast(1)).listShards();
|
||||
}
|
||||
|
||||
// /**getShardFilterFromInitialPosition
|
||||
/**
|
||||
* Tries to boostrap empty lease table. Verifies that if we fail to get a complete hash range of shards after three
|
||||
* retries, we fast fail and throw an exception.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(expected = KinesisClientLibIOException.class)
|
||||
public void testEmptyLeaseTableThrowsExceptionWhenHashRangeIsStillIncompleteAfterRetries() throws Exception {
|
||||
final List<Shard> shardsWithIncompleteHashRange = Arrays.asList(
|
||||
ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("0", "1")),
|
||||
ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("2", "3"))
|
||||
);
|
||||
|
||||
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
|
||||
when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(shardsWithIncompleteHashRange);
|
||||
|
||||
try {
|
||||
hierarchicalShardSyncer
|
||||
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
||||
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases,
|
||||
dynamoDBLeaseRefresher.isLeaseTableEmpty());
|
||||
} finally {
|
||||
verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries.
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to bootstrap an empty lease table. Verifies that after getting an incomplete hash range of shards two times
|
||||
* and a complete hash range the final time, we create the leases.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception {
|
||||
final List<Shard> shardsWithIncompleteHashRange = Arrays.asList(
|
||||
ShardObjectHelper.newShard("shardId-0", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "69")),
|
||||
ShardObjectHelper.newShard("shardId-1", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("71", ShardObjectHelper.MAX_HASH_KEY))
|
||||
);
|
||||
final List<Shard> shardsWithCompleteHashRange = Arrays.asList(
|
||||
ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
|
||||
ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
|
||||
);
|
||||
|
||||
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
|
||||
when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(shardsWithIncompleteHashRange)
|
||||
.thenReturn(shardsWithIncompleteHashRange).thenReturn(shardsWithCompleteHashRange);
|
||||
|
||||
hierarchicalShardSyncer
|
||||
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
||||
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases,
|
||||
dynamoDBLeaseRefresher.isLeaseTableEmpty());
|
||||
|
||||
verify(shardDetector, times(3)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries.
|
||||
verify(dynamoDBLeaseRefresher, times(2)).createLeaseIfNotExists(any(Lease.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to bootstrap an empty lease table. Verifies that leases are created when we have a complete hash range of shards.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception {
|
||||
final List<Shard> shardsWithCompleteHashRange = Arrays.asList(
|
||||
ShardObjectHelper.newShard("shardId-2", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange(ShardObjectHelper.MIN_HASH_KEY, "420")),
|
||||
ShardObjectHelper.newShard("shardId-3", null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"), ShardObjectHelper.newHashKeyRange("421", ShardObjectHelper.MAX_HASH_KEY))
|
||||
);
|
||||
|
||||
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
|
||||
when(shardDetector.listShardsWithFilter(any(ShardFilter.class))).thenReturn(shardsWithCompleteHashRange);
|
||||
|
||||
hierarchicalShardSyncer
|
||||
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
|
||||
SCOPE, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, garbageCollectLeases,
|
||||
dynamoDBLeaseRefresher.isLeaseTableEmpty());
|
||||
|
||||
verify(shardDetector, times(1)).listShardsWithFilter(any(ShardFilter.class)); // Verify retries.
|
||||
verify(dynamoDBLeaseRefresher, times(2)).createLeaseIfNotExists(any(Lease.class));
|
||||
}
|
||||
|
||||
// /**
|
||||
// * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent.
|
||||
// */
|
||||
// @Test
|
||||
|
|
|
|||
Loading…
Reference in a new issue