From 1e098e9dc6cf54c2675a0056fe67bcd32cf8dfb9 Mon Sep 17 00:00:00 2001 From: Noah Thomas Date: Fri, 10 Mar 2023 15:06:33 -0800 Subject: [PATCH] Rebase changes from master --- .../leases/HierarchicalShardSyncer.java | 51 +++++++++++++------ .../kinesis/lifecycle/ShutdownTask.java | 1 + 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 9e66f298..82514b87 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.leases; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -39,6 +40,7 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; @@ -47,6 +49,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; @@ -56,6 +59,7 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static java.util.Objects.nonNull; import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange; /** @@ -72,6 +76,8 @@ public class HierarchicalShardSyncer { private final String streamIdentifier; + private final DeletedStreamListProvider deletedStreamListProvider; + private static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); private static final int retriesForCompleteHashRange = 3; @@ -79,13 +85,17 @@ public class HierarchicalShardSyncer { private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000; public HierarchicalShardSyncer() { - isMultiStreamMode = false; - streamIdentifier = "SingleStreamMode"; + this(false, "SingleStreamMode"); } public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) { + this(isMultiStreamMode, streamIdentifier, null); + } + + public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) { this.isMultiStreamMode = isMultiStreamMode; this.streamIdentifier = streamIdentifier; + this.deletedStreamListProvider = deletedStreamListProvider; } private static final BiFunction shardIdFromLeaseDeducer = @@ -162,8 +172,10 @@ public class HierarchicalShardSyncer { success = true; } finally { MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); - final String metricName = lease.checkpoint().isSentinelCheckpoint() ? lease.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER"; - MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED); + if (lease.checkpoint() != null) { + final String metricName = lease.checkpoint().isSentinelCheckpoint() ? lease.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER"; + MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED); + } } } log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases); @@ -280,8 +292,17 @@ public class HierarchicalShardSyncer { + retriesForCompleteHashRange + " retries."); } - private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { - final Optional> shards = Optional.of(shardDetector.listShards()); + private List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { + // Fallback to existing behavior for backward compatibility + List shardList = Collections.emptyList(); + try { + shardList = shardDetector.listShardsWithoutConsumingResourceNotFoundException(); + } catch (ResourceNotFoundException e) { + if (nonNull(this.deletedStreamListProvider) && isMultiStreamMode) { + deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier)); + } + } + final Optional> shards = Optional.of(shardList); return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); @@ -856,22 +877,20 @@ public class HierarchicalShardSyncer { * * the parent shard has expired. *

* For example: + *

          * Shard structure (each level depicts a stream segment):
          * 0 1 2 3 4   5   - shards till epoch 102
          * \ / \ / |   |
          *  6   7  4   5   - shards from epoch 103 - 205
          *  \  /   |  / \
          *   8     4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
-         *
-         * Current leases: (4, 5, 7)
-         *
-         * If initial position is LATEST:
-         *   - New leases to create: (6)
-         * If initial position is TRIM_HORIZON:
-         *   - New leases to create: (0, 1)
-         * If initial position is AT_TIMESTAMP(epoch=200):
-         *   - New leases to create: (0, 1)
-         *
+         * 
+ * Assuming current leases are (4, 5, 7), new leases to create for an initial position are: + *
    + *
  • LATEST: (6)
  • + *
  • TRIM_HORIZON: (0, 1)
  • + *
  • AT_TIMESTAMP(epoch=200): (0, 1)
  • + *
*

* The leases returned are sorted by the starting sequence number - following the same order * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 39f13236..0322c0e2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -284,6 +284,7 @@ public class ShutdownTask implements ConsumerTask { MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED); } } + log.info("{} - Shard {}: Created child shard lease: {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseToCreate); } }