Adding multistreaming changes

This commit is contained in:
Joshua Kim 2020-03-23 13:27:03 -04:00
parent 30ef7f62b1
commit 892218d8b5
2 changed files with 8 additions and 4 deletions

View file

@ -826,7 +826,7 @@ public class HierarchicalShardSyncer {
.map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs)) .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
.collect(Collectors.toSet()); .collect(Collectors.toSet());
final List<Lease> newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards); final List<Lease> newLeasesToCreate = getLeasesToCreateForOpenAndClosedShards(initialPosition, shards, multiStreamArgs);
//TODO: Verify before LTR launch that ending sequence number is still returned from the service. //TODO: Verify before LTR launch that ending sequence number is still returned from the service.
final Comparator<Lease> startingSequenceNumberComparator = final Comparator<Lease> startingSequenceNumberComparator =

View file

@ -196,8 +196,9 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Collections.emptyList(); final List<Lease> currentLeases = Collections.emptyList();
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.NonEmptyLeaseTableSynchronizer();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds); INITIAL_POSITION_LATEST, inconsistentShardIds);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1)); final Set<String> expectedLeaseShardIds = new HashSet<>(Arrays.asList(shardId0, shardId1));
@ -222,8 +223,9 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = Collections.emptyList(); final List<Lease> currentLeases = Collections.emptyList();
final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2)); final Set<String> inconsistentShardIds = new HashSet<>(Collections.singletonList(shardId2));
final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS); INITIAL_POSITION_LATEST, inconsistentShardIds, MULTI_STREAM_ARGS);
final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set<String> newLeaseKeys = newLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<String> expectedLeaseShardIds = new HashSet<>( final Set<String> expectedLeaseShardIds = new HashSet<>(
@ -1109,7 +1111,9 @@ public class HierarchicalShardSyncerTest {
ShardObjectHelper.newShard(lastShardId, null, null, ShardObjectHelper.newShard(lastShardId, null, null,
ShardObjectHelper.newSequenceNumberRange("405", null))); ShardObjectHelper.newSequenceNumberRange("405", null)));
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(shards, currentLeases, final HierarchicalShardSyncer.LeaseSynchronizer leaseSynchronizer = new HierarchicalShardSyncer.EmptyLeaseTableSynchronizer();
final List<Lease> newLeases = HierarchicalShardSyncer.determineNewLeasesToCreate(leaseSynchronizer, shards, currentLeases,
INITIAL_POSITION_LATEST); INITIAL_POSITION_LATEST);
assertThat(newLeases.size(), equalTo(1)); assertThat(newLeases.size(), equalTo(1));