diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 7829a2de..d384bfe5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -313,6 +313,11 @@ public class HierarchicalShardSyncer { return shardIdToChildShardIdsMap; } + /** + * Helper method to resolve the correct shard filter to use when listing shards from a position in a stream. + * @param initialPositionInStreamExtended + * @return + */ private static ShardFilter getShardFilterFromInitialPosition(InitialPositionInStreamExtended initialPositionInStreamExtended) { ShardFilter.Builder builder = ShardFilter.builder(); @@ -806,17 +811,44 @@ public class HierarchicalShardSyncer { private final StreamIdentifier streamIdentifier; } + /** + * Interface to determine how to create new leases. + */ @VisibleForTesting static interface LeaseSynchronizer { + /** + * Determines how to create leases. + * @param shards + * @param currentLeases + * @param initialPosition + * @param inconsistentShardIds + * @param multiStreamArgs + * @return + */ List determineNewLeasesToCreate(List shards, List currentLeases, InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds, MultiStreamArgs multiStreamArgs); } + /** + * Class to help create leases when the table is initially empty. + */ @Slf4j @AllArgsConstructor static class EmptyLeaseTableSynchronizer implements LeaseSynchronizer { + /** + * Determines how to create leases when the lease table is initially empty. For this, we read all shards where + * the KCL is reading from. For any shards which are closed, we will discover their child shards through GetRecords + * child shard information. + * + * @param shards + * @param currentLeases + * @param initialPosition + * @param inconsistentShardIds + * @param multiStreamArgs + * @return + */ @Override public List determineNewLeasesToCreate(List shards, List currentLeases, InitialPositionInStreamExtended initialPosition, Set inconsistentShardIds, MultiStreamArgs multiStreamArgs) { @@ -860,6 +892,9 @@ public class HierarchicalShardSyncer { } + /** + * Class to help create leases when the lease table is not initially empty. + */ @Slf4j @AllArgsConstructor static class NonEmptyLeaseTableSynchronizer implements LeaseSynchronizer { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index cf393d92..d8684cd7 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -1612,18 +1612,30 @@ public class HierarchicalShardSyncerTest { assertThat(newLeaseMap.isEmpty(), equalTo(true)); } + /** + * Tests that when reading from TIP, we use the AT_LATEST shard filter. + * @throws Exception + */ @Test public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception { ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build(); testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_LATEST, shardFilter); } + /** + * Tests that when reading from TRIM, we use the TRIM_HORIZON shard filter. + * @throws Exception + */ @Test public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception { ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build(); testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_TRIM_HORIZON, shardFilter); } + /** + * Tests that when reading from AT_TIMESTAMP, we use the AT_TIMESTAMP shard filter. + * @throws Exception + */ @Test public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception { ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TIMESTAMP).timestamp(new Date(1000L).toInstant()).build(); @@ -1668,7 +1680,7 @@ public class HierarchicalShardSyncerTest { verify(shardDetector, atLeast(1)).listShards(); } -// /** +// /**getShardFilterFromInitialPosition // * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent. // */ // @Test