From 892218d8b5a92c9727032821e2f85e262cae96d7 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Mon, 23 Mar 2020 13:27:03 -0400 Subject: [PATCH] Adding multistreaming changes --- .../amazon/kinesis/leases/HierarchicalShardSyncer.java | 2 +- .../kinesis/leases/HierarchicalShardSyncerTest.java | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index ce658c51..7829a2de 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -826,7 +826,7 @@ public class HierarchicalShardSyncer { .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) .collect(Collectors.toSet()); - final List newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards); + final List newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs); //TODO: Verify before LTR launch that ending sequence number is still returned from the service. final Comparator startingSequenceNumberComparator = diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 2b5d562b..b0fd5c6d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -196,8 +196,9 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Collections.emptyList(); final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); + final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); @@ -222,8 +223,9 @@ public class HierarchicalShardSyncerTest { final List currentLeases = Collections.emptyList(); final Set inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); + final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); final Set newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set expectedLeaseShardIds = new HashSet<>( @@ -1109,7 +1111,9 @@ public class HierarchicalShardSyncerTest { ShardObjectHelper.newShard(lastShardId, null, null, ShardObjectHelper.newSequenceNumberRange("405", null))); - final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, + final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer(); + + final List newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases, INITIAL_POSITION_LATEST); assertThat(newLeases.size(), equalTo(1));