From 407b4be8dd8099de90d1c56ddcf1b2e7930ac0fe Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Thu, 3 Oct 2019 16:29:38 -0700 Subject: [PATCH] Addressing comments Renaming Method --- .../leases/HierarchicalShardSyncer.java | 2 +- .../kinesis/lifecycle/ShutdownTask.java | 27 ++++++++++++------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 6aaaff60..78c95f4f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -88,7 +88,7 @@ public class HierarchicalShardSyncer { final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - if(CollectionUtils.isNullOrEmpty(shards)) { + if (CollectionUtils.isNullOrEmpty(shards)) { shards = getShardList(shardDetector); } log.debug("Num shards: {}", shards.size()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index a5cbdf64..cc7fddfd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -21,6 +21,7 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -94,10 +95,15 @@ public class ShutdownTask implements ConsumerTask { try { try { List allShards = new ArrayList<>(); - if(reason == ShutdownReason.SHARD_END) { + /* + * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END + * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows other + * shard consumer to subscribe to this shard. + */ + if (reason == ShutdownReason.SHARD_END) { allShards = shardDetector.listShards(); - if(!isRealShardEnd(allShards)) { + if (!CollectionUtils.isNullOrEmpty(allShards) && !shardEndValidated(allShards)) { reason = ShutdownReason.LEASE_LOST; } } @@ -183,17 +189,18 @@ public class ShutdownTask implements ConsumerTask { return reason; } - private boolean isRealShardEnd(List shards) { - boolean realShardEnd = false; - + private boolean shardEndValidated(List shards) { for(Shard shard : shards) { - if(shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId()) - || shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId())) { - realShardEnd = true; - break; + if (isChildShard(shard)) { + return true; } } - return realShardEnd; + return false; + } + + private boolean isChildShard(Shard shard) { + return (shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId()) + || shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId())); } }