diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index ce658c51..7829a2de 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -826,7 +826,7 @@ public class HierarchicalShardSyncer { .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) .collect(Collectors.toSet()); - final List newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards); + final List newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs); //TODO: Verify before LTR launch that ending sequence number is still returned from the service. final Comparator startingSequenceNumberComparator = diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 2b5d562b..b0fd5c6d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -196,8 +196,9 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Collections.emptyList(); final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); + final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); @@ -222,8 +223,9 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Collections.emptyList(); final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); + final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>( @@ -1109,7 +1111,9 @@ public class HierarchicalShardSyncerTest { ShardObjectHelper.newShard(lastShardId, null, null, ShardObjectHelper.newSequenceNumberRange("405", null))); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); assertThat(newLeases.size(), equalTo(1));