Multistream support for leases.

This commit is contained in:
Joshua Kim 2020-03-20 13:12:19 -07:00
parent 1ccbe614eb
commit 0052b5799c

View file

@ -839,12 +839,14 @@ public class HierarchicalShardSyncer {
* regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon * regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon
* reaching SHARD_END. * reaching SHARD_END.
*/ */
private List<Lease> getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition, List<Shard> shards) { private List<Lease> getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition,
List<Shard> shards, MultiStreamArgs multiStreamArgs) {
final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>(); final Map<String, Lease> shardIdToNewLeaseMap = new HashMap<>();
for (Shard shard : shards) { for (Shard shard : shards) {
final String shardId = shard.shardId(); final String shardId = shard.shardId();
final Lease lease = newKCLLease(shard); final Lease lease = multiStreamArgs.isMultiStreamMode() ?
newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier) : newKCLLease(shard);
lease.checkpoint(convertToCheckpoint(initialPosition)); lease.checkpoint(convertToCheckpoint(initialPosition));
log.debug("Need to create a lease for shard with shardId {}", shardId); log.debug("Need to create a lease for shard with shardId {}", shardId);