diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index e44125a5..175d62ec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -855,22 +855,20 @@ public class HierarchicalShardSyncer { * * the parent shard has expired. *

* For example: + *

          * Shard structure (each level depicts a stream segment):
          * 0 1 2 3 4   5   - shards till epoch 102
          * \ / \ / |   |
          *  6   7  4   5   - shards from epoch 103 - 205
          *  \  /   |  / \
          *   8     4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
-         *
-         * Current leases: (4, 5, 7)
-         *
-         * If initial position is LATEST:
-         *   - New leases to create: (6)
-         * If initial position is TRIM_HORIZON:
-         *   - New leases to create: (0, 1)
-         * If initial position is AT_TIMESTAMP(epoch=200):
-         *   - New leases to create: (0, 1)
-         *
+         * 
+ * Assuming current leases are (4, 5, 7), new leases to create for an initial position are: + * *

* The leases returned are sorted by the starting sequence number - following the same order * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index c390987c..c8ef05ba 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -73,7 +72,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import static software.amazon.kinesis.leases.HierarchicalShardSyncer.MemoizationContext; @RunWith(MockitoJUnitRunner.class) -// CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES public class HierarchicalShardSyncerTest { private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended .newInitialPosition(InitialPositionInStream.LATEST); @@ -320,7 +318,6 @@ public class HierarchicalShardSyncerTest { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); - } @Test @@ -355,7 +352,6 @@ public class HierarchicalShardSyncerTest { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); - } private List toMultiStreamLeaseList(List shardIdBasedLeases) { @@ -460,7 +456,6 @@ public class HierarchicalShardSyncerTest { final Set expectedShardIds = new HashSet<>(); final List requestLeases = leaseCaptor.getAllValues(); - final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) .collect(Collectors.toSet()); validateHashRangeinLease(requestLeases); @@ -472,7 +467,8 @@ public class HierarchicalShardSyncerTest { verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } - /* + /** + *

      * Shard structure (each level depicts a stream segment):
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
@@ -481,6 +477,7 @@ public class HierarchicalShardSyncerTest {
      *    8    4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
      * Initial position: TRIM_HORIZON
      * Leases to create: (0, 1, 2, 3, 4, 5)
+     * 
*/ @Test public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonWithEmptyLeaseTable() throws Exception { @@ -490,7 +487,8 @@ public class HierarchicalShardSyncerTest { testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate); } - /* + /** + *
      * Shard structure (each level depicts a stream segment):
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
@@ -499,6 +497,7 @@ public class HierarchicalShardSyncerTest {
      *    8    4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
      * Initial position: AT_TIMESTAMP(1000)
      * Leases to create: (8, 4, 9, 10)
+     * 
*/ @Test public void testCheckAndCreateLeasesForNewShardsAtTimestampWithEmptyLeaseTable1() throws Exception { @@ -508,7 +507,8 @@ public class HierarchicalShardSyncerTest { testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate); } - /* + /** + *
      * Shard structure (each level depicts a stream segment):
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
@@ -517,6 +517,7 @@ public class HierarchicalShardSyncerTest {
      *    8    4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
      * Initial position: AT_TIMESTAMP(200)
      * Leases to create: (6, 7, 4, 5)
+     * 
*/ @Test public void testCheckAndCreateLeasesForNewShardsAtTimestampWithEmptyLeaseTable2() throws Exception { @@ -528,7 +529,8 @@ public class HierarchicalShardSyncerTest { testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeysToCreate); } - /* + /** + *
      * Shard structure (each level depicts a stream segment):
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
@@ -537,6 +539,7 @@ public class HierarchicalShardSyncerTest {
      *    8    4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
      * Initial position: LATEST
      * Leases to create: (8, 4, 9, 10)
+     * 
*/ @Test public void testCheckAndCreateLeasesForNewShardsAtLatestWithEmptyLeaseTable() throws Exception { @@ -546,7 +549,8 @@ public class HierarchicalShardSyncerTest { testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_LATEST, expectedLeaseKeysToCreate); } - /* + /** + *
      * Shard structure (each level depicts a stream segment):
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
@@ -556,6 +560,7 @@ public class HierarchicalShardSyncerTest {
      * Missing leases: (0, 6, 8)
      * Initial position: TRIM_HORIZON
      * Leases to create: (0)
+     * 
*/ @Test public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonWithPartialLeaseTable() throws Exception { @@ -571,7 +576,8 @@ public class HierarchicalShardSyncerTest { testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate, existingLeases); } - /* + /** + *
      * Shard structure (each level depicts a stream segment):
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
@@ -581,6 +587,7 @@ public class HierarchicalShardSyncerTest {
      * Missing leases: (0, 6, 8)
      * Initial position: AT_TIMESTAMP(1000)
      * Leases to create: (0)
+     * 
*/ @Test public void testCheckAndCreateLeasesForNewShardsAtTimestampWithPartialLeaseTable1() throws Exception { @@ -596,7 +603,8 @@ public class HierarchicalShardSyncerTest { testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate, existingLeases); } - /* + /** + *
      * Shard structure (each level depicts a stream segment):
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
@@ -606,6 +614,7 @@ public class HierarchicalShardSyncerTest {
      * Missing leases: (0, 6, 8)
      * Initial position: AT_TIMESTAMP(200)
      * Leases to create: (0)
+     * 
*/ @Test public void testCheckAndCreateLeasesForNewShardsAtTimestampWithPartialLeaseTable2() throws Exception { @@ -623,7 +632,8 @@ public class HierarchicalShardSyncerTest { testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeysToCreate, existingLeases); } - /* + /** + *
      * Shard structure (each level depicts a stream segment):
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
@@ -633,6 +643,7 @@ public class HierarchicalShardSyncerTest {
      * Missing leases: (0, 6, 8)
      * Initial position: LATEST
      * Leases to create: (0)
+     * 
*/ @Test public void testCheckAndCreateLeasesForNewShardsAtLatestWithPartialLeaseTable() throws Exception { @@ -835,9 +846,6 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List deleteLeases = leaseDeleteCaptor.getAllValues(); - final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); - final Set sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) - .collect(Collectors.toSet()); assertThat(deleteLeases.size(), equalTo(0)); @@ -1074,7 +1082,6 @@ public class HierarchicalShardSyncerTest { final InitialPositionInStreamExtended initialPosition, final Set expectedLeaseKeys, final List existingLeases) throws Exception { - final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); when(shardDetector.listShards()).thenReturn(shards); @@ -1168,6 +1175,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1176,6 +1184,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (3, 4, 5)
      * Initial position: LATEST
      * Expected leases: (2, 6)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange1() { @@ -1190,6 +1199,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1198,7 +1208,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (4, 5, 7)
      * Initial position: LATEST
      * Expected leases: (6)
-     *
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange2() { @@ -1212,6 +1222,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1220,6 +1231,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (2, 6)
      * Initial position: LATEST
      * Expected leases: (3, 4, 9, 10)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange3() { @@ -1236,6 +1248,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1244,6 +1257,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (4, 9, 10)
      * Initial position: LATEST
      * Expected leases: (8)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange4() { @@ -1256,6 +1270,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors + *
      * Helper method to construct a shard list for graph C. Graph C is defined below. Shard structure (y-axis is
      * epochs):     0      1  2  3  - shards till
      *            /   \    |  \ /
@@ -1266,6 +1281,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (9, 10)
      * Initial position: LATEST
      * Expected leases: (1, 6, 7, 8)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestC_PartialHashRange5() { @@ -1282,6 +1298,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1290,6 +1307,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (4, 5, 6, 7)
      * Initial position: LATEST
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRange() { @@ -1302,6 +1320,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1310,6 +1329,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (0, 1, 2, 3, 4, 5, 6, 7)
      * Initial position: LATEST
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRangeWithoutGC() { @@ -1323,6 +1343,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1331,6 +1352,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: empty set
      * Initial position: LATEST
      * Expected leases: (4, 8, 9, 10)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_EmptyLeaseTable() { @@ -1347,6 +1369,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1355,6 +1378,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (0, 1, 4, 7, 9, 10)
      * Initial position: LATEST
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRangeAcrossDifferentEpochs() { @@ -1365,7 +1389,8 @@ public class HierarchicalShardSyncerTest { assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1376,17 +1401,19 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (6)
      * Initial position: LATEST
      * Expected leases: (7)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestB_PartialHashRange() { final List shards = constructShardListForGraphB(); - final List shardIdsOfCurrentLeases = Arrays.asList("shardId-6"); + final List shardIdsOfCurrentLeases = Collections.singletonList("shardId-6"); final Map expectedShardIdCheckpointMap = new HashMap<>(); expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.LATEST); assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap); } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1397,16 +1424,18 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (5)
      * Initial position: LATEST
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestB_CompleteHashRange() { final List shards = constructShardListForGraphB(); - final List shardIdsOfCurrentLeases = Arrays.asList("shardId-5"); + final List shardIdsOfCurrentLeases = Collections.singletonList("shardId-5"); final Map expectedNoNewLeases = Collections.emptyMap(); assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1417,6 +1446,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (0, 1, 2, 3, 4, 5)
      * Initial position: LATEST
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestB_CompleteHashRangeWithoutGC() { @@ -1427,7 +1457,8 @@ public class HierarchicalShardSyncerTest { assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases); } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1438,6 +1469,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: empty set
      * Initial position: LATEST
      * Expected leases: (9, 10)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeLatestB_EmptyLeaseTable() { @@ -1452,6 +1484,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1460,6 +1493,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (3, 4, 5)
      * Initial position: TRIM_HORIZON
      * Expected leases: (0, 1, 2)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange1() { @@ -1475,6 +1509,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1483,6 +1518,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (4, 5, 7)
      * Initial position: TRIM_HORIZON
      * Expected leases: (0, 1)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange2() { @@ -1497,6 +1533,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1505,6 +1542,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (2, 6)
      * Initial position: TRIM_HORIZON
      * Expected leases: (3, 4, 5)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange3() { @@ -1520,6 +1558,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1528,6 +1567,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (4, 9, 10)
      * Initial position: TRIM_HORIZON
      * Expected leases: (0, 1, 2, 3)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange4() { @@ -1544,6 +1584,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1552,6 +1593,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (4, 5, 6, 7)
      * Initial position: TRIM_HORIZON
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRange() { @@ -1564,6 +1606,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1572,6 +1615,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (0, 1, 2, 3, 4, 5, 6, 7)
      * Initial position: TRIM_HORIZON
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRangeWithoutGC() { @@ -1585,6 +1629,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1593,6 +1638,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: empty set
      * Initial position: TRIM_HORIZON
      * Expected leases: (0, 1, 2, 3, 4, 5)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_EmptyLeaseTable() { @@ -1611,6 +1657,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1619,6 +1666,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (0, 1, 4, 7, 9, 10)
      * Initial position: TRIM_HORIZON
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRangeAcrossDifferentEpochs() { @@ -1629,7 +1677,8 @@ public class HierarchicalShardSyncerTest { assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1640,6 +1689,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (6)
      * Initial position: TRIM_HORIZON
      * Expected leases: (7)
+     * 
*/ // TODO: Account for out-of-order lease creation in TRIM_HORIZON and AT_TIMESTAMP cases // @Test @@ -1651,7 +1701,8 @@ public class HierarchicalShardSyncerTest { // assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap); // } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1662,16 +1713,18 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (5)
      * Initial position: TRIM_HORIZON
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonB_CompleteHashRange() { final List shards = constructShardListForGraphB(); - final List shardIdsOfCurrentLeases = Arrays.asList("shardId-5"); + final List shardIdsOfCurrentLeases = Collections.singletonList("shardId-5"); final Map expectedNoNewLeases = Collections.emptyMap(); assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1682,6 +1735,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (0, 1, 2, 3, 4, 5)
      * Initial position: TRIM_HORIZON
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonB_CompleteHashRangeWithoutGC() { @@ -1692,7 +1746,8 @@ public class HierarchicalShardSyncerTest { assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases); } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1703,6 +1758,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: empty set
      * Initial position: TRIM_HORIZON
      * Expected leases: (0, 1)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeHorizonB_EmptyLeaseTable() { @@ -1717,6 +1773,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1725,6 +1782,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (3, 4, 5)
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: (0, 1, 2)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange1() { @@ -1740,6 +1798,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1748,6 +1807,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (4, 5, 7)
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: (0, 1)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange2() { @@ -1762,6 +1822,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1770,6 +1831,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (2, 6)
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: (3, 4, 5)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange3() { @@ -1785,6 +1847,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1793,6 +1856,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (4, 9, 10)
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: (0, 1, 2, 3)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange4() { @@ -1809,6 +1873,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1817,6 +1882,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (4, 5, 6, 7)
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRange() { @@ -1829,6 +1895,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1837,6 +1904,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (0, 1, 2, 3, 4, 5, 6, 7)
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRangeWithoutGC() { @@ -1850,6 +1918,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1858,6 +1927,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: empty set
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: (0, 1, 2, 3, 4, 5)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_EmptyLeaseTable() { @@ -1876,6 +1946,7 @@ public class HierarchicalShardSyncerTest { /** * Test CheckIfDescendantAndAddNewLeasesForAncestors * Shard structure (each level depicts a stream segment): + *
      * 0 1 2 3 4   5- shards till epoch 102
      * \ / \ / |   |
      *  6   7  4   5- shards from epoch 103 - 205
@@ -1884,6 +1955,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (0, 1, 4, 7, 9, 10)
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRangeAcrossDifferentEpochs() { @@ -1894,7 +1966,8 @@ public class HierarchicalShardSyncerTest { assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1905,6 +1978,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (6)
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: (7)
+     * 
*/ // TODO: Account for out-of-order lease creation in TRIM_HORIZON and AT_TIMESTAMP cases // @Test @@ -1916,7 +1990,8 @@ public class HierarchicalShardSyncerTest { // assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap); // } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1927,16 +2002,18 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (5)
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_CompleteHashRange() { final List shards = constructShardListForGraphB(); - final List shardIdsOfCurrentLeases = Arrays.asList("shardId-5"); + final List shardIdsOfCurrentLeases = Collections.singletonList("shardId-5"); final Map expectedNoNewLeases = Collections.emptyMap(); assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1947,6 +2024,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: (0, 1, 2, 3, 4, 5)
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: empty set
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_CompleteHashRangeWithoutGC() { @@ -1957,7 +2035,8 @@ public class HierarchicalShardSyncerTest { assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases); } - /* + /** + *
      * Shard structure (x-axis is epochs):
      * 0  3   6   9
      * \ / \ / \ /
@@ -1968,6 +2047,7 @@ public class HierarchicalShardSyncerTest {
      * Current leases: empty set
      * Initial position: AT_TIMESTAMP(1000)
      * Expected leases: (0, 1)
+     * 
*/ @Test public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_EmptyLeaseTable() { @@ -2005,7 +2085,8 @@ public class HierarchicalShardSyncerTest { } } - /* + /** + *
      * Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is
      * epochs): 0 1 2 3 4   5- shards till
      *          \ / \ / |   |
@@ -2013,6 +2094,7 @@ public class HierarchicalShardSyncerTest {
      *            \ /   |  /\
      *             8    4 9 10 -
      * shards from epoch 206 (open - no ending sequenceNumber)
+     * 
*/ private List constructShardListForGraphA() { final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102"); @@ -2075,7 +2157,8 @@ public class HierarchicalShardSyncerTest { throw new RuntimeException("Unsupported initial position " + initialPosition); } - /* + /** + *
      * Helper method to get expected shards for Graph A based on initial position in stream. Shard structure (y-axis is
      * epochs): 0 1 2 3 4   5- shards till
      *          \ / \ / |   |
@@ -2083,6 +2166,7 @@ public class HierarchicalShardSyncerTest {
      *            \ /   |  /\
      *             8    4 9 10 -
      * shards from epoch 206 (open - no ending sequenceNumber)
+     * 
*/ private Set getExpectedLeasesForGraphA(List shards, ExtendedSequenceNumber sequenceNumber, @@ -2097,15 +2181,17 @@ public class HierarchicalShardSyncerTest { return new HashSet<>(createLeasesFromShards(filteredShards, sequenceNumber, null)); } -// /* -// * Helper method to construct a shard list for graph B. Graph B is defined below. -// * Shard structure (x-axis is epochs): -// * 0 3 6 9 -// * \ / \ / \ / -// * 2 5 8 -// * / \ / \ / \ -// * 1 4 7 10 -// */ + /** + * Helper method to construct a shard list for graph B. Graph B is defined below. + * Shard structure (x-axis is epochs): + *
+     * 0  3   6   9
+     * \ / \ / \ /
+     *  2   5   8
+     * / \ / \ / \
+     * 1  4   7  10
+     * 
+ */ private List constructShardListForGraphB() { final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("1000", "1049"); final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("1050", "1099"); @@ -2133,6 +2219,7 @@ public class HierarchicalShardSyncerTest { } /** + *
      * Helper method to construct a shard list for graph C. Graph C is defined below. Shard structure (y-axis is
      * epochs):     0      1  2  3  - shards till
      *            /   \    |  \ /
@@ -2140,6 +2227,7 @@ public class HierarchicalShardSyncerTest {
      *          / \   / \  |   |
      *         7   8 9  10 1   6
      * shards from epoch 206 (open - no ending sequenceNumber)
+     * 
*/ private List constructShardListForGraphC() { final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102"); @@ -2241,7 +2329,6 @@ public class HierarchicalShardSyncerTest { /** * Tests that when reading from TIP, we use the AT_LATEST shard filter. - * @throws Exception */ @Test public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception { @@ -2251,7 +2338,6 @@ public class HierarchicalShardSyncerTest { /** * Tests that when reading from TRIM, we use the TRIM_HORIZON shard filter. - * @throws Exception */ @Test public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception { @@ -2261,7 +2347,6 @@ public class HierarchicalShardSyncerTest { /** * Tests that when reading from AT_TIMESTAMP, we use the AT_TIMESTAMP shard filter. - * @throws Exception */ @Test public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception { @@ -2309,7 +2394,6 @@ public class HierarchicalShardSyncerTest { /** * Tries to boostrap empty lease table. Verifies that if we fail to get a complete hash range of shards after three * retries, we fast fail and throw an exception. - * @throws Exception */ @Test(expected = KinesisClientLibIOException.class) public void testEmptyLeaseTableThrowsExceptionWhenHashRangeIsStillIncompleteAfterRetries() throws Exception { @@ -2334,7 +2418,6 @@ public class HierarchicalShardSyncerTest { /** * Tries to bootstrap an empty lease table. Verifies that after getting an incomplete hash range of shards two times * and a complete hash range the final time, we create the leases. - * @throws Exception */ @Test public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception { @@ -2362,7 +2445,6 @@ public class HierarchicalShardSyncerTest { /** * Tries to bootstrap an empty lease table. Verifies that leases are created when we have a complete hash range of shards. - * @throws Exception */ @Test public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception {