From 1036006bb4c06b10355e9786bca7b055cafd1661 Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 5 May 2020 15:44:45 -0700 Subject: [PATCH] leaseKey parameter fix and logging change for shutdownTask --- .../amazon/kinesis/lifecycle/BlockOnParentShardTask.java | 5 ++++- .../software/amazon/kinesis/lifecycle/ShutdownTask.java | 7 ++++++- .../amazon/kinesis/lifecycle/ShutdownTaskTest.java | 5 +++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index 37a092e8..0894be69 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -25,6 +25,8 @@ import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.function.Function; + /** * Task to block until processing of all data records in the parent shard(s) is completed. * We check if we have checkpoint(s) for the parent shard(s). @@ -59,7 +61,8 @@ public class BlockOnParentShardTask implements ConsumerTask { try { boolean blockedOnParentShard = false; for (String shardId : shardInfo.parentShardIds()) { - Lease lease = leaseRefresher.getLease(shardId); + final String leaseKey = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardId).orElse(shardId); + final Lease lease = leaseRefresher.getLease(leaseKey); if (lease != null) { ExtendedSequenceNumber checkpoint = lease.checkpoint(); if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 9111b946..2f05ece0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -183,9 +183,13 @@ public class ShutdownTask implements ConsumerTask { private void createLeasesForChildShardsIfNotExist() throws DependencyException, InvalidStateException, ProvisionedThroughputException { for(ChildShard childShard : childShards) { - if(leaseCoordinator.getCurrentlyHeldLease(childShard.shardId()) == null) { + final String leaseKey = shardInfo.streamIdentifierSerOpt() + .map(s -> s + ":" + childShard.shardId()) + .orElse(childShard.shardId()); + if(leaseCoordinator.getCurrentlyHeldLease(leaseKey) == null) { final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier()); leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); + log.info("Shard {}: Created child shard lease: {}", shardInfo.shardId(), leaseToCreate.leaseKey()); } } } @@ -198,6 +202,7 @@ public class ShutdownTask implements ConsumerTask { final Lease updatedLease = currentLease.copy(); updatedLease.childShardIds(childShardIds); leaseCoordinator.updateLease(updatedLease, UUID.fromString(shardInfo.concurrencyToken()), SHUTDOWN_TASK_OPERATION, leaseKeyProvider.apply(shardInfo)); + log.info("Shard {}: Updated current lease {} with child shard information: {}", shardInfo.shardId(), currentLease.leaseKey(), childShardIds); } /* diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index c94a3266..7992d604 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -46,6 +46,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; @@ -107,6 +108,10 @@ public class ShutdownTaskTest { public void setUp() throws Exception { doNothing().when(recordsPublisher).shutdown(); when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer); + final Lease childLease = new Lease(); + childLease.leaseKey("childShardLeaseKey"); + when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class))) + .thenReturn(childLease); shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(), ExtendedSequenceNumber.LATEST);