From b8e5e6422b1f4725cf279d7349c451433e17d900 Mon Sep 17 00:00:00 2001 From: Noah Thomas Date: Thu, 9 Mar 2023 17:07:29 -0800 Subject: [PATCH] Add new metric to be emitted on lease creation --- .../leases/HierarchicalShardSyncer.java | 52 +++++++------------ .../kinesis/lifecycle/ShutdownTask.java | 18 +++++-- 2 files changed, 32 insertions(+), 38 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index aafbfcff..9e66f298 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -17,7 +17,6 @@ package software.amazon.kinesis.leases; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -40,7 +39,6 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.ChildShard; -import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; @@ -49,7 +47,6 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; -import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; @@ -59,7 +56,6 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import static java.util.Objects.nonNull; import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange; /** @@ -76,8 +72,6 @@ public class HierarchicalShardSyncer { private final String streamIdentifier; - private final DeletedStreamListProvider deletedStreamListProvider; - private static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); private static final int retriesForCompleteHashRange = 3; @@ -85,17 +79,13 @@ public class HierarchicalShardSyncer { private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000; public HierarchicalShardSyncer() { - this(false, "SingleStreamMode"); + isMultiStreamMode = false; + streamIdentifier = "SingleStreamMode"; } public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) { - this(isMultiStreamMode, streamIdentifier, null); - } - - public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) { this.isMultiStreamMode = isMultiStreamMode; this.streamIdentifier = streamIdentifier; - this.deletedStreamListProvider = deletedStreamListProvider; } private static final BiFunction shardIdFromLeaseDeducer = @@ -163,16 +153,17 @@ public class HierarchicalShardSyncer { final Set createdLeases = new HashSet<>(); for (Lease lease : newLeasesToCreate) { - long startTime = System.currentTimeMillis(); + final long startTime = System.currentTimeMillis(); boolean success = false; try { if(leaseRefresher.createLeaseIfNotExists(lease)) { createdLeases.add(lease); } success = true; - } - finally { + } finally { MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); + final String metricName = lease.checkpoint().isSentinelCheckpoint() ? lease.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER"; + MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED); } } log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases); @@ -289,17 +280,8 @@ public class HierarchicalShardSyncer { + retriesForCompleteHashRange + " retries."); } - private List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { - // Fallback to existing behavior for backward compatibility - List shardList = Collections.emptyList(); - try { - shardList = shardDetector.listShardsWithoutConsumingResourceNotFoundException(); - } catch (ResourceNotFoundException e) { - if (nonNull(this.deletedStreamListProvider) && isMultiStreamMode) { - deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier)); - } - } - final Optional> shards = Optional.of(shardList); + private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { + final Optional> shards = Optional.of(shardDetector.listShards()); return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); @@ -874,20 +856,22 @@ public class HierarchicalShardSyncer { * * the parent shard has expired. *

* For example: - *

          * Shard structure (each level depicts a stream segment):
          * 0 1 2 3 4   5   - shards till epoch 102
          * \ / \ / |   |
          *  6   7  4   5   - shards from epoch 103 - 205
          *  \  /   |  / \
          *   8     4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
-         * 
- * Assuming current leases are (4, 5, 7), new leases to create for an initial position are: - *
    - *
  • LATEST: (6)
  • - *
  • TRIM_HORIZON: (0, 1)
  • - *
  • AT_TIMESTAMP(epoch=200): (0, 1)
  • - *
+ * + * Current leases: (4, 5, 7) + * + * If initial position is LATEST: + * - New leases to create: (6) + * If initial position is TRIM_HORIZON: + * - New leases to create: (0, 1) + * If initial position is AT_TIMESTAMP(epoch=200): + * - New leases to create: (0, 1) + * *

* The leases returned are sorted by the starting sequence number - following the same order * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index c436f38a..39f13236 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -181,7 +181,7 @@ public class ShutdownTask implements ConsumerTask { + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); } if (!CollectionUtils.isNullOrEmpty(childShards)) { - createLeasesForChildShardsIfNotExist(); + createLeasesForChildShardsIfNotExist(scope); updateLeaseWithChildShards(currentShardLease); } final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, @@ -239,7 +239,7 @@ public class ShutdownTask implements ConsumerTask { } } - private void createLeasesForChildShardsIfNotExist() + private void createLeasesForChildShardsIfNotExist(MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException { // For child shard resulted from merge of two parent shards, verify if both the parents are either present or // not present in the lease table before creating the lease entry. @@ -272,8 +272,18 @@ public class ShutdownTask implements ConsumerTask { if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) { log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey); final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier()); - leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); - + final long startTime = System.currentTimeMillis(); + boolean success = false; + try { + leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); + success = true; + } finally { + MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); + if (leaseToCreate.checkpoint() != null) { + final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ? leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER"; + MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED); + } + } log.info("{} - Shard {}: Created child shard lease: {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseToCreate); } }