From 760c52d7bcaa1131cc1f7d0428e3cdaf26720524 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 5 May 2020 17:31:08 -0700 Subject: [PATCH] getLeaseKey --- .../java/software/amazon/kinesis/leases/ShardInfo.java | 8 ++++++-- .../amazon/kinesis/lifecycle/BlockOnParentShardTask.java | 2 +- .../software/amazon/kinesis/lifecycle/ShutdownTask.java | 4 +--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java index c4b2968c..e0da7dec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java @@ -149,9 +149,13 @@ public class ShardInfo { * @return lease key */ public static String getLeaseKey(ShardInfo shardInfo) { + return getLeaseKey(shardInfo, shardInfo.shardId()); + } + + public static String getLeaseKey(ShardInfo shardInfo, String shardId) { return shardInfo.streamIdentifierSerOpt().isPresent() ? - MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardInfo.shardId()) : - shardInfo.shardId(); + MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardId) : + shardId; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index 0894be69..8797085a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -61,7 +61,7 @@ public class BlockOnParentShardTask implements ConsumerTask { try { boolean blockedOnParentShard = false; for (String shardId : shardInfo.parentShardIds()) { - final String leaseKey = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardId).orElse(shardId); + final String leaseKey = ShardInfo.getLeaseKey(shardInfo, shardId); final Lease lease = leaseRefresher.getLease(leaseKey); if (lease != null) { ExtendedSequenceNumber checkpoint = lease.checkpoint(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 2f05ece0..9d53e75c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -183,9 +183,7 @@ public class ShutdownTask implements ConsumerTask { private void createLeasesForChildShardsIfNotExist() throws DependencyException, InvalidStateException, ProvisionedThroughputException { for(ChildShard childShard : childShards) { - final String leaseKey = shardInfo.streamIdentifierSerOpt() - .map(s -> s + ":" + childShard.shardId()) - .orElse(childShard.shardId()); + final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId()); if(leaseCoordinator.getCurrentlyHeldLease(leaseKey) == null) { final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier()); leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);