diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index d5eb1be6..a2f9a45e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -851,8 +851,7 @@ public class Scheduler implements Runnable { if (!firstItem) { builder.append(", "); } - builder.append(shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) - .orElse(shardInfo.shardId())); + builder.append(ShardInfo.getLeaseKey(shardInfo)); firstItem = false; } slog.info("Current stream shard assignments: " + builder.toString()); @@ -948,8 +947,7 @@ public class Scheduler implements Runnable { ShardConsumer consumer = shardInfoShardConsumerMap.get(shard); if (consumer.leaseLost()) { shardInfoShardConsumerMap.remove(shard); - log.debug("Removed consumer for {} as lease has been lost", - shard.streamIdentifierSerOpt().map(s -> s + ":" + shard.shardId()).orElse(shard.shardId())); + log.debug("Removed consumer for {} as lease has been lost", ShardInfo.getLeaseKey(shard)); } else { consumer.executeLifecycle(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java index fcb3ffde..5f1ee18c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java @@ -54,8 +54,7 @@ public class BlockOnParentShardTask implements ConsumerTask { @Override public TaskResult call() { Exception exception = null; - final String shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) - .orElse(shardInfo.shardId()); + final String shardInfoId = ShardInfo.getLeaseKey(shardInfo); try { boolean blockedOnParentShard = false; for (String shardId : shardInfo.parentShardIds()) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index f576154a..6c52e0de 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -76,8 +76,7 @@ public class ProcessTask implements ConsumerTask { @NonNull AggregatorUtil aggregatorUtil, @NonNull MetricsFactory metricsFactory) { this.shardInfo = shardInfo; - this.shardInfoId = shardInfo.streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()) - .orElse(shardInfo.shardId()); + this.shardInfoId = ShardInfo.getLeaseKey(shardInfo); this.shardRecordProcessor = shardRecordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.backoffTimeMillis = backoffTimeMillis; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java index 177c0f43..21e8c2c9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java @@ -24,6 +24,7 @@ import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; @@ -70,8 +71,7 @@ class ShardConsumerSubscriber implements Subscriber { this.bufferSize = bufferSize; this.shardConsumer = shardConsumer; this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning; - this.shardInfoId = shardConsumer.shardInfo().streamIdentifierSerOpt() - .map(s -> s + ":" + shardConsumer.shardInfo().shardId()).orElse(shardConsumer.shardInfo().shardId()); + this.shardInfoId = ShardInfo.getLeaseKey(shardConsumer.shardInfo()); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 9d53e75c..3449b723 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -15,15 +15,10 @@ package software.amazon.kinesis.lifecycle; import com.google.common.annotations.VisibleForTesting; - -import java.util.List; -import java.util.function.Function; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.services.kinesis.model.ChildShard; -import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; @@ -47,8 +42,10 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -89,8 +86,8 @@ public class ShutdownTask implements ConsumerTask { private final List childShards; - private static final Function leaseKeyProvider = shardInfo -> shardInfo - .streamIdentifierSerOpt().map(s -> s + ":" + shardInfo.shardId()).orElse(shardInfo.shardId()); + private static final Function leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo); + /* * Invokes ShardRecordProcessor shutdown() API. * (non-Javadoc) @@ -218,26 +215,4 @@ public class ShutdownTask implements ConsumerTask { return reason; } - private boolean isShardInContextParentOfAny(List shards) { - for(Shard shard : shards) { - if (isChildShardOfShardInContext(shard)) { - return true; - } - } - return false; - } - - private boolean isChildShardOfShardInContext(Shard shard) { - return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId()) - || StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId())); - } - - private void dropLease() { - Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); - leaseCoordinator.dropLease(currentLease); - if(currentLease != null) { - log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); - } - } - }