Documentation: added <pre> tags so fixed-format diagrams aren't garbled. (#1058)

No functional change.
This commit is contained in:
stair 2023-03-07 15:36:25 -05:00 committed by GitHub
parent 87dc586e6d
commit 43d43653d0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 141 additions and 61 deletions

View file

@ -855,22 +855,20 @@ public class HierarchicalShardSyncer {
* * the parent shard has expired.
* <p>
* For example:
* <pre>
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*
* Current leases: (4, 5, 7)
*
* If initial position is LATEST:
* - New leases to create: (6)
* If initial position is TRIM_HORIZON:
* - New leases to create: (0, 1)
* If initial position is AT_TIMESTAMP(epoch=200):
* - New leases to create: (0, 1)
*
* </pre>
* Assuming current leases are (4, 5, 7), new leases to create for an initial position are:
* <ul>
* <li>LATEST: (6)</li>
* <li>TRIM_HORIZON: (0, 1)</li>
* <li>AT_TIMESTAMP(epoch=200): (0, 1)</li>
* </ul>
* <p>
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail

View file

@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -73,7 +72,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static software.amazon.kinesis.leases.HierarchicalShardSyncer.MemoizationContext;
@RunWith(MockitoJUnitRunner.class)
// CHECKSTYLE:IGNORE JavaNCSS FOR NEXT 800 LINES
public class HierarchicalShardSyncerTest {
private static final InitialPositionInStreamExtended INITIAL_POSITION_LATEST = InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.LATEST);
@ -320,7 +318,6 @@ public class HierarchicalShardSyncerTest {
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@Test
@ -355,7 +352,6 @@ public class HierarchicalShardSyncerTest {
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
private List<String> toMultiStreamLeaseList(List<String> shardIdBasedLeases) {
@ -460,7 +456,6 @@ public class HierarchicalShardSyncerTest {
final Set<String> expectedShardIds = new HashSet<>();
final List<Lease> requestLeases = leaseCaptor.getAllValues();
final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
validateHashRangeinLease(requestLeases);
@ -472,7 +467,8 @@ public class HierarchicalShardSyncerTest {
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/*
/**
* <pre>
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
@ -481,6 +477,7 @@ public class HierarchicalShardSyncerTest {
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Initial position: TRIM_HORIZON
* Leases to create: (0, 1, 2, 3, 4, 5)
* </pre>
*/
@Test
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonWithEmptyLeaseTable() throws Exception {
@ -490,7 +487,8 @@ public class HierarchicalShardSyncerTest {
testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate);
}
/*
/**
* <pre>
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
@ -499,6 +497,7 @@ public class HierarchicalShardSyncerTest {
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Initial position: AT_TIMESTAMP(1000)
* Leases to create: (8, 4, 9, 10)
* </pre>
*/
@Test
public void testCheckAndCreateLeasesForNewShardsAtTimestampWithEmptyLeaseTable1() throws Exception {
@ -508,7 +507,8 @@ public class HierarchicalShardSyncerTest {
testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate);
}
/*
/**
* <pre>
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
@ -517,6 +517,7 @@ public class HierarchicalShardSyncerTest {
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Initial position: AT_TIMESTAMP(200)
* Leases to create: (6, 7, 4, 5)
* </pre>
*/
@Test
public void testCheckAndCreateLeasesForNewShardsAtTimestampWithEmptyLeaseTable2() throws Exception {
@ -528,7 +529,8 @@ public class HierarchicalShardSyncerTest {
testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeysToCreate);
}
/*
/**
* <pre>
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
@ -537,6 +539,7 @@ public class HierarchicalShardSyncerTest {
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Initial position: LATEST
* Leases to create: (8, 4, 9, 10)
* </pre>
*/
@Test
public void testCheckAndCreateLeasesForNewShardsAtLatestWithEmptyLeaseTable() throws Exception {
@ -546,7 +549,8 @@ public class HierarchicalShardSyncerTest {
testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_LATEST, expectedLeaseKeysToCreate);
}
/*
/**
* <pre>
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
@ -556,6 +560,7 @@ public class HierarchicalShardSyncerTest {
* Missing leases: (0, 6, 8)
* Initial position: TRIM_HORIZON
* Leases to create: (0)
* </pre>
*/
@Test
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonWithPartialLeaseTable() throws Exception {
@ -571,7 +576,8 @@ public class HierarchicalShardSyncerTest {
testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_TRIM_HORIZON, expectedLeaseKeysToCreate, existingLeases);
}
/*
/**
* <pre>
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
@ -581,6 +587,7 @@ public class HierarchicalShardSyncerTest {
* Missing leases: (0, 6, 8)
* Initial position: AT_TIMESTAMP(1000)
* Leases to create: (0)
* </pre>
*/
@Test
public void testCheckAndCreateLeasesForNewShardsAtTimestampWithPartialLeaseTable1() throws Exception {
@ -596,7 +603,8 @@ public class HierarchicalShardSyncerTest {
testCheckAndCreateLeaseForShardsIfMissing(shards, INITIAL_POSITION_AT_TIMESTAMP, expectedLeaseKeysToCreate, existingLeases);
}
/*
/**
* <pre>
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
@ -606,6 +614,7 @@ public class HierarchicalShardSyncerTest {
* Missing leases: (0, 6, 8)
* Initial position: AT_TIMESTAMP(200)
* Leases to create: (0)
* </pre>
*/
@Test
public void testCheckAndCreateLeasesForNewShardsAtTimestampWithPartialLeaseTable2() throws Exception {
@ -623,7 +632,8 @@ public class HierarchicalShardSyncerTest {
testCheckAndCreateLeaseForShardsIfMissing(shards, initialPosition, expectedLeaseKeysToCreate, existingLeases);
}
/*
/**
* <pre>
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
@ -633,6 +643,7 @@ public class HierarchicalShardSyncerTest {
* Missing leases: (0, 6, 8)
* Initial position: LATEST
* Leases to create: (0)
* </pre>
*/
@Test
public void testCheckAndCreateLeasesForNewShardsAtLatestWithPartialLeaseTable() throws Exception {
@ -835,9 +846,6 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
assertThat(deleteLeases.size(), equalTo(0));
@ -1074,7 +1082,6 @@ public class HierarchicalShardSyncerTest {
final InitialPositionInStreamExtended initialPosition,
final Set<String> expectedLeaseKeys,
final List<Lease> existingLeases) throws Exception {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
@ -1168,6 +1175,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1176,6 +1184,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (3, 4, 5)
* Initial position: LATEST
* Expected leases: (2, 6)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange1() {
@ -1190,6 +1199,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1198,7 +1208,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (4, 5, 7)
* Initial position: LATEST
* Expected leases: (6)
*
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange2() {
@ -1212,6 +1222,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1220,6 +1231,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (2, 6)
* Initial position: LATEST
* Expected leases: (3, 4, 9, 10)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange3() {
@ -1236,6 +1248,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1244,6 +1257,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (4, 9, 10)
* Initial position: LATEST
* Expected leases: (8)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestA_PartialHashRange4() {
@ -1256,6 +1270,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* <pre>
* Helper method to construct a shard list for graph C. Graph C is defined below. Shard structure (y-axis is
* epochs): 0 1 2 3 - shards till
* / \ | \ /
@ -1266,6 +1281,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (9, 10)
* Initial position: LATEST
* Expected leases: (1, 6, 7, 8)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestC_PartialHashRange5() {
@ -1282,6 +1298,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1290,6 +1307,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (4, 5, 6, 7)
* Initial position: LATEST
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRange() {
@ -1302,6 +1320,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1310,6 +1329,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (0, 1, 2, 3, 4, 5, 6, 7)
* Initial position: LATEST
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRangeWithoutGC() {
@ -1323,6 +1343,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1331,6 +1352,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: empty set
* Initial position: LATEST
* Expected leases: (4, 8, 9, 10)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestA_EmptyLeaseTable() {
@ -1347,6 +1369,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1355,6 +1378,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (0, 1, 4, 7, 9, 10)
* Initial position: LATEST
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestA_CompleteHashRangeAcrossDifferentEpochs() {
@ -1365,7 +1389,8 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases);
}
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1376,17 +1401,19 @@ public class HierarchicalShardSyncerTest {
* Current leases: (6)
* Initial position: LATEST
* Expected leases: (7)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestB_PartialHashRange() {
final List<Shard> shards = constructShardListForGraphB();
final List<String> shardIdsOfCurrentLeases = Arrays.asList("shardId-6");
final List<String> shardIdsOfCurrentLeases = Collections.singletonList("shardId-6");
final Map<String, ExtendedSequenceNumber> expectedShardIdCheckpointMap = new HashMap<>();
expectedShardIdCheckpointMap.put("shardId-7", ExtendedSequenceNumber.LATEST);
assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedShardIdCheckpointMap);
}
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1397,16 +1424,18 @@ public class HierarchicalShardSyncerTest {
* Current leases: (5)
* Initial position: LATEST
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestB_CompleteHashRange() {
final List<Shard> shards = constructShardListForGraphB();
final List<String> shardIdsOfCurrentLeases = Arrays.asList("shardId-5");
final List<String> shardIdsOfCurrentLeases = Collections.singletonList("shardId-5");
final Map<String, ExtendedSequenceNumber> expectedNoNewLeases = Collections.emptyMap();
assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases);
}
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1417,6 +1446,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (0, 1, 2, 3, 4, 5)
* Initial position: LATEST
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestB_CompleteHashRangeWithoutGC() {
@ -1427,7 +1457,8 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_LATEST, expectedNoNewLeases);
}
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1438,6 +1469,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: empty set
* Initial position: LATEST
* Expected leases: (9, 10)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeLatestB_EmptyLeaseTable() {
@ -1452,6 +1484,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1460,6 +1493,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (3, 4, 5)
* Initial position: TRIM_HORIZON
* Expected leases: (0, 1, 2)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange1() {
@ -1475,6 +1509,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1483,6 +1518,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (4, 5, 7)
* Initial position: TRIM_HORIZON
* Expected leases: (0, 1)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange2() {
@ -1497,6 +1533,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1505,6 +1542,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (2, 6)
* Initial position: TRIM_HORIZON
* Expected leases: (3, 4, 5)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange3() {
@ -1520,6 +1558,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1528,6 +1567,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (4, 9, 10)
* Initial position: TRIM_HORIZON
* Expected leases: (0, 1, 2, 3)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonA_PartialHashRange4() {
@ -1544,6 +1584,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1552,6 +1593,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (4, 5, 6, 7)
* Initial position: TRIM_HORIZON
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRange() {
@ -1564,6 +1606,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1572,6 +1615,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (0, 1, 2, 3, 4, 5, 6, 7)
* Initial position: TRIM_HORIZON
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRangeWithoutGC() {
@ -1585,6 +1629,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1593,6 +1638,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: empty set
* Initial position: TRIM_HORIZON
* Expected leases: (0, 1, 2, 3, 4, 5)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonA_EmptyLeaseTable() {
@ -1611,6 +1657,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1619,6 +1666,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (0, 1, 4, 7, 9, 10)
* Initial position: TRIM_HORIZON
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonA_CompleteHashRangeAcrossDifferentEpochs() {
@ -1629,7 +1677,8 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases);
}
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1640,6 +1689,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (6)
* Initial position: TRIM_HORIZON
* Expected leases: (7)
* </pre>
*/
// TODO: Account for out-of-order lease creation in TRIM_HORIZON and AT_TIMESTAMP cases
// @Test
@ -1651,7 +1701,8 @@ public class HierarchicalShardSyncerTest {
// assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedShardIdCheckpointMap);
// }
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1662,16 +1713,18 @@ public class HierarchicalShardSyncerTest {
* Current leases: (5)
* Initial position: TRIM_HORIZON
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonB_CompleteHashRange() {
final List<Shard> shards = constructShardListForGraphB();
final List<String> shardIdsOfCurrentLeases = Arrays.asList("shardId-5");
final List<String> shardIdsOfCurrentLeases = Collections.singletonList("shardId-5");
final Map<String, ExtendedSequenceNumber> expectedNoNewLeases = Collections.emptyMap();
assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases);
}
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1682,6 +1735,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (0, 1, 2, 3, 4, 5)
* Initial position: TRIM_HORIZON
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonB_CompleteHashRangeWithoutGC() {
@ -1692,7 +1746,8 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_TRIM_HORIZON, expectedNoNewLeases);
}
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1703,6 +1758,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: empty set
* Initial position: TRIM_HORIZON
* Expected leases: (0, 1)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeHorizonB_EmptyLeaseTable() {
@ -1717,6 +1773,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1725,6 +1782,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (3, 4, 5)
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: (0, 1, 2)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange1() {
@ -1740,6 +1798,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1748,6 +1807,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (4, 5, 7)
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: (0, 1)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange2() {
@ -1762,6 +1822,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1770,6 +1831,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (2, 6)
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: (3, 4, 5)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange3() {
@ -1785,6 +1847,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1793,6 +1856,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (4, 9, 10)
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: (0, 1, 2, 3)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_PartialHashRange4() {
@ -1809,6 +1873,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1817,6 +1882,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (4, 5, 6, 7)
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRange() {
@ -1829,6 +1895,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1837,6 +1904,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (0, 1, 2, 3, 4, 5, 6, 7)
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRangeWithoutGC() {
@ -1850,6 +1918,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1858,6 +1927,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: empty set
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: (0, 1, 2, 3, 4, 5)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_EmptyLeaseTable() {
@ -1876,6 +1946,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test CheckIfDescendantAndAddNewLeasesForAncestors
* Shard structure (each level depicts a stream segment):
* <pre>
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
@ -1884,6 +1955,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (0, 1, 4, 7, 9, 10)
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampA_CompleteHashRangeAcrossDifferentEpochs() {
@ -1894,7 +1966,8 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases);
}
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1905,6 +1978,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (6)
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: (7)
* </pre>
*/
// TODO: Account for out-of-order lease creation in TRIM_HORIZON and AT_TIMESTAMP cases
// @Test
@ -1916,7 +1990,8 @@ public class HierarchicalShardSyncerTest {
// assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedShardIdCheckpointMap);
// }
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1927,16 +2002,18 @@ public class HierarchicalShardSyncerTest {
* Current leases: (5)
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_CompleteHashRange() {
final List<Shard> shards = constructShardListForGraphB();
final List<String> shardIdsOfCurrentLeases = Arrays.asList("shardId-5");
final List<String> shardIdsOfCurrentLeases = Collections.singletonList("shardId-5");
final Map<String, ExtendedSequenceNumber> expectedNoNewLeases = Collections.emptyMap();
assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases);
}
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1947,6 +2024,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: (0, 1, 2, 3, 4, 5)
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: empty set
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_CompleteHashRangeWithoutGC() {
@ -1957,7 +2035,8 @@ public class HierarchicalShardSyncerTest {
assertExpectedLeasesAreCreated(shards, shardIdsOfCurrentLeases, INITIAL_POSITION_AT_TIMESTAMP, expectedNoNewLeases);
}
/*
/**
* <pre>
* Shard structure (x-axis is epochs):
* 0 3 6 9
* \ / \ / \ /
@ -1968,6 +2047,7 @@ public class HierarchicalShardSyncerTest {
* Current leases: empty set
* Initial position: AT_TIMESTAMP(1000)
* Expected leases: (0, 1)
* </pre>
*/
@Test
public void testDetermineNewLeasesToCreateSplitMergeAtTimestampB_EmptyLeaseTable() {
@ -2005,7 +2085,8 @@ public class HierarchicalShardSyncerTest {
}
}
/*
/**
* <pre>
* Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is
* epochs): 0 1 2 3 4 5- shards till
* \ / \ / | |
@ -2013,6 +2094,7 @@ public class HierarchicalShardSyncerTest {
* \ / | /\
* 8 4 9 10 -
* shards from epoch 206 (open - no ending sequenceNumber)
* </pre>
*/
private List<Shard> constructShardListForGraphA() {
final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102");
@ -2075,7 +2157,8 @@ public class HierarchicalShardSyncerTest {
throw new RuntimeException("Unsupported initial position " + initialPosition);
}
/*
/**
* <pre>
* Helper method to get expected shards for Graph A based on initial position in stream. Shard structure (y-axis is
* epochs): 0 1 2 3 4 5- shards till
* \ / \ / | |
@ -2083,6 +2166,7 @@ public class HierarchicalShardSyncerTest {
* \ / | /\
* 8 4 9 10 -
* shards from epoch 206 (open - no ending sequenceNumber)
* </pre>
*/
private Set<Lease> getExpectedLeasesForGraphA(List<Shard> shards,
ExtendedSequenceNumber sequenceNumber,
@ -2097,15 +2181,17 @@ public class HierarchicalShardSyncerTest {
return new HashSet<>(createLeasesFromShards(filteredShards, sequenceNumber, null));
}
// /*
// * Helper method to construct a shard list for graph B. Graph B is defined below.
// * Shard structure (x-axis is epochs):
// * 0 3 6 9
// * \ / \ / \ /
// * 2 5 8
// * / \ / \ / \
// * 1 4 7 10
// */
/**
* Helper method to construct a shard list for graph B. Graph B is defined below.
* Shard structure (x-axis is epochs):
* <pre>
* 0 3 6 9
* \ / \ / \ /
* 2 5 8
* / \ / \ / \
* 1 4 7 10
* </pre>
*/
private List<Shard> constructShardListForGraphB() {
final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("1000", "1049");
final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("1050", "1099");
@ -2133,6 +2219,7 @@ public class HierarchicalShardSyncerTest {
}
/**
* <pre>
* Helper method to construct a shard list for graph C. Graph C is defined below. Shard structure (y-axis is
* epochs): 0 1 2 3 - shards till
* / \ | \ /
@ -2140,6 +2227,7 @@ public class HierarchicalShardSyncerTest {
* / \ / \ | |
* 7 8 9 10 1 6
* shards from epoch 206 (open - no ending sequenceNumber)
* </pre>
*/
private List<Shard> constructShardListForGraphC() {
final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102");
@ -2241,7 +2329,6 @@ public class HierarchicalShardSyncerTest {
/**
* Tests that when reading from TIP, we use the AT_LATEST shard filter.
* @throws Exception
*/
@Test
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception {
@ -2251,7 +2338,6 @@ public class HierarchicalShardSyncerTest {
/**
* Tests that when reading from TRIM, we use the TRIM_HORIZON shard filter.
* @throws Exception
*/
@Test
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception {
@ -2261,7 +2347,6 @@ public class HierarchicalShardSyncerTest {
/**
* Tests that when reading from AT_TIMESTAMP, we use the AT_TIMESTAMP shard filter.
* @throws Exception
*/
@Test
public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception {
@ -2309,7 +2394,6 @@ public class HierarchicalShardSyncerTest {
/**
* Tries to boostrap empty lease table. Verifies that if we fail to get a complete hash range of shards after three
* retries, we fast fail and throw an exception.
* @throws Exception
*/
@Test(expected = KinesisClientLibIOException.class)
public void testEmptyLeaseTableThrowsExceptionWhenHashRangeIsStillIncompleteAfterRetries() throws Exception {
@ -2334,7 +2418,6 @@ public class HierarchicalShardSyncerTest {
/**
* Tries to bootstrap an empty lease table. Verifies that after getting an incomplete hash range of shards two times
* and a complete hash range the final time, we create the leases.
* @throws Exception
*/
@Test
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRangeAfterTwoRetries() throws Exception {
@ -2362,7 +2445,6 @@ public class HierarchicalShardSyncerTest {
/**
* Tries to bootstrap an empty lease table. Verifies that leases are created when we have a complete hash range of shards.
* @throws Exception
*/
@Test
public void testEmptyLeaseTablePopulatesLeasesWithCompleteHashRange() throws Exception {