diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 596ef545..cf393d92 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -21,6 +21,7 @@ package software.amazon.kinesis.leases; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @@ -29,6 +30,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -54,6 +56,8 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; @@ -1608,6 +1612,62 @@ public class HierarchicalShardSyncerTest { assertThat(newLeaseMap.isEmpty(), equalTo(true)); } + @Test + public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtLatest() throws Exception { + ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build(); + testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_LATEST, shardFilter); + } + + @Test + public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTrimHorizon() throws Exception { + ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build(); + testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_TRIM_HORIZON, shardFilter); + } + + @Test + public void testEmptyLeaseTableBootstrapUsesShardFilterWithAtTimestamp() throws Exception { + ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_TIMESTAMP).timestamp(new Date(1000L).toInstant()).build(); + testEmptyLeaseTableBootstrapUsesListShardsWithFilter(INITIAL_POSITION_AT_TIMESTAMP, shardFilter); + } + + public void testEmptyLeaseTableBootstrapUsesListShardsWithFilter(InitialPositionInStreamExtended initialPosition, ShardFilter shardFilter) throws Exception { + final String shardId0 = "shardId-0"; + final List shards = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, ShardObjectHelper.newSequenceNumberRange("1", null))); + final List currentLeases = Collections.emptyList(); + + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); + when(shardDetector.listShards()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + + verify(shardDetector, atLeast(1)).listShardsWithFilter(shardFilter); + verify(shardDetector, never()).listShards(); + } + + @Test + public void testNonEmptyLeaseTableUsesListShards() throws Exception { + final String shardId0 = "shardId-0"; + final String shardId1 = "shardId-1"; + + final List shardsWithLeases = Arrays.asList(ShardObjectHelper.newShard(shardId0, null, null, ShardObjectHelper.newSequenceNumberRange("1", "2"))); + final List shardsWithoutLeases = Arrays.asList(ShardObjectHelper.newShard(shardId1, null, null, ShardObjectHelper.newSequenceNumberRange("3", "4"))); + + final List currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo"); + + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); + when(shardDetector.listShards()).thenReturn(shardsWithoutLeases); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + garbageCollectLeases, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + + verify(shardDetector, atLeast(1)).listShards(); + } + // /** // * Test CheckIfDescendantAndAddNewLeasesForAncestors - two parents, there is a lease for one parent. // */