diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index aafbfcff..e191e5d0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -134,9 +134,6 @@ public class HierarchicalShardSyncer { final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, List latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - - //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 - if (!CollectionUtils.isNullOrEmpty(latestShards)) { log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size()); } else { @@ -176,8 +173,6 @@ public class HierarchicalShardSyncer { } } log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases); - final List trackedLeases = new ArrayList<>(currentLeases); - trackedLeases.addAll(newLeasesToCreate); return true; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index a0dbd1f5..d9f36481 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -19,8 +19,10 @@ package software.amazon.kinesis.leases; // import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; @@ -47,6 +49,7 @@ import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -75,6 +78,7 @@ import software.amazon.kinesis.metrics.NullMetricsScope; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import static software.amazon.kinesis.leases.HierarchicalShardSyncer.MemoizationContext; +import static software.amazon.kinesis.leases.HierarchicalShardSyncer.determineNewLeasesToCreate; @RunWith(MockitoJUnitRunner.class) public class HierarchicalShardSyncerTest { @@ -85,13 +89,51 @@ public class HierarchicalShardSyncerTest { private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = InitialPositionInStreamExtended .newInitialPositionAtTimestamp(new Date(1000L)); private static final int EXPONENT = 128; - private static final String LEASE_OWNER = "TestOwnere"; + private static final String LEASE_OWNER = "TestOwner"; private static final MetricsScope SCOPE = new NullMetricsScope(); private static final boolean MULTISTREAM_MODE_ON = true; private static final String STREAM_IDENTIFIER = "acc:stream:1"; private static final HierarchicalShardSyncer.MultiStreamArgs MULTI_STREAM_ARGS = new HierarchicalShardSyncer.MultiStreamArgs( MULTISTREAM_MODE_ON, StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER)); + /** + *
+     * Shard structure (y-axis is
+     * epochs): 0 1 2 3 4   5- shards till
+     *          \ / \ / |   |
+     *           6   7  4   5- shards from epoch 103 - 205
+     *            \ /   |  /\
+     *             8    4 9 10 -
+     * shards from epoch 206 (open - no ending sequenceNumber)
+     * 
+ */ + private static final List SHARD_GRAPH_A = Collections.unmodifiableList(constructShardListForGraphA()); + + /** + * Shard structure (x-axis is epochs): + *
+     * 0  3   6   9
+     * \ / \ / \ /
+     *  2   5   8
+     * / \ / \ / \
+     * 1  4   7  10
+     * 
+ */ + private static final List SHARD_GRAPH_B = Collections.unmodifiableList(constructShardListForGraphB()); + + /** + *
+     * Shard structure (y-axis is
+     * epochs):     0      1  2  3  - shards till
+     *            /   \    |  \ /
+     *           4     5   1   6  - shards from epoch 103 - 205
+     *          / \   / \  |   |
+     *         7   8 9  10 1   6
+     * shards from epoch 206 (open - no ending sequenceNumber)
+     * 
+ */ + private static final List SHARD_GRAPH_C = Collections.unmodifiableList(constructShardListForGraphC()); + private final boolean ignoreUnexpectedChildShards = false; private HierarchicalShardSyncer hierarchicalShardSyncer; @@ -125,8 +167,8 @@ public class HierarchicalShardSyncerTest { final List shards = Collections.emptyList(); final List leases = Collections.emptyList(); final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - assertThat(HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, - INITIAL_POSITION_LATEST).isEmpty(), equalTo(true)); + assertTrue(determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST) + .isEmpty()); } /** @@ -137,9 +179,8 @@ public class HierarchicalShardSyncerTest { final List leases = Collections.emptyList(); final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - assertThat(HierarchicalShardSyncer - .determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST, - new HashSet<>(), MULTI_STREAM_ARGS).isEmpty(), equalTo(true)); + assertTrue(determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, leases, INITIAL_POSITION_LATEST, + Collections.emptySet(), MULTI_STREAM_ARGS).isEmpty()); } /** @@ -156,15 +197,9 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Collections.emptyList(); final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, + final List newLeases = determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); - validateHashRangeinLease(newLeases); - - final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); - - assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); - assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); + validateLeases(newLeases, shardId0, shardId1); } /** @@ -181,15 +216,9 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Collections.emptyList(); final HierarchicalShardSyncer.LeaseSynchronizer emptyLeaseTableSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, + final List newLeases = determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, new HashSet<>(), MULTI_STREAM_ARGS); - validateHashRangeinLease(newLeases); - final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set expectedLeaseIds = new HashSet<>( - toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); - - assertThat(newLeases.size(), equalTo(expectedLeaseIds.size())); - assertThat(newLeaseKeys, equalTo(expectedLeaseIds)); + validateLeases(newLeases, toMultiStreamLeases(shardId0, shardId1)); } /** @@ -218,13 +247,9 @@ public class HierarchicalShardSyncerTest { final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, + final List newLeases = determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); - validateHashRangeinLease(newLeases); - final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); - assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); - assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); + validateLeases(newLeases, shardId0, shardId1); } /** @@ -245,7 +270,8 @@ public class HierarchicalShardSyncerTest { ShardObjectHelper.newShard(shardId2, shardId1, null, sequenceRange)); final List shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList()); - final List currentLeases = new ArrayList(createMultiStreamLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo")); + final List currentLeases = new ArrayList<>(createMultiStreamLeasesFromShards(shardsWithLeases, + ExtendedSequenceNumber.LATEST, "foo")); final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); @@ -253,23 +279,29 @@ public class HierarchicalShardSyncerTest { final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, + final List newLeases = determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); - final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - validateHashRangeinLease(newLeases); - final Set expectedLeaseShardIds = new HashSet<>( - toMultiStreamLeaseList(Arrays.asList(shardId0, shardId1))); - assertThat(newLeases.size(), equalTo(expectedLeaseShardIds.size())); - assertThat(newLeaseKeys, equalTo(expectedLeaseShardIds)); + validateLeases(newLeases, toMultiStreamLeases(shardId0, shardId1)); } - private void validateHashRangeinLease(List leases) { + private static void validateHashRangeInLease(List leases) { final Consumer leaseValidation = lease -> { Validate.notNull(lease.hashKeyRangeForLease()); Validate.isTrue(lease.hashKeyRangeForLease().startingHashKey() .compareTo(lease.hashKeyRangeForLease().endingHashKey()) < 0); }; - leases.forEach(lease -> leaseValidation.accept(lease)); + leases.forEach(leaseValidation); + } + + /** + * Validates that a {@link Lease} exists for each expected lease key. + */ + private static void validateLeases(final List leases, final String... expectedLeaseKeys) { + validateHashRangeInLease(leases); + assertEquals(expectedLeaseKeys.length, leases.size()); + + final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + assertThat(leaseKeys, Matchers.containsInAnyOrder(expectedLeaseKeys)); } /** @@ -288,82 +320,89 @@ public class HierarchicalShardSyncerTest { testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_LATEST); } + private void testLeaseCreation( + final List shards, + final boolean ignoreUnexpectedChildShards, + final String... expectedLeaseKeys) + throws Exception { + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + validateLeases(requestLeases, expectedLeaseKeys); + assertEquals(1, extendedSequenceNumbers.size()); + + extendedSequenceNumbers.forEach(seq -> assertEquals(ExtendedSequenceNumber.LATEST, seq)); + + verify(shardDetector, never()).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); + verify(dynamoDBLeaseRefresher, times(requestLeases.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + /** * Test checkAndCreateLeaseForNewShards while not providing a pre-fetched list of shards */ @Test public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exception { - final List shards = constructShardListForGraphA(); - - final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - - when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); - when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); - - hierarchicalShardSyncer - .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - final Set expectedShardIds = new HashSet<>( - Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); - - final List requestLeases = leaseCaptor.getAllValues(); - final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toSet()); - - validateHashRangeinLease(requestLeases); - assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); - assertThat(requestLeaseKeys, equalTo(expectedShardIds)); - assertThat(extendedSequenceNumbers.size(), equalTo(1)); - - extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - - verify(shardDetector, never()).listShards(); - verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); - verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); - verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + testLeaseCreation(SHARD_GRAPH_A, false, + "shardId-4", "shardId-8", "shardId-9", "shardId-10"); } @Test public void testCheckAndCreateLeasesForShardsIfMissingAtLatestMultiStream() throws Exception { - final List shards = constructShardListForGraphA(); - - final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - - when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); - when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); setupMultiStream(); - hierarchicalShardSyncer - .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - final Set expectedShardIds = new HashSet<>( - toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); - - final List requestLeases = leaseCaptor.getAllValues(); - final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toSet()); - - validateHashRangeinLease(requestLeases); - assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); - assertThat(requestLeaseKeys, equalTo(expectedShardIds)); - assertThat(extendedSequenceNumbers.size(), equalTo(1)); - - extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - - verify(shardDetector, never()).listShards(); - verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); - verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); - verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + testLeaseCreation(SHARD_GRAPH_A, false, + toMultiStreamLeases("shardId-4", "shardId-8", "shardId-9", "shardId-10")); } - private List toMultiStreamLeaseList(List shardIdBasedLeases) { - return shardIdBasedLeases.stream().map(s -> STREAM_IDENTIFIER + ":" + s) - .collect(Collectors.toList()); + /** + * Converts one-or-more shard ids to their multi-stream equivalent. + * + * @param shardIds vararg of shard ids (i.e., {@code shardId-}) + * @return a same-sized array where the Nth element is the multi-stream + * equivalent of the Nth {@code shardIds} input + */ + private static String[] toMultiStreamLeases(final String... shardIds) { + final String[] multiStreamLeaseKey = new String[shardIds.length]; + for (int i = 0; i < shardIds.length; i++) { + multiStreamLeaseKey[i] = STREAM_IDENTIFIER + ":" + shardIds[i]; + } + return multiStreamLeaseKey; + } + + private void testCheckAndCreateLeasesForShardsWithShardList(final String... expectedLeaseKeys) throws Exception { + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + SHARD_GRAPH_A, false, SCOPE, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + validateLeases(requestLeases, expectedLeaseKeys); + assertEquals(1, extendedSequenceNumbers.size()); + + extendedSequenceNumbers.forEach(seq -> assertEquals(ExtendedSequenceNumber.LATEST, seq)); + + verify(shardDetector, never()).listShards(); + verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException(); + verify(dynamoDBLeaseRefresher, times(requestLeases.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } /** @@ -372,38 +411,7 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { - final List latestShards = constructShardListForGraphA(); - - final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(latestShards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); - when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); - - hierarchicalShardSyncer - .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - latestShards, false, SCOPE, - dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - final Set expectedShardIds = new HashSet<>( - Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); - - final List requestLeases = leaseCaptor.getAllValues(); - final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toSet()); - - assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); - assertThat(requestLeaseKeys, equalTo(expectedShardIds)); - assertThat(extendedSequenceNumbers.size(), equalTo(1)); - - validateHashRangeinLease(requestLeases); - - extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - - verify(shardDetector, never()).listShards(); - verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException(); - verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); - verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + testCheckAndCreateLeasesForShardsWithShardList("shardId-4", "shardId-8", "shardId-9", "shardId-10"); } /** @@ -412,36 +420,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws Exception { - final List latestShards = constructShardListForGraphA(); - - final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(latestShards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); - when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); setupMultiStream(); - hierarchicalShardSyncer - .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - latestShards, false, SCOPE, - dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - final Set expectedShardIds = new HashSet<>( - toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); - - final List requestLeases = leaseCaptor.getAllValues(); - final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toSet()); - - assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); - assertThat(requestLeaseKeys, equalTo(expectedShardIds)); - assertThat(extendedSequenceNumbers.size(), equalTo(1)); - validateHashRangeinLease(requestLeases); - extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - - verify(shardDetector, never()).listShards(); - verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException(); - verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); - verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + testCheckAndCreateLeasesForShardsWithShardList( + toMultiStreamLeases("shardId-4", "shardId-8", "shardId-9", "shardId-10")); } /** @@ -450,26 +431,21 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception { - final List shards = constructShardListForGraphA(); - final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShards()).thenReturn(SHARD_GRAPH_A); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - new ArrayList(), false, SCOPE, + new ArrayList<>(), false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - final Set expectedShardIds = new HashSet<>(); - final List requestLeases = leaseCaptor.getAllValues(); final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); - validateHashRangeinLease(requestLeases); - assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); - assertThat(extendedSequenceNumbers.size(), equalTo(0)); + validateLeases(requestLeases); + assertEquals(0, extendedSequenceNumbers.size()); verify(shardDetector, never()).listShards(); verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException(); @@ -491,10 +467,10 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonWithEmptyLeaseTable() throws Exception { - final List shards = constructShardListForGraphA(); final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5")); - testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate); + testCheckAndCreateLeaseForShardsIfMissing(SHARD_GRAPH_A, INITIAL_POSITION_TRIM_HORIZON, + expectedLeaseKeysToCreate); } /** @@ -511,10 +487,10 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForNewShardsAtTimestampWithEmptyLeaseTable1() throws Exception { - final List shards = constructShardListForGraphA(); final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-8", "shardId-4", "shardId-9", "shardId-10")); - testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate); + testCheckAndCreateLeaseForShardsIfMissing(SHARD_GRAPH_A, INITIAL_POSITION_AT_TIMESTAMP, + expectedLeaseKeysToCreate); } /** @@ -531,12 +507,11 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForNewShardsAtTimestampWithEmptyLeaseTable2() throws Exception { - final List shards = constructShardListForGraphA(); final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-6", "shardId-7", "shardId-4", "shardId-5")); final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended .newInitialPositionAtTimestamp(new Date(200L)); - testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeysToCreate); + testCheckAndCreateLeaseForShardsIfMissing(SHARD_GRAPH_A, initialPosition, expectedLeaseKeysToCreate); } /** @@ -553,10 +528,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForNewShardsAtLatestWithEmptyLeaseTable() throws Exception { - final List shards = constructShardListForGraphA(); final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-8", "shardId-4", "shardId-9", "shardId-10")); - testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_LATEST, expectedLeaseKeysToCreate); + testCheckAndCreateLeaseForShardsIfMissing(SHARD_GRAPH_A, INITIAL_POSITION_LATEST, expectedLeaseKeysToCreate); } /** @@ -574,7 +548,7 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonWithPartialLeaseTable() throws Exception { - final List shards = constructShardListForGraphA(); + final List shards = SHARD_GRAPH_A; // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the // lease for shard-0 when reading from TRIM_HORIZON. final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); @@ -582,7 +556,7 @@ public class HierarchicalShardSyncerTest { .filter(s -> !missingLeaseKeys.contains(s.shardId())).collect(Collectors.toList()); final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.TRIM_HORIZON, LEASE_OWNER); - final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + final Set expectedLeaseKeysToCreate = Collections.singleton("shardId-0"); testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate, existingLeases); } @@ -601,7 +575,7 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForNewShardsAtTimestampWithPartialLeaseTable1() throws Exception { - final List shards = constructShardListForGraphA(); + final List shards = SHARD_GRAPH_A; // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the // lease for shard-0 when reading from AT_TIMESTAMP. final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); @@ -609,7 +583,7 @@ public class HierarchicalShardSyncerTest { .filter(s -> !missingLeaseKeys.contains(s.shardId())).collect(Collectors.toList()); final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.AT_TIMESTAMP, LEASE_OWNER); - final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + final Set expectedLeaseKeysToCreate = Collections.singleton("shardId-0"); testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate, existingLeases); } @@ -628,7 +602,7 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForNewShardsAtTimestampWithPartialLeaseTable2() throws Exception { - final List shards = constructShardListForGraphA(); + final List shards = SHARD_GRAPH_A; final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended .newInitialPositionAtTimestamp(new Date(200L)); // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the @@ -638,7 +612,7 @@ public class HierarchicalShardSyncerTest { .filter(s -> !missingLeaseKeys.contains(s.shardId())).collect(Collectors.toList()); final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.AT_TIMESTAMP, LEASE_OWNER); - final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + final Set expectedLeaseKeysToCreate = Collections.singleton("shardId-0"); testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeysToCreate, existingLeases); } @@ -657,7 +631,7 @@ public class HierarchicalShardSyncerTest { */ @Test public void testCheckAndCreateLeasesForNewShardsAtLatestWithPartialLeaseTable() throws Exception { - final List shards = constructShardListForGraphA(); + final List shards = SHARD_GRAPH_A; // Leases for shard-0 and its descendants (shard-6, and shard-8) are missing. Expect lease sync to recover the // lease for shard-0 when reading from LATEST. final Set missingLeaseKeys = new HashSet<>(Arrays.asList("shardId-0", "shardId-6", "shardId-8")); @@ -665,13 +639,13 @@ public class HierarchicalShardSyncerTest { .filter(s -> !missingLeaseKeys.contains(s.shardId())).collect(Collectors.toList()); final List existingLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, LEASE_OWNER); - final Set expectedLeaseKeysToCreate = new HashSet<>(Arrays.asList("shardId-0")); + final Set expectedLeaseKeysToCreate = Collections.singleton("shardId-0"); testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_LATEST, expectedLeaseKeysToCreate, existingLeases); } @Test(expected = KinesisClientLibIOException.class) public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpen() throws Exception { - final List shards = new ArrayList<>(constructShardListForGraphA()); + final List shards = new ArrayList<>(SHARD_GRAPH_A); final SequenceNumberRange range = shards.get(0).sequenceNumberRange().toBuilder().endingSequenceNumber(null) .build(); final Shard shard = shards.get(3).toBuilder().sequenceNumberRange(range).build(); @@ -691,7 +665,7 @@ public class HierarchicalShardSyncerTest { @Test(expected = KinesisClientLibIOException.class) public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenForMultiStream() throws Exception { - final List shards = new ArrayList<>(constructShardListForGraphA()); + final List shards = new ArrayList<>(SHARD_GRAPH_A); final SequenceNumberRange range = shards.get(0).sequenceNumberRange().toBuilder().endingSequenceNumber(null) .build(); final Shard shard = shards.get(3).toBuilder().sequenceNumberRange(range).build(); @@ -710,98 +684,40 @@ public class HierarchicalShardSyncerTest { } } + private void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren( + final String... expectedLeaseKeys) throws Exception { + final List shards = new ArrayList<>(SHARD_GRAPH_A); + final Shard shard = shards.get(5); + assertEquals("shardId-5", shard.shardId()); + + shards.remove(5); + + // shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5 + // is not closed, those children should be ignored when syncing shards, no leases + // should be obtained for them, and we should obtain a lease on the still-open + // parent. + shards.add(5, + shard.toBuilder() + .sequenceNumberRange(shard.sequenceNumberRange().toBuilder().endingSequenceNumber(null).build()) + .build()); + + testLeaseCreation(shards, true, expectedLeaseKeys); + } + /** * Test checkAndCreateLeasesForNewShards() when a parent is open and children of open parents are being ignored. */ @Test public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren() throws Exception { - final List shards = new ArrayList<>(constructShardListForGraphA()); - final Shard shard = shards.get(5); - assertThat(shard.shardId(), equalTo("shardId-5")); - - shards.remove(5); - - // shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5 - // is not closed, those children should be ignored when syncing shards, no leases - // should be obtained for them, and we should obtain a lease on the still-open - // parent. - shards.add(5, - shard.toBuilder() - .sequenceNumberRange(shard.sequenceNumberRange().toBuilder().endingSequenceNumber(null).build()) - .build()); - - final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - - when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); - when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); - - hierarchicalShardSyncer - .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - final List leases = leaseCaptor.getAllValues(); - final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set leaseSequenceNumbers = leases.stream().map(Lease::checkpoint) - .collect(Collectors.toSet()); - - final Set expectedShardIds = new HashSet<>(Arrays.asList("shardId-4", "shardId-5", "shardId-8")); - - assertThat(leaseKeys.size(), equalTo(expectedShardIds.size())); - assertThat(leaseKeys, equalTo(expectedShardIds)); - assertThat(leaseSequenceNumbers.size(), equalTo(1)); - - leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - - verify(shardDetector, never()).listShards(); - verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); - verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); - verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren( + "shardId-4", "shardId-5", "shardId-8"); } @Test public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildrenMultiStream() throws Exception { - final List shards = new ArrayList<>(constructShardListForGraphA()); - final Shard shard = shards.get(5); - assertThat(shard.shardId(), equalTo("shardId-5")); - - shards.remove(5); - - // shardId-5 in graph A has two children (shardId-9 and shardId-10). if shardId-5 - // is not closed, those children should be ignored when syncing shards, no leases - // should be obtained for them, and we should obtain a lease on the still-open - // parent. - shards.add(5, - shard.toBuilder() - .sequenceNumberRange(shard.sequenceNumberRange().toBuilder().endingSequenceNumber(null).build()) - .build()); - - final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - - when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); - when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); setupMultiStream(); - hierarchicalShardSyncer - .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - - final List leases = leaseCaptor.getAllValues(); - final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set leaseSequenceNumbers = leases.stream().map(Lease::checkpoint) - .collect(Collectors.toSet()); - - final Set expectedShardIds = new HashSet<>(toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-5", "shardId-8"))); - - assertThat(leaseKeys.size(), equalTo(expectedShardIds.size())); - assertThat(leaseKeys, equalTo(expectedShardIds)); - assertThat(leaseSequenceNumbers.size(), equalTo(1)); - - leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - - verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); - verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); - verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringInconsistentChildren( + toMultiStreamLeases("shardId-4", "shardId-5", "shardId-8")); } @Test @@ -819,7 +735,7 @@ public class HierarchicalShardSyncerTest { private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSequenceNumber sequenceNumber, final InitialPositionInStreamExtended position) throws Exception { final String shardIdPrefix = "shardId-%d"; - final List shards = constructShardListForGraphA(); + final List shards = SHARD_GRAPH_A; final List leases = createLeasesFromShards(shards, sequenceNumber, LEASE_OWNER); // Marking shardId-0 as ShardEnd. @@ -846,7 +762,7 @@ public class HierarchicalShardSyncerTest { final Set expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position); - assertThat(createLeases, equalTo(expectedCreateLeases)); + assertEquals(expectedCreateLeases, createLeases); verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -858,7 +774,7 @@ public class HierarchicalShardSyncerTest { SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); - assertThat(deleteLeases.size(), equalTo(0)); + assertTrue(deleteLeases.isEmpty()); verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); @@ -883,7 +799,7 @@ public class HierarchicalShardSyncerTest { final ExtendedSequenceNumber sequenceNumber, final InitialPositionInStreamExtended position) throws Exception { final String shardIdPrefix = "shardId-%d"; - final List shards = constructShardListForGraphA(); + final List shards = SHARD_GRAPH_A; final List leases = createLeasesFromShards(shards, sequenceNumber, LEASE_OWNER); // Marking shardId-0 as ShardEnd. @@ -933,11 +849,6 @@ public class HierarchicalShardSyncerTest { .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - final Set expectedShardIds = new HashSet<>( - Collections.singletonList(String.format(shardIdPrefix, 0))); - final Set expectedSequenceNumbers = new HashSet<>( - Collections.singletonList(ExtendedSequenceNumber.SHARD_END)); - verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(3)).listLeases(); @@ -986,7 +897,7 @@ public class HierarchicalShardSyncerTest { final ExtendedSequenceNumber sequenceNumber, final InitialPositionInStreamExtended position) throws Exception { final String shardIdPrefix = "shardId-%d"; - final List shards = constructShardListForGraphA(); + final List shards = SHARD_GRAPH_A; final List leases = createLeasesFromShards(shards, sequenceNumber, LEASE_OWNER); // Marking shardId-0 as ShardEnd. @@ -1044,16 +955,6 @@ public class HierarchicalShardSyncerTest { } } - private Lease createLeaseFromShard(final Shard shard, final ExtendedSequenceNumber checkpoint, - final String leaseOwner) { - return createLeasesFromShards(Collections.singletonList(shard), checkpoint, leaseOwner).get(0); - } - - private MultiStreamLease createMultiStreamLeaseFromShard(final Shard shard, final ExtendedSequenceNumber checkpoint, - final String leaseOwner) { - return createMultiStreamLeasesFromShards(Collections.singletonList(shard), checkpoint, leaseOwner).get(0); - } - private List createLeasesFromShards(final List shards, final ExtendedSequenceNumber checkpoint, final String leaseOwner) { return shards.stream().map(shard -> { @@ -1136,9 +1037,9 @@ public class HierarchicalShardSyncerTest { final Set expectedSequenceNumbers = new HashSet<>(Collections .singletonList(new ExtendedSequenceNumber(initialPosition.getInitialPositionInStream().name()))); - assertThat(leases.size(), equalTo(expectedLeaseKeys.size())); - assertThat(leaseKeys, equalTo(expectedLeaseKeys)); - assertThat(leaseSequenceNumbers, equalTo(expectedSequenceNumbers)); + assertEquals(expectedLeaseKeys.size(), leases.size()); + assertEquals(expectedLeaseKeys, leaseKeys); + assertEquals(expectedSequenceNumbers, leaseSequenceNumbers); verify(dynamoDBLeaseRefresher, times(expectedLeaseKeys.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -1161,12 +1062,12 @@ public class HierarchicalShardSyncerTest { final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); for (InitialPositionInStreamExtended initialPosition : initialPositions) { - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, + final List newLeases = determineNewLeasesToCreate(emptyLeaseTableSynchronizer, shards, currentLeases, initialPosition); - assertThat(newLeases.size(), equalTo(2)); + assertEquals(2, newLeases.size()); for (Lease lease : newLeases) { - assertThat(expectedLeaseShardIds.contains(lease.leaseKey()), equalTo(true)); + assertTrue(expectedLeaseShardIds.contains(lease.leaseKey())); assertThat(lease.checkpoint(), equalTo(new ExtendedSequenceNumber(initialPosition.getInitialPositionInStream().toString()))); } @@ -1191,7 +1092,6 @@ public class HierarchicalShardSyncerTest { final List shards = Stream.of(shardsWithLeases, shardsWithoutLeases).flatMap(x -> x.stream()).collect(Collectors.toList()); final List currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo"); - final Set inconsistentShardIds = Collections.emptySet(); Map shardIdToShardMap = HierarchicalShardSyncer.constructShardIdToShardMap(shards); Map> shardIdToChildShardIdsMap = HierarchicalShardSyncer.constructShardIdToChildShardIdsMap(shardIdToShardMap); @@ -1199,7 +1099,7 @@ public class HierarchicalShardSyncerTest { final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, + final List newLeases = determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); assertThat(newLeases.size(), equalTo(1)); @@ -1222,12 +1122,12 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange1() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-3", "shardId-4", "shardId-5"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, + expectedShardIdCheckpointMap); } /** @@ -1246,11 +1146,11 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange2() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-7"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, + expectedShardIdCheckpointMap); } /** @@ -1269,14 +1169,14 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange3() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-2", "shardId-6"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.LATEST); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, + expectedShardIdCheckpointMap); } /** @@ -1295,11 +1195,11 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange4() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-9", "shardId-10"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.LATEST); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, + expectedShardIdCheckpointMap); } /** @@ -1319,14 +1219,14 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestC_PartialHashRange5() { - final List shards = constructShardListForGraphC(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-9", "shardId-10"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-6", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.LATEST); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_C, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, + expectedShardIdCheckpointMap); } /** @@ -1345,10 +1245,8 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRange() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-6", "shardId-7"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST); } /** @@ -1367,11 +1265,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRangeWithoutGC() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5", "shardId-6", "shardId-7"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST); } /** @@ -1390,14 +1286,13 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_EmptyLeaseTable() { - final List shards = constructShardListForGraphA(); - final List shardIdsOfCurrentLeases = Collections.emptyList(); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-8", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.LATEST); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, Collections.emptyList(), INITIAL_POSITION_LATEST, + expectedShardIdCheckpointMap); } /** @@ -1416,11 +1311,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRangeAcrossDifferentEpochs() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-4", "shardId-7", "shardId-9", "shardId-10"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST); } /** @@ -1439,11 +1332,11 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestB_PartialHashRange() { - final List shards = constructShardListForGraphB(); final List shardIdsOfCurrentLeases = Collections.singletonList("shardId-6"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.LATEST); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_B, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, + expectedShardIdCheckpointMap); } /** @@ -1462,10 +1355,8 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestB_CompleteHashRange() { - final List shards = constructShardListForGraphB(); final List shardIdsOfCurrentLeases = Collections.singletonList("shardId-5"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_B, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST); } /** @@ -1484,11 +1375,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestB_CompleteHashRangeWithoutGC() { - final List shards = constructShardListForGraphB(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_B, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST); } /** @@ -1507,12 +1396,11 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestB_EmptyLeaseTable() { - final List shards = constructShardListForGraphB(); - final List shardIdsOfCurrentLeases = Collections.emptyList(); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-9", ExtendedSequenceNumber.LATEST); expectedShardIdCheckpointMap.put("shardId-10", ExtendedSequenceNumber.LATEST); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_B, Collections.emptyList(), INITIAL_POSITION_LATEST, + expectedShardIdCheckpointMap); } /** @@ -1531,13 +1419,13 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange1() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-3", "shardId-4", "shardId-5"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.TRIM_HORIZON); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, + expectedShardIdCheckpointMap); } /** @@ -1556,12 +1444,12 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange2() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-7"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, + expectedShardIdCheckpointMap); } /** @@ -1580,13 +1468,13 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange3() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-2", "shardId-6"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.TRIM_HORIZON); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, + expectedShardIdCheckpointMap); } /** @@ -1605,14 +1493,14 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange4() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-9", "shardId-10"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.TRIM_HORIZON); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, + expectedShardIdCheckpointMap); } /** @@ -1631,10 +1519,8 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRange() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-6", "shardId-7"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON); } /** @@ -1653,11 +1539,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRangeWithoutGC() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5", "shardId-6", "shardId-7"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON); } /** @@ -1676,8 +1560,6 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_EmptyLeaseTable() { - final List shards = constructShardListForGraphA(); - final List shardIdsOfCurrentLeases = Collections.emptyList(); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); @@ -1685,7 +1567,8 @@ public class HierarchicalShardSyncerTest { expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.TRIM_HORIZON); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, Collections.emptyList(), INITIAL_POSITION_TRIM_HORIZON, + expectedShardIdCheckpointMap); } /** @@ -1704,11 +1587,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRangeAcrossDifferentEpochs() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-4", "shardId-7", "shardId-9", "shardId-10"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON); } /** @@ -1751,10 +1632,8 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonB_CompleteHashRange() { - final List shards = constructShardListForGraphB(); final List shardIdsOfCurrentLeases = Collections.singletonList("shardId-5"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_B, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON); } /** @@ -1773,11 +1652,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonB_CompleteHashRangeWithoutGC() { - final List shards = constructShardListForGraphB(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_B, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON); } /** @@ -1796,12 +1673,11 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonB_EmptyLeaseTable() { - final List shards = constructShardListForGraphB(); - final List shardIdsOfCurrentLeases = Collections.emptyList(); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.TRIM_HORIZON); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.TRIM_HORIZON); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_B, Collections.emptyList(), INITIAL_POSITION_TRIM_HORIZON, + expectedShardIdCheckpointMap); } /** @@ -1820,13 +1696,13 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange1() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-3", "shardId-4", "shardId-5"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, + expectedShardIdCheckpointMap); } /** @@ -1845,12 +1721,12 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange2() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-7"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, + expectedShardIdCheckpointMap); } /** @@ -1869,13 +1745,13 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange3() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-2", "shardId-6"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.AT_TIMESTAMP); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, + expectedShardIdCheckpointMap); } /** @@ -1894,14 +1770,14 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange4() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-9", "shardId-10"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-2", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.AT_TIMESTAMP); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, + expectedShardIdCheckpointMap); } /** @@ -1920,10 +1796,8 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRange() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-4", "shardId-5", "shardId-6", "shardId-7"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP); } /** @@ -1942,11 +1816,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRangeWithoutGC() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5", "shardId-6", "shardId-7"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP); } /** @@ -1965,8 +1837,6 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_EmptyLeaseTable() { - final List shards = constructShardListForGraphA(); - final List shardIdsOfCurrentLeases = Collections.emptyList(); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); @@ -1974,7 +1844,8 @@ public class HierarchicalShardSyncerTest { expectedShardIdCheckpointMap.put("shardId-3", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-4", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-5", ExtendedSequenceNumber.AT_TIMESTAMP); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, Collections.emptyList(), INITIAL_POSITION_AT_TIMESTAMP, + expectedShardIdCheckpointMap); } /** @@ -1993,11 +1864,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRangeAcrossDifferentEpochs() { - final List shards = constructShardListForGraphA(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-4", "shardId-7", "shardId-9", "shardId-10"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_A, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP); } /** @@ -2040,10 +1909,8 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_CompleteHashRange() { - final List shards = constructShardListForGraphB(); final List shardIdsOfCurrentLeases = Collections.singletonList("shardId-5"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_B, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP); } /** @@ -2062,11 +1929,9 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_CompleteHashRangeWithoutGC() { - final List shards = constructShardListForGraphB(); final List shardIdsOfCurrentLeases = Arrays.asList("shardId-0", "shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5"); - final Map expectedNoNewLeases = Collections.emptyMap(); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); + assertExpectedLeasesAreCreated(SHARD_GRAPH_B, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP); } /** @@ -2085,12 +1950,18 @@ public class HierarchicalShardSyncerTest { */ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_EmptyLeaseTable() { - final List shards = constructShardListForGraphB(); - final List shardIdsOfCurrentLeases = Collections.emptyList(); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-0", ExtendedSequenceNumber.AT_TIMESTAMP); expectedShardIdCheckpointMap.put("shardId-1", ExtendedSequenceNumber.AT_TIMESTAMP); - assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); + assertExpectedLeasesAreCreated(SHARD_GRAPH_B, Collections.emptyList(), INITIAL_POSITION_AT_TIMESTAMP, + expectedShardIdCheckpointMap); + } + + private void assertExpectedLeasesAreCreated( + final List shards, + final List shardIdsOfCurrentLeases, + final InitialPositionInStreamExtended initialPosition) { + assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, initialPosition, Collections.emptyMap()); } private void assertExpectedLeasesAreCreated(List shards, @@ -2108,7 +1979,7 @@ public class HierarchicalShardSyncerTest { final HierarchicalShardSyncer.LeaseSynchronizer nonEmptyLeaseTableSynchronizer = new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, + final List newLeases = determineNewLeasesToCreate(nonEmptyLeaseTableSynchronizer, shards, currentLeases, initialPosition); assertThat(newLeases.size(), equalTo(expectedShardIdCheckpointMap.size())); @@ -2130,7 +2001,7 @@ public class HierarchicalShardSyncerTest { * shards from epoch 206 (open - no ending sequenceNumber) * */ - private List constructShardListForGraphA() { + private static List constructShardListForGraphA() { final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102"); final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null); final SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("11", "205"); @@ -2226,7 +2097,7 @@ public class HierarchicalShardSyncerTest { * 1 4 7 10 * */ - private List constructShardListForGraphB() { + private static List constructShardListForGraphB() { final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("1000", "1049"); final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("1050", "1099"); final SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("1100", "1149"); @@ -2263,7 +2134,7 @@ public class HierarchicalShardSyncerTest { * shards from epoch 206 (open - no ending sequenceNumber) * */ - private List constructShardListForGraphC() { + private static List constructShardListForGraphC() { final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102"); final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null); final SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("103", null); @@ -2302,9 +2173,9 @@ public class HierarchicalShardSyncerTest { public void testCheckIfDescendantAndAddNewLeasesForAncestorsNullShardId() { final MemoizationContext memoizationContext = new MemoizationContext(); - assertThat(HierarchicalShardSyncer + assertFalse(HierarchicalShardSyncer .checkIfDescendantAndAddNewLeasesForAncestors(null, INITIAL_POSITION_LATEST, null, null, - null, memoizationContext), equalTo(false)); + null, memoizationContext)); } /** @@ -2315,9 +2186,9 @@ public class HierarchicalShardSyncerTest { final String shardId = "shardId-trimmed"; final MemoizationContext memoizationContext = new MemoizationContext(); - assertThat(HierarchicalShardSyncer + assertFalse(HierarchicalShardSyncer .checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, null, - new HashMap<>(), null, memoizationContext), equalTo(false)); + new HashMap<>(), null, memoizationContext)); } /** @@ -2332,10 +2203,9 @@ public class HierarchicalShardSyncerTest { final Map kinesisShards = new HashMap<>(); kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, null, null, null)); - assertThat( + assertTrue( HierarchicalShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, - shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext), equalTo(true)); - assertThat(newLeaseMap.isEmpty(), equalTo(true)); + shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext)); } /** @@ -2355,10 +2225,9 @@ public class HierarchicalShardSyncerTest { kinesisShards.put(adjacentParentShardId, ShardObjectHelper.newShard(adjacentParentShardId, null, null, null)); kinesisShards.put(shardId, ShardObjectHelper.newShard(shardId, parentShardId, adjacentParentShardId, null)); - assertThat( + assertFalse( HierarchicalShardSyncer.checkIfDescendantAndAddNewLeasesForAncestors(shardId, INITIAL_POSITION_LATEST, - shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext), equalTo(false)); - assertThat(newLeaseMap.isEmpty(), equalTo(true)); + shardIdsOfCurrentLeases, kinesisShards, newLeaseMap, memoizationContext)); } /** @@ -3021,9 +2890,6 @@ public class HierarchicalShardSyncerTest { // /** * Helper method. - * - * @param shardId - * @return */ private static Lease newLease(final String shardId) { final Lease lease = new Lease();