From 0052b5799c2de48ecbf04b7e0ba259b3e2bae73d Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Fri, 20 Mar 2020 13:12:19 -0700 Subject: [PATCH] Multistream support for leases. --- .../amazon/kinesis/leases/HierarchicalShardSyncer.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index f4a1231c..c093dced 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -839,12 +839,14 @@ public class HierarchicalShardSyncer { * regardless of if they are open or closed. Closed shards will be unblocked via child shard information upon * reaching SHARD_END. */ - private List getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition, List shards) { + private List getLeasesToCreateForOpenAndClosedShards(InitialPositionInStreamExtended initialPosition, + List shards, MultiStreamArgs multiStreamArgs) { final Map shardIdToNewLeaseMap = new HashMap<>(); for (Shard shard : shards) { final String shardId = shard.shardId(); - final Lease lease = newKCLLease(shard); + final Lease lease = multiStreamArgs.isMultiStreamMode() ? + newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier) : newKCLLease(shard); lease.checkpoint(convertToCheckpoint(initialPosition)); log.debug("Need to create a lease for shard with shardId {}", shardId);