diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index f4a1231c..c093dced 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -839,12 +839,14 @@ public class HierarchicalShardSyncer { * regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon * reaching SHARD_END. */ - private List getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition, List shards) { + private List getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition, + List shards, MultiStreamArgs multiStreamArgs) { final Map shardIdToNewLeaseMap = new HashMap<>(); for (Shard shard : shards) { final String shardId = shard.shardId(); - final Lease lease = newKCLLease(shard); + final Lease lease = multiStreamArgs.isMultiStreamMode() ? + newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier) : newKCLLease(shard); lease.checkpoint(convertToCheckpoint(initialPosition)); log.debug("Need to create a lease for shard with shardId {}", shardId);