Add new metric to be emitted on lease creation
This commit is contained in:
parent
27b166c5aa
commit
b8e5e6422b
2 changed files with 32 additions and 38 deletions
|
|
@ -17,7 +17,6 @@ package software.amazon.kinesis.leases;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
|
@ -40,7 +39,6 @@ import lombok.NonNull;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
|
||||||
import software.amazon.awssdk.services.kinesis.model.Shard;
|
import software.amazon.awssdk.services.kinesis.model.Shard;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
|
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
|
||||||
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
|
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
|
||||||
|
|
@ -49,7 +47,6 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStream;
|
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||||
import software.amazon.kinesis.common.StreamIdentifier;
|
import software.amazon.kinesis.common.StreamIdentifier;
|
||||||
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
|
|
||||||
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
|
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
|
||||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||||
|
|
@ -59,7 +56,6 @@ import software.amazon.kinesis.metrics.MetricsScope;
|
||||||
import software.amazon.kinesis.metrics.MetricsUtil;
|
import software.amazon.kinesis.metrics.MetricsUtil;
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
import static java.util.Objects.nonNull;
|
|
||||||
import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange;
|
import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -76,8 +72,6 @@ public class HierarchicalShardSyncer {
|
||||||
|
|
||||||
private final String streamIdentifier;
|
private final String streamIdentifier;
|
||||||
|
|
||||||
private final DeletedStreamListProvider deletedStreamListProvider;
|
|
||||||
|
|
||||||
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
|
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
|
||||||
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
|
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
|
||||||
private static final int retriesForCompleteHashRange = 3;
|
private static final int retriesForCompleteHashRange = 3;
|
||||||
|
|
@ -85,17 +79,13 @@ public class HierarchicalShardSyncer {
|
||||||
private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;
|
private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;
|
||||||
|
|
||||||
public HierarchicalShardSyncer() {
|
public HierarchicalShardSyncer() {
|
||||||
this(false, "SingleStreamMode");
|
isMultiStreamMode = false;
|
||||||
|
streamIdentifier = "SingleStreamMode";
|
||||||
}
|
}
|
||||||
|
|
||||||
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) {
|
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) {
|
||||||
this(isMultiStreamMode, streamIdentifier, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) {
|
|
||||||
this.isMultiStreamMode = isMultiStreamMode;
|
this.isMultiStreamMode = isMultiStreamMode;
|
||||||
this.streamIdentifier = streamIdentifier;
|
this.streamIdentifier = streamIdentifier;
|
||||||
this.deletedStreamListProvider = deletedStreamListProvider;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
|
private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
|
||||||
|
|
@ -163,16 +153,17 @@ public class HierarchicalShardSyncer {
|
||||||
final Set<Lease> createdLeases = new HashSet<>();
|
final Set<Lease> createdLeases = new HashSet<>();
|
||||||
|
|
||||||
for (Lease lease : newLeasesToCreate) {
|
for (Lease lease : newLeasesToCreate) {
|
||||||
long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
if(leaseRefresher.createLeaseIfNotExists(lease)) {
|
if(leaseRefresher.createLeaseIfNotExists(lease)) {
|
||||||
createdLeases.add(lease);
|
createdLeases.add(lease);
|
||||||
}
|
}
|
||||||
success = true;
|
success = true;
|
||||||
}
|
} finally {
|
||||||
finally {
|
|
||||||
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
|
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
|
||||||
|
final String metricName = lease.checkpoint().isSentinelCheckpoint() ? lease.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER";
|
||||||
|
MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases);
|
log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases);
|
||||||
|
|
@ -289,17 +280,8 @@ public class HierarchicalShardSyncer {
|
||||||
+ retriesForCompleteHashRange + " retries.");
|
+ retriesForCompleteHashRange + " retries.");
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
|
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
|
||||||
// Fallback to existing behavior for backward compatibility
|
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShards());
|
||||||
List<Shard> shardList = Collections.emptyList();
|
|
||||||
try {
|
|
||||||
shardList = shardDetector.listShardsWithoutConsumingResourceNotFoundException();
|
|
||||||
} catch (ResourceNotFoundException e) {
|
|
||||||
if (nonNull(this.deletedStreamListProvider) && isMultiStreamMode) {
|
|
||||||
deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
final Optional<List<Shard>> shards = Optional.of(shardList);
|
|
||||||
|
|
||||||
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
|
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
|
||||||
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
|
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
|
||||||
|
|
@ -874,20 +856,22 @@ public class HierarchicalShardSyncer {
|
||||||
* * the parent shard has expired.
|
* * the parent shard has expired.
|
||||||
* <p>
|
* <p>
|
||||||
* For example:
|
* For example:
|
||||||
* <pre>
|
|
||||||
* Shard structure (each level depicts a stream segment):
|
* Shard structure (each level depicts a stream segment):
|
||||||
* 0 1 2 3 4 5 - shards till epoch 102
|
* 0 1 2 3 4 5 - shards till epoch 102
|
||||||
* \ / \ / | |
|
* \ / \ / | |
|
||||||
* 6 7 4 5 - shards from epoch 103 - 205
|
* 6 7 4 5 - shards from epoch 103 - 205
|
||||||
* \ / | / \
|
* \ / | / \
|
||||||
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
|
||||||
* </pre>
|
*
|
||||||
* Assuming current leases are (4, 5, 7), new leases to create for an initial position are:
|
* Current leases: (4, 5, 7)
|
||||||
* <ul>
|
*
|
||||||
* <li>LATEST: (6)</li>
|
* If initial position is LATEST:
|
||||||
* <li>TRIM_HORIZON: (0, 1)</li>
|
* - New leases to create: (6)
|
||||||
* <li>AT_TIMESTAMP(epoch=200): (0, 1)</li>
|
* If initial position is TRIM_HORIZON:
|
||||||
* </ul>
|
* - New leases to create: (0, 1)
|
||||||
|
* If initial position is AT_TIMESTAMP(epoch=200):
|
||||||
|
* - New leases to create: (0, 1)
|
||||||
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* The leases returned are sorted by the starting sequence number - following the same order
|
* The leases returned are sorted by the starting sequence number - following the same order
|
||||||
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
|
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
|
||||||
|
|
|
||||||
|
|
@ -181,7 +181,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
+ " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
|
+ " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
|
||||||
}
|
}
|
||||||
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||||
createLeasesForChildShardsIfNotExist();
|
createLeasesForChildShardsIfNotExist(scope);
|
||||||
updateLeaseWithChildShards(currentShardLease);
|
updateLeaseWithChildShards(currentShardLease);
|
||||||
}
|
}
|
||||||
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease,
|
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease,
|
||||||
|
|
@ -239,7 +239,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createLeasesForChildShardsIfNotExist()
|
private void createLeasesForChildShardsIfNotExist(MetricsScope scope)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
// For child shard resulted from merge of two parent shards, verify if both the parents are either present or
|
// For child shard resulted from merge of two parent shards, verify if both the parents are either present or
|
||||||
// not present in the lease table before creating the lease entry.
|
// not present in the lease table before creating the lease entry.
|
||||||
|
|
@ -272,8 +272,18 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {
|
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {
|
||||||
log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
|
log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
|
||||||
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
|
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
|
||||||
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
|
final long startTime = System.currentTimeMillis();
|
||||||
|
boolean success = false;
|
||||||
|
try {
|
||||||
|
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
|
||||||
|
success = true;
|
||||||
|
} finally {
|
||||||
|
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
|
||||||
|
if (leaseToCreate.checkpoint() != null) {
|
||||||
|
final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ? leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER";
|
||||||
|
MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED);
|
||||||
|
}
|
||||||
|
}
|
||||||
log.info("{} - Shard {}: Created child shard lease: {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseToCreate);
|
log.info("{} - Shard {}: Created child shard lease: {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseToCreate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue