diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 23d2e423..e81e1d52 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -276,6 +276,70 @@ public class HierarchicalShardSyncerTest { verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } + @Test + public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { + final List shards = constructShardListForGraphA(); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + when(shardDetector.listShards()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shards, shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE); + + final Set expectedShardIds = new HashSet<>( + Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); + assertThat(requestLeaseKeys, equalTo(expectedShardIds)); + assertThat(extendedSequenceNumbers.size(), equalTo(1)); + + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); + + verify(shardDetector, never()).listShards(); + verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + + @Test + public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception { + final List shards = constructShardListForGraphA(); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + when(shardDetector.listShards()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(new ArrayList(), shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE); + + final Set expectedShardIds = new HashSet<>( + Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); + assertThat(requestLeaseKeys, equalTo(expectedShardIds)); + assertThat(extendedSequenceNumbers.size(), equalTo(1)); + + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); + + verify(shardDetector).listShards(); + verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + @Test public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception { testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON);