> shardIdToChildShardIdsMap;
+
+ /**
+ * Determine new leases to create and their initial checkpoint.
+ * Note: Package level access only for testing purposes.
+ *
+ * For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
+ * determine if it is a descendant of any shard which is or will be processed (e.g. for which a lease exists):
+ * If so, create a lease for the first ancestor that needs to be processed (if needed). We will create leases
+ * for no more than one level in the ancestry tree. Once we find the first ancestor that needs to be processed,
+ * we will avoid creating leases for further descendants of that ancestor.
+ * If not, set checkpoint of the shard to the initial position specified by the client.
+ * To check if we need to create leases for ancestors, we use the following rules:
+ * * If we began (or will begin) processing data for a shard, then we must reach end of that shard before
+ * we begin processing data from any of its descendants.
+ * * A shard does not start processing data until data from all its parents has been processed.
+ * Note, if the initial position is LATEST and a shard has two parents and only one is a descendant - we'll create
+ * leases corresponding to both the parents - the parent shard which is not a descendant will have
+ * its checkpoint set to Latest.
+ *
+ * We assume that if there is an existing lease for a shard, then either:
+ * * we have previously created a lease for its parent (if it was needed), or
+ * * the parent shard has expired.
+ *
+ * For example:
+ * Shard structure (each level depicts a stream segment):
+ * 0 1 2 3 4 5 - shards till epoch 102
+ * \ / \ / | |
+ * 6 7 4 5 - shards from epoch 103 - 205
+ * \ / | / \
+ * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
+ *
+ * Current leases: (4, 5, 7)
+ *
+ * If initial position is LATEST:
+ * - New leases to create: (6)
+ * If initial position is TRIM_HORIZON:
+ * - New leases to create: (0, 1)
+ * If initial position is AT_TIMESTAMP(epoch=200):
+ * - New leases to create: (0, 1)
+ *
+ *
+ * The leases returned are sorted by the starting sequence number - following the same order
+ * when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
+ * before creating all the leases.
+ *
+ * If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
+ * here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
+ * high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
+ * currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
+ *
+ * @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
+ */
+ @Override
+ public synchronized List determineNewLeasesToCreate(final List shards, final List currentLeases,
+ final InitialPositionInStreamExtended initialPosition, final Set inconsistentShardIds,
+ final MultiStreamArgs multiStreamArgs) {
+ final Map shardIdToNewLeaseMap = new HashMap<>();
+ final Map shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);
+ final String streamIdentifier = Optional.ofNullable(multiStreamArgs.streamIdentifier())
+ .map(streamId -> streamId.serialize()).orElse("");
+ final Set shardIdsOfCurrentLeases = currentLeases.stream()
+ .peek(lease -> log.debug("{} : Existing lease: {}", streamIdentifier, lease))
+ .map(lease -> shardIdFromLeaseDeducer.apply(lease, multiStreamArgs))
+ .collect(Collectors.toSet());
+
+ final List openShards = getOpenShards(shards, streamIdentifier);
+ final MemoizationContext memoizationContext = new MemoizationContext();
+
+ // Iterate over the open shards and find those that don't have any lease entries.
+ for (Shard shard : openShards) {
+ final String shardId = shard.shardId();
+ log.debug("{} : Evaluating leases for open shard {} and its ancestors.", streamIdentifier, shardId);
+ if (shardIdsOfCurrentLeases.contains(shardId)) {
+ log.debug("{} : Lease for shardId {} already exists. Not creating a lease", streamIdentifier, shardId);
+ } else if (inconsistentShardIds.contains(shardId)) {
+ log.info("{} : shardId {} is an inconsistent child. Not creating a lease", streamIdentifier, shardId);
+ } else {
+ log.debug("{} : Beginning traversal of ancestry tree for shardId {}", streamIdentifier, shardId);
+
+ // A shard is a descendant if at least one if its ancestors exists in the lease table.
+ // We will create leases for only one level in the ancestry tree. Once we find the first ancestor
+ // that needs to be processed in order to complete the hash range, we will not create leases for
+ // further descendants of that ancestor.
+ final boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPosition,
+ shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards, shardIdToNewLeaseMap,
+ memoizationContext, multiStreamArgs);
+
+ // If shard is a descendant, the leases for its ancestors were already created above. Open shards
+ // that are NOT descendants will not have leases yet, so we create them here. We will not create
+ // leases for open shards that ARE descendants yet - leases for these shards will be created upon
+ // SHARD_END of their parents.
+ if (!isDescendant) {
+ log.debug("{} : shardId {} has no ancestors. Creating a lease.", streamIdentifier, shardId);
+ final Lease newLease = multiStreamArgs.isMultiStreamMode() ?
+ newKCLMultiStreamLease(shard, multiStreamArgs.streamIdentifier()) :
+ newKCLLease(shard);
+ newLease.checkpoint(convertToCheckpoint(initialPosition));
+ log.debug("{} : Set checkpoint of {} to {}", streamIdentifier, newLease.leaseKey(), newLease.checkpoint());
+ shardIdToNewLeaseMap.put(shardId, newLease);
+ } else {
+ log.debug("{} : shardId {} is a descendant whose ancestors should already have leases. " +
+ "Not creating a lease.", streamIdentifier, shardId);
+ }
+ }
+ }
+
+ final List newLeasesToCreate = new ArrayList<>(shardIdToNewLeaseMap.values());
+ final Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
+ shardIdToShardMapOfAllKinesisShards, multiStreamArgs);
+ newLeasesToCreate.sort(startingSequenceNumberComparator);
+ return newLeasesToCreate;
+ }
+ }
+
+ /**
+ * Helper class to pass around state between recursive traversals of shard hierarchy.
+ */
+ @NoArgsConstructor
+ static class MemoizationContext {
+ private Map isDescendantMap = new HashMap<>();
+ private Map shouldCreateLeaseMap = new HashMap<>();
+
+ Boolean isDescendant(String shardId) {
+ return isDescendantMap.get(shardId);
+ }
+
+ void setIsDescendant(String shardId, Boolean isDescendant) {
+ isDescendantMap.put(shardId, isDescendant);
+ }
+
+ Boolean shouldCreateLease(String shardId) {
+ return shouldCreateLeaseMap.computeIfAbsent(shardId, x -> Boolean.FALSE);
+ }
+
+ void setShouldCreateLease(String shardId, Boolean shouldCreateLease) {
+ shouldCreateLeaseMap.put(shardId, shouldCreateLease);
+ }
+ }
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
index ba136f0a..96a0de6a 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
@@ -19,34 +19,41 @@ import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.Synchronized;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardFilter;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
+import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
/**
@@ -59,8 +66,8 @@ public class KinesisShardDetector implements ShardDetector {
@NonNull
private final KinesisAsyncClient kinesisClient;
- @NonNull
- private final String streamName;
+ @NonNull @Getter
+ private final StreamIdentifier streamIdentifier;
private final long listShardsBackoffTimeInMillis;
private final int maxListShardsRetryAttempts;
private final long listShardsCacheAllowedAgeInSeconds;
@@ -77,16 +84,16 @@ public class KinesisShardDetector implements ShardDetector {
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
int cacheMissWarningModulus) {
- this(kinesisClient, streamName, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
+ this(kinesisClient, StreamIdentifier.singleStreamInstance(streamName), listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus,
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
}
- public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
+ public KinesisShardDetector(KinesisAsyncClient kinesisClient, StreamIdentifier streamIdentifier, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
int cacheMissWarningModulus, Duration kinesisRequestTimeout) {
this.kinesisClient = kinesisClient;
- this.streamName = streamName;
+ this.streamIdentifier = streamIdentifier;
this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
@@ -149,12 +156,18 @@ public class KinesisShardDetector implements ShardDetector {
@Override
@Synchronized
public List listShards() {
+ return listShardsWithFilter(null);
+ }
+
+ @Override
+ @Synchronized
+ public List listShardsWithFilter(ShardFilter shardFilter) {
final List shards = new ArrayList<>();
ListShardsResponse result;
String nextToken = null;
do {
- result = listShards(nextToken);
+ result = listShards(shardFilter, nextToken);
if (result == null) {
/*
@@ -172,15 +185,16 @@ public class KinesisShardDetector implements ShardDetector {
return shards;
}
- private ListShardsResponse listShards(final String nextToken) {
+ private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
+ exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(LimitExceededException.class, t -> t);
exceptionManager.add(ResourceInUseException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);
- ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder();
+ ListShardsRequest.Builder request = KinesisRequestsBuilder.listShardsRequestBuilder().shardFilter(shardFilter);
if (StringUtils.isEmpty(nextToken)) {
- request = request.streamName(streamName);
+ request = request.streamName(streamIdentifier.streamName());
} else {
request = request.nextToken(nextToken);
}
@@ -189,10 +203,9 @@ public class KinesisShardDetector implements ShardDetector {
int remainingRetries = maxListShardsRetryAttempts;
while (result == null) {
-
try {
try {
- result = FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request.build()), kinesisRequestTimeout);
+ result = getListShardsResponse(request.build());
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
@@ -205,14 +218,20 @@ public class KinesisShardDetector implements ShardDetector {
+ " Active or Updating)");
return null;
} catch (LimitExceededException e) {
- log.info("Got LimitExceededException when listing shards {}. Backing off for {} millis.", streamName,
+ log.info("Got LimitExceededException when listing shards {}. Backing off for {} millis.", streamIdentifier,
listShardsBackoffTimeInMillis);
try {
Thread.sleep(listShardsBackoffTimeInMillis);
} catch (InterruptedException ie) {
- log.debug("Stream {} : Sleep was interrupted ", streamName, ie);
+ log.debug("Stream {} : Sleep was interrupted ", streamIdentifier, ie);
}
lastException = e;
+ } catch (ResourceNotFoundException e) {
+ log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.",
+ streamIdentifier.streamName());
+ return ListShardsResponse.builder().shards(Collections.emptyList())
+ .nextToken(null)
+ .build();
} catch (TimeoutException te) {
throw new RuntimeException(te);
}
@@ -243,4 +262,31 @@ public class KinesisShardDetector implements ShardDetector {
log.debug("{}. Age doesn't exceed limit of {} seconds.", message, listShardsCacheAllowedAgeInSeconds);
return false;
}
+
+ @Override
+ public ListShardsResponse getListShardsResponse(ListShardsRequest request) throws
+ ExecutionException, TimeoutException, InterruptedException {
+ return FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request), kinesisRequestTimeout);
+ }
+
+ @Override
+ public List getChildShards(final String shardId) throws InterruptedException, ExecutionException, TimeoutException {
+ final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
+ .streamName(streamIdentifier.streamName())
+ .shardIteratorType(ShardIteratorType.LATEST)
+ .shardId(shardId)
+ .build();
+
+ final GetShardIteratorResponse getShardIteratorResponse =
+ FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), kinesisRequestTimeout);
+
+ final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder()
+ .shardIterator(getShardIteratorResponse.shardIterator())
+ .build();
+
+ final GetRecordsResponse getRecordsResponse =
+ FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(getRecordsRequest), kinesisRequestTimeout);
+
+ return getRecordsResponse.childShards();
+ }
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java
index 802ee29b..359b7a44 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java
@@ -14,22 +14,21 @@
*/
package software.amazon.kinesis.leases;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Collections2;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.ToString;
import lombok.experimental.Accessors;
+import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
/**
* This class contains data pertaining to a Lease. Distributed systems may use leases to partition work across a
* fleet of workers. Each unit of work (identified by a leaseKey) has a corresponding Lease. Every worker will contend
@@ -40,7 +39,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@NoArgsConstructor
@Getter
@Accessors(fluent = true)
-@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos"})
+@EqualsAndHashCode(exclude = {"concurrencyToken", "lastCounterIncrementNanos", "childShardIds", "pendingCheckpointState"})
@ToString
public class Lease {
/*
@@ -84,11 +83,21 @@ public class Lease {
* @return pending checkpoint, possibly null.
*/
private ExtendedSequenceNumber pendingCheckpoint;
+
+ /**
+ * Last pending application state. Deliberately excluded from hashCode and equals.
+ *
+ * @return pending checkpoint state, possibly null.
+ */
+ private byte[] pendingCheckpointState;
+
/**
* @return count of distinct lease holders between checkpoints.
*/
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set parentShardIds = new HashSet<>();
+ private Set childShardIds = new HashSet<>();
+ private HashKeyRangeForLease hashKeyRangeForLease;
/**
* Copy constructor, used by clone().
@@ -98,13 +107,24 @@ public class Lease {
protected Lease(Lease lease) {
this(lease.leaseKey(), lease.leaseOwner(), lease.leaseCounter(), lease.concurrencyToken(),
lease.lastCounterIncrementNanos(), lease.checkpoint(), lease.pendingCheckpoint(),
- lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds());
+ lease.ownerSwitchesSinceCheckpoint(), lease.parentShardIds(), lease.childShardIds(),
+ lease.pendingCheckpointState(), lease.hashKeyRangeForLease());
+ }
+
+ @Deprecated
+ public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
+ final UUID concurrencyToken, final Long lastCounterIncrementNanos,
+ final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
+ final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) {
+ this(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, checkpoint, pendingCheckpoint,
+ ownerSwitchesSinceCheckpoint, parentShardIds, new HashSet<>(), null, null);
}
public Lease(final String leaseKey, final String leaseOwner, final Long leaseCounter,
final UUID concurrencyToken, final Long lastCounterIncrementNanos,
final ExtendedSequenceNumber checkpoint, final ExtendedSequenceNumber pendingCheckpoint,
- final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds) {
+ final Long ownerSwitchesSinceCheckpoint, final Set parentShardIds, final Set childShardIds,
+ final byte[] pendingCheckpointState, final HashKeyRangeForLease hashKeyRangeForLease) {
this.leaseKey = leaseKey;
this.leaseOwner = leaseOwner;
this.leaseCounter = leaseCounter;
@@ -116,6 +136,11 @@ public class Lease {
if (parentShardIds != null) {
this.parentShardIds.addAll(parentShardIds);
}
+ if (childShardIds != null) {
+ this.childShardIds.addAll(childShardIds);
+ }
+ this.hashKeyRangeForLease = hashKeyRangeForLease;
+ this.pendingCheckpointState = pendingCheckpointState;
}
/**
@@ -135,7 +160,9 @@ public class Lease {
ownerSwitchesSinceCheckpoint(lease.ownerSwitchesSinceCheckpoint());
checkpoint(lease.checkpoint);
pendingCheckpoint(lease.pendingCheckpoint);
+ pendingCheckpointState(lease.pendingCheckpointState);
parentShardIds(lease.parentShardIds);
+ childShardIds(lease.childShardIds);
}
/**
@@ -214,6 +241,15 @@ public class Lease {
this.pendingCheckpoint = pendingCheckpoint;
}
+ /**
+ * Sets pending checkpoint state.
+ *
+ * @param pendingCheckpointState can be null
+ */
+ public void pendingCheckpointState(byte[] pendingCheckpointState) {
+ this.pendingCheckpointState = pendingCheckpointState;
+ }
+
/**
* Sets ownerSwitchesSinceCheckpoint.
*
@@ -233,6 +269,27 @@ public class Lease {
this.parentShardIds.addAll(parentShardIds);
}
+ /**
+ * Sets childShardIds.
+ *
+ * @param childShardIds may not be null
+ */
+ public void childShardIds(@NonNull final Collection childShardIds) {
+ this.childShardIds.addAll(childShardIds);
+ }
+
+ /**
+ * Set the hash range key for this shard.
+ * @param hashKeyRangeForLease
+ */
+ public void hashKeyRange(HashKeyRangeForLease hashKeyRangeForLease) {
+ if (this.hashKeyRangeForLease == null) {
+ this.hashKeyRangeForLease = hashKeyRangeForLease;
+ } else if (!this.hashKeyRangeForLease.equals(hashKeyRangeForLease)) {
+ throw new IllegalArgumentException("hashKeyRange is immutable");
+ }
+ }
+
/**
* Sets leaseOwner.
*
@@ -250,4 +307,6 @@ public class Lease {
public Lease copy() {
return new Lease(this);
}
+
+
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java
new file mode 100644
index 00000000..6e3104ae
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java
@@ -0,0 +1,348 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.leases;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.utils.CollectionUtils;
+import software.amazon.kinesis.common.StreamIdentifier;
+import software.amazon.kinesis.leases.exceptions.DependencyException;
+import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
+import software.amazon.kinesis.leases.exceptions.InvalidStateException;
+import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
+import software.amazon.kinesis.metrics.MetricsFactory;
+import software.amazon.kinesis.retrieval.AWSExceptionManager;
+import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class to cleanup of any expired/closed shard leases. It will cleanup leases periodically as defined by
+ * {@link LeaseManagementConfig#leaseCleanupConfig()} asynchronously.
+ */
+@Accessors(fluent=true)
+@Slf4j
+@RequiredArgsConstructor
+@EqualsAndHashCode
+public class LeaseCleanupManager {
+ @NonNull
+ private final LeaseCoordinator leaseCoordinator;
+ @NonNull
+ private final MetricsFactory metricsFactory;
+ @NonNull
+ private final ScheduledExecutorService deletionThreadPool;
+ private final boolean cleanupLeasesUponShardCompletion;
+ private final long leaseCleanupIntervalMillis;
+ private final long completedLeaseCleanupIntervalMillis;
+ private final long garbageLeaseCleanupIntervalMillis;
+ private final Stopwatch completedLeaseStopwatch = Stopwatch.createUnstarted();
+ private final Stopwatch garbageLeaseStopwatch = Stopwatch.createUnstarted();
+
+ private final Queue deletionQueue = new ConcurrentLinkedQueue<>();
+
+ private static final long INITIAL_DELAY = 0L;
+
+ @Getter
+ private volatile boolean isRunning = false;
+
+ /**
+ * Starts the lease cleanup thread, which is scheduled periodically as specified by
+ * {@link LeaseCleanupManager#leaseCleanupIntervalMillis}
+ */
+ public void start() {
+ if (!isRunning) {
+ log.info("Starting lease cleanup thread.");
+ completedLeaseStopwatch.reset().start();
+ garbageLeaseStopwatch.reset().start();
+ deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis,
+ TimeUnit.MILLISECONDS);
+ isRunning = true;
+ } else {
+ log.info("Lease cleanup thread already running, no need to start.");
+ }
+ }
+
+ /**
+ * Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion}
+ * for checking the duplicate entries.
+ * @param leasePendingDeletion
+ */
+ public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
+ final Lease lease = leasePendingDeletion.lease();
+ if (lease == null) {
+ log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.",
+ lease.leaseKey());
+ } else {
+ log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
+ if (!deletionQueue.add(leasePendingDeletion)) {
+ log.warn("Unable to enqueue lease {} for deletion.", lease.leaseKey());
+ }
+ }
+ }
+
+ /**
+ * Check if lease was already enqueued for deletion.
+ * //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597.
+ * @param leasePendingDeletion
+ * @return true if enqueued for deletion; false otherwise.
+ */
+ public boolean isEnqueuedForDeletion(LeasePendingDeletion leasePendingDeletion) {
+ return deletionQueue.contains(leasePendingDeletion);
+ }
+
+ /**
+ * Returns how many leases are currently waiting in the queue pending deletion.
+ * @return number of leases pending deletion.
+ */
+ private int leasesPendingDeletion() {
+ return deletionQueue.size();
+ }
+
+ private boolean timeToCheckForCompletedShard() {
+ return completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= completedLeaseCleanupIntervalMillis;
+ }
+
+ private boolean timeToCheckForGarbageShard() {
+ return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis;
+ }
+
+ public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion,
+ boolean timeToCheckForCompletedShard, boolean timeToCheckForGarbageShard) throws TimeoutException,
+ InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException {
+ final Lease lease = leasePendingDeletion.lease();
+ final ShardInfo shardInfo = leasePendingDeletion.shardInfo();
+ final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
+
+ final AWSExceptionManager exceptionManager = createExceptionManager();
+
+ boolean cleanedUpCompletedLease = false;
+ boolean cleanedUpGarbageLease = false;
+ boolean alreadyCheckedForGarbageCollection = false;
+ boolean wereChildShardsPresent = false;
+ boolean wasResourceNotFound = false;
+
+ try {
+ if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
+ final Lease leaseFromDDB = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey());
+ if(leaseFromDDB != null) {
+ Set childShardKeys = leaseFromDDB.childShardIds();
+ if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
+ try {
+ childShardKeys = leasePendingDeletion.getChildShardsFromService();
+
+ if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
+ log.error(
+ "No child shards returned from service for shard {} for {} while cleaning up lease.",
+ shardInfo.shardId(), streamIdentifier.streamName());
+ } else {
+ wereChildShardsPresent = true;
+ updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
+ }
+ } catch (ExecutionException e) {
+ throw exceptionManager.apply(e.getCause());
+ } finally {
+ alreadyCheckedForGarbageCollection = true;
+ }
+ } else {
+ wereChildShardsPresent = true;
+ }
+ try {
+ cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys);
+ } catch (Exception e) {
+ // Suppressing the exception here, so that we can attempt for garbage cleanup.
+ log.warn("Unable to cleanup lease for shard {} in {}", shardInfo.shardId(), streamIdentifier.streamName(), e);
+ }
+ } else {
+ log.info("Lease not present in lease table while cleaning the shard {} of {}", shardInfo.shardId(), streamIdentifier.streamName());
+ cleanedUpCompletedLease = true;
+ }
+ }
+
+ if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
+ try {
+ wereChildShardsPresent = !CollectionUtils
+ .isNullOrEmpty(leasePendingDeletion.getChildShardsFromService());
+ } catch (ExecutionException e) {
+ throw exceptionManager.apply(e.getCause());
+ }
+ }
+ } catch (ResourceNotFoundException e) {
+ wasResourceNotFound = true;
+ cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease, e);
+ }
+
+ return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent,
+ wasResourceNotFound);
+ }
+
+ // A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
+ // stream (known explicitly from ResourceNotFound being thrown when processing this shard),
+ private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
+ log.warn("Deleting lease {} as it is not present in the stream.", lease, e);
+ leaseCoordinator.leaseRefresher().deleteLease(lease);
+ return true;
+ }
+
+ private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
+ for (String parentShard : lease.parentShardIds()) {
+ final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard));
+
+ if (parentLease != null) {
+ log.warn("Lease {} has a parent lease {} which is still present in the lease table, skipping deletion " +
+ "for this lease.", lease, parentLease);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // We should only be deleting the current shard's lease if
+ // 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP.
+ // 2. Its parent shard lease(s) have already been deleted.
+ private boolean cleanupLeaseForCompletedShard(Lease lease, ShardInfo shardInfo, Set childShardKeys)
+ throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {
+ final Set processedChildShardLeaseKeys = new HashSet<>();
+ final Set childShardLeaseKeys = childShardKeys.stream().map(ck -> ShardInfo.getLeaseKey(shardInfo, ck))
+ .collect(Collectors.toSet());
+
+ for (String childShardLeaseKey : childShardLeaseKeys) {
+ final Lease childShardLease = Optional.ofNullable(
+ leaseCoordinator.leaseRefresher().getLease(childShardLeaseKey))
+ .orElseThrow(() -> new IllegalStateException(
+ "Child lease " + childShardLeaseKey + " for completed shard not found in "
+ + "lease table - not cleaning up lease " + lease));
+
+ if (!childShardLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON) && !childShardLease
+ .checkpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) {
+ processedChildShardLeaseKeys.add(childShardLease.leaseKey());
+ }
+ }
+
+ if (!allParentShardLeasesDeleted(lease, shardInfo) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) {
+ return false;
+ }
+
+ log.info("Deleting lease {} as it has been completely processed and processing of child shard(s) has begun.",
+ lease);
+ leaseCoordinator.leaseRefresher().deleteLease(lease);
+
+ return true;
+ }
+
+ private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set childShardKeys)
+ throws DependencyException, ProvisionedThroughputException, InvalidStateException {
+ final Lease updatedLease = leasePendingDeletion.lease();
+ updatedLease.childShardIds(childShardKeys);
+
+ leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
+ }
+
+ private AWSExceptionManager createExceptionManager() {
+ final AWSExceptionManager exceptionManager = new AWSExceptionManager();
+ exceptionManager.add(ResourceNotFoundException.class, t -> t);
+
+ return exceptionManager;
+ }
+
+ @VisibleForTesting
+ void cleanupLeases() {
+ log.info("Number of pending leases to clean before the scan : {}", leasesPendingDeletion());
+ if (deletionQueue.isEmpty()) {
+ log.debug("No leases pending deletion.");
+ } else if (timeToCheckForCompletedShard() | timeToCheckForGarbageShard()) {
+ final Queue failedDeletions = new ConcurrentLinkedQueue<>();
+ boolean completedLeaseCleanedUp = false;
+ boolean garbageLeaseCleanedUp = false;
+
+ log.debug("Attempting to clean up {} lease(s).", deletionQueue.size());
+
+ while (!deletionQueue.isEmpty()) {
+ final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll();
+ final String leaseKey = leasePendingDeletion.lease().leaseKey();
+ final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
+ boolean deletionSucceeded = false;
+ try {
+ final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion,
+ timeToCheckForCompletedShard(), timeToCheckForGarbageShard());
+ completedLeaseCleanedUp |= leaseCleanupResult.cleanedUpCompletedLease();
+ garbageLeaseCleanedUp |= leaseCleanupResult.cleanedUpGarbageLease();
+
+ if (leaseCleanupResult.leaseCleanedUp()) {
+ log.info("Successfully cleaned up lease {} for {} due to {}", leaseKey, streamIdentifier, leaseCleanupResult);
+ deletionSucceeded = true;
+ } else {
+ log.warn("Unable to clean up lease {} for {} due to {}", leaseKey, streamIdentifier, leaseCleanupResult);
+ }
+ } catch (Exception e) {
+ log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " +
+ "scheduled execution.", leaseKey, streamIdentifier, e);
+ }
+ if (!deletionSucceeded) {
+ log.debug("Did not cleanup lease {} for {}. Re-enqueueing for deletion.", leaseKey, streamIdentifier);
+ failedDeletions.add(leasePendingDeletion);
+ }
+ }
+ if (completedLeaseCleanedUp) {
+ log.debug("At least one completed lease was cleaned up - restarting interval");
+ completedLeaseStopwatch.reset().start();
+ }
+ if (garbageLeaseCleanedUp) {
+ log.debug("At least one garbage lease was cleaned up - restarting interval");
+ garbageLeaseStopwatch.reset().start();
+ }
+ deletionQueue.addAll(failedDeletions);
+
+ log.info("Number of pending leases to clean after the scan : {}", leasesPendingDeletion());
+ }
+ }
+
+ private class LeaseCleanupThread implements Runnable {
+ @Override
+ public void run() {
+ cleanupLeases();
+ }
+ }
+
+ @Value
+ public static class LeaseCleanupResult {
+ boolean cleanedUpCompletedLease;
+ boolean cleanedUpGarbageLease;
+ boolean wereChildShardsPresent;
+ boolean wasResourceNotFound;
+
+ public boolean leaseCleanedUp() {
+ return cleanedUpCompletedLease | cleanedUpGarbageLease;
+ }
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java
index 880fab4c..6437f339 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java
@@ -92,6 +92,9 @@ public interface LeaseCoordinator {
*
* @param lease lease object containing updated values
* @param concurrencyToken obtained by calling Lease.concurrencyToken for a currently held lease
+ * @param operation that performs updateLease
+ * @param singleStreamShardId for metrics emission in single stream mode. MultiStream mode will get the
+ * shardId from the lease object
*
* @return true if update succeeded, false otherwise
*
@@ -99,7 +102,7 @@ public interface LeaseCoordinator {
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
- boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String shardId)
+ boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String singleStreamShardId)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
index 20e0aa8f..473db5bb 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
@@ -15,23 +15,26 @@
package software.amazon.kinesis.leases;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
+import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
+import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.LeaseCleanupConfig;
+import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.metrics.MetricsFactory;
@@ -46,6 +49,19 @@ public class LeaseManagementConfig {
public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(1);
+ public static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis();
+ public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
+ public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
+ public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
+ public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;
+
+
+ public static final LeaseCleanupConfig DEFAULT_LEASE_CLEANUP_CONFIG = LeaseCleanupConfig.builder()
+ .leaseCleanupIntervalMillis(DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS)
+ .completedLeaseCleanupIntervalMillis(DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS)
+ .garbageLeaseCleanupIntervalMillis(DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS)
+ .build();
+
/**
* Name of the table to use in DynamoDB
*
@@ -71,7 +87,7 @@ public class LeaseManagementConfig {
* Name of the Kinesis Data Stream to read records from.
*/
@NonNull
- private final String streamName;
+ private String streamName;
/**
* Used to distinguish different workers/processes of a KCL application.
*
@@ -106,6 +122,15 @@ public class LeaseManagementConfig {
*/
private boolean cleanupLeasesUponShardCompletion = true;
+ /**
+ * Configuration for lease cleanup in {@link LeaseCleanupManager}.
+ *
+ * Default lease cleanup interval value: 1 minute.
+ * Default completed lease cleanup threshold: 5 minute.
+ * Default garbage lease cleanup threshold: 30 minute.
+ */
+ private final LeaseCleanupConfig leaseCleanupConfig = DEFAULT_LEASE_CLEANUP_CONFIG;
+
/**
* The max number of leases (shards) this worker should process.
* This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
@@ -116,7 +141,7 @@ public class LeaseManagementConfig {
*
* Default value: {@link Integer#MAX_VALUE}
*/
- private int maxLeasesForWorker = Integer.MAX_VALUE;;
+ private int maxLeasesForWorker = Integer.MAX_VALUE;
/**
* Max leases to steal from another worker at one time (for load balancing).
@@ -141,6 +166,11 @@ public class LeaseManagementConfig {
*/
private int initialLeaseTableWriteCapacity = 10;
+ /**
+ * Configurable functional interface to override the existing shardDetector.
+ */
+ private Function customShardDetectorProvider;
+
/**
* The size of the thread pool to create for the lease renewer to use.
*
@@ -168,6 +198,20 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PROVISIONED;
+ /**
+ * Frequency (in millis) of the auditor job to scan for partial leases in the lease table.
+ * If the auditor detects any hole in the leases for a stream, then it would trigger shard sync based on
+ * {@link #leasesRecoveryAuditorInconsistencyConfidenceThreshold}
+ */
+ private long leasesRecoveryAuditorExecutionFrequencyMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS;
+
+ /**
+ * Confidence threshold for the periodic auditor job to determine if leases for a stream in the lease table
+ * is inconsistent. If the auditor finds same set of inconsistencies consecutively for a stream for this many times,
+ * then it would trigger a shard sync.
+ */
+ private int leasesRecoveryAuditorInconsistencyConfidenceThreshold = DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY;
+
/**
* The initial position for getting records from Kinesis streams.
*
@@ -182,6 +226,24 @@ public class LeaseManagementConfig {
private MetricsFactory metricsFactory = new NullMetricsFactory();
+ @Deprecated
+ public LeaseManagementConfig(String tableName, DynamoDbAsyncClient dynamoDBClient, KinesisAsyncClient kinesisClient,
+ String streamName, String workerIdentifier) {
+ this.tableName = tableName;
+ this.dynamoDBClient = dynamoDBClient;
+ this.kinesisClient = kinesisClient;
+ this.streamName = streamName;
+ this.workerIdentifier = workerIdentifier;
+ }
+
+ public LeaseManagementConfig(String tableName, DynamoDbAsyncClient dynamoDBClient, KinesisAsyncClient kinesisClient,
+ String workerIdentifier) {
+ this.tableName = tableName;
+ this.dynamoDBClient = dynamoDBClient;
+ this.kinesisClient = kinesisClient;
+ this.workerIdentifier = workerIdentifier;
+ }
+
/**
* Returns the metrics factory.
*
@@ -240,12 +302,21 @@ public class LeaseManagementConfig {
*/
private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK;
- private HierarchicalShardSyncer hierarchicalShardSyncer = new HierarchicalShardSyncer();
+ private HierarchicalShardSyncer hierarchicalShardSyncer;
private LeaseManagementFactory leaseManagementFactory;
+ private HierarchicalShardSyncer hierarchicalShardSyncer() {
+ if(hierarchicalShardSyncer == null) {
+ hierarchicalShardSyncer = new HierarchicalShardSyncer();
+ }
+ return hierarchicalShardSyncer;
+ }
+
+ @Deprecated
public LeaseManagementFactory leaseManagementFactory() {
if (leaseManagementFactory == null) {
+ Validate.notEmpty(streamName(), "Stream name is empty");
leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
streamName(),
dynamoDBClient(),
@@ -275,4 +346,54 @@ public class LeaseManagementConfig {
return leaseManagementFactory;
}
+ /**
+ * Vends LeaseManagementFactory that performs serde based on leaseSerializer and shard sync based on isMultiStreamingMode
+ * @param leaseSerializer
+ * @param isMultiStreamingMode
+ * @return LeaseManagementFactory
+ */
+ public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer leaseSerializer, boolean isMultiStreamingMode) {
+ if(leaseManagementFactory == null) {
+ leaseManagementFactory = new DynamoDBLeaseManagementFactory(kinesisClient(),
+ dynamoDBClient(),
+ tableName(),
+ workerIdentifier(),
+ executorService(),
+ failoverTimeMillis(),
+ epsilonMillis(),
+ maxLeasesForWorker(),
+ maxLeasesToStealAtOneTime(),
+ maxLeaseRenewalThreads(),
+ cleanupLeasesUponShardCompletion(),
+ ignoreUnexpectedChildShards(),
+ shardSyncIntervalMillis(),
+ consistentReads(),
+ listShardsBackoffTimeInMillis(),
+ maxListShardsRetryAttempts(),
+ maxCacheMissesBeforeReload(),
+ listShardsCacheAllowedAgeInSeconds(),
+ cacheMissWarningModulus(),
+ initialLeaseTableReadCapacity(),
+ initialLeaseTableWriteCapacity(),
+ hierarchicalShardSyncer(),
+ tableCreatorCallback(),
+ dynamoDbRequestTimeout(),
+ billingMode(),
+ leaseSerializer,
+ customShardDetectorProvider(),
+ isMultiStreamingMode,
+ leaseCleanupConfig());
+ }
+ return leaseManagementFactory;
+ }
+
+ /**
+ * Set leaseManagementFactory and return the current LeaseManagementConfig instance.
+ * @param leaseManagementFactory
+ * @return LeaseManagementConfig
+ */
+ public LeaseManagementConfig leaseManagementFactory(final LeaseManagementFactory leaseManagementFactory) {
+ this.leaseManagementFactory = leaseManagementFactory;
+ return this;
+ }
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
index 72f48fea..ecf9b390 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
@@ -15,6 +15,7 @@
package software.amazon.kinesis.leases;
+import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.metrics.MetricsFactory;
@@ -26,7 +27,18 @@ public interface LeaseManagementFactory {
ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory);
+ default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
+ throw new UnsupportedOperationException();
+ }
+
DynamoDBLeaseRefresher createLeaseRefresher();
ShardDetector createShardDetector();
+
+ default ShardDetector createShardDetector(StreamConfig streamConfig) {
+ throw new UnsupportedOperationException();
+ }
+
+ LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory);
+
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
index 3ba22c2b..b7f38a4e 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
@@ -16,6 +16,7 @@ package software.amazon.kinesis.leases;
import java.util.List;
+import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@@ -60,6 +61,18 @@ public interface LeaseRefresher {
*/
boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException;
+ /**
+ * List all leases for a given stream synchronously.
+ *
+ * @throws DependencyException if DynamoDB scan fails in an unexpected way
+ * @throws InvalidStateException if lease table does not exist
+ * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
+ *
+ * @return list of leases
+ */
+ List listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException,
+ ProvisionedThroughputException;
+
/**
* List all objects in table synchronously.
*
@@ -86,15 +99,15 @@ public interface LeaseRefresher {
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
- * @param shardId Get the lease for this shardId
+ * @param leaseKey Get the lease for this leasekey
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB get fails due to lack of capacity
* @throws DependencyException if DynamoDB get fails in an unexpected way
*
- * @return lease for the specified shardId, or null if one doesn't exist
+ * @return lease for the specified leaseKey, or null if one doesn't exist
*/
- Lease getLease(String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException;
+ Lease getLease(String leaseKey) throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter
@@ -178,6 +191,21 @@ public interface LeaseRefresher {
boolean updateLease(Lease lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
+ /**
+ * Update application-specific fields of the given lease in DynamoDB. Does not update fields managed by the leasing
+ * library such as leaseCounter, leaseOwner, or leaseKey.
+ *
+ * @return true if update succeeded, false otherwise
+ *
+ * @throws InvalidStateException if lease table does not exist
+ * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
+ * @throws DependencyException if DynamoDB update fails in an unexpected way
+ */
+ default void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ throw new UnsupportedOperationException("updateLeaseWithNoExpectation is not implemented");
+ }
+
/**
* Check (synchronously) if there are any leases in the lease table.
*
@@ -193,13 +221,13 @@ public interface LeaseRefresher {
* Gets the current checkpoint of the shard. This is useful in the resharding use case
* where we will wait for the parent shard to complete before starting on the records from a child shard.
*
- * @param shardId Checkpoint of this shard will be returned
+ * @param leaseKey Checkpoint of this shard will be returned
* @return Checkpoint of this shard, or null if the shard record doesn't exist.
*
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws InvalidStateException if lease table does not exist
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
- ExtendedSequenceNumber getCheckpoint(String shardId)
+ ExtendedSequenceNumber getCheckpoint(String leaseKey)
throws ProvisionedThroughputException, InvalidStateException, DependencyException;
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java
index 9ed5616f..25ec5b45 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRenewer.java
@@ -86,6 +86,9 @@ public interface LeaseRenewer {
*
* @param lease lease object containing updated data
* @param concurrencyToken obtained by calling Lease.concurrencyToken for a currently held lease
+ * @param operation that performs updateLease
+ * @param singleStreamShardId shardId for metrics emission in single stream mode. MultiStream mode will get the
+ * shardId from the lease object
*
* @return true if update succeeds, false otherwise
*
@@ -93,7 +96,7 @@ public interface LeaseRenewer {
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
*/
- boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String shardId)
+ boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String singleStreamShardId)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java
index b8aa0339..f36f5a66 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java
@@ -16,14 +16,11 @@ package software.amazon.kinesis.leases;
import java.util.Collection;
import java.util.Map;
-
-
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
-import software.amazon.kinesis.leases.Lease;
/**
* Utility class that manages the mapping of Lease objects/operations to records in DynamoDB.
@@ -46,6 +43,11 @@ public interface LeaseSerializer {
*/
Lease fromDynamoRecord(Map dynamoRecord);
+
+ default Lease fromDynamoRecord(Map dynamoRecord, Lease leaseToUpdate) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* @param lease
* @return the attribute value map representing a Lease's hash key given a Lease object.
@@ -77,6 +79,14 @@ public interface LeaseSerializer {
*/
Map getDynamoNonexistantExpectation();
+ /**
+ * @param leaseKey
+ * @return the attribute value map asserting that a lease does exist.
+ */
+ default Map getDynamoExistentExpectation(String leaseKey) {
+ throw new UnsupportedOperationException("DynamoExistantExpectation is not implemented");
+ }
+
/**
* @param lease
* @return the attribute value map that increments a lease counter
@@ -103,6 +113,15 @@ public interface LeaseSerializer {
*/
Map getDynamoUpdateLeaseUpdate(Lease lease);
+ /**
+ * @param lease
+ * @param updateField
+ * @return the attribute value map that updates application-specific data for a lease
+ */
+ default Map getDynamoUpdateLeaseUpdate(Lease lease, UpdateField updateField) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* @return the key schema for creating a DynamoDB table to store leases
*/
@@ -112,4 +131,5 @@ public interface LeaseSerializer {
* @return attribute definitions for creating a DynamoDB table to store leases
*/
Collection getAttributeDefinitions();
+
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java
new file mode 100644
index 00000000..c8811354
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/MultiStreamLease.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.leases;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.Validate;
+
+import java.util.Objects;
+
+import static com.google.common.base.Verify.verifyNotNull;
+
+@Setter
+@NoArgsConstructor
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode(callSuper = true)
+public class MultiStreamLease extends Lease {
+
+ @NonNull private String streamIdentifier;
+ @NonNull private String shardId;
+
+ public MultiStreamLease(MultiStreamLease other) {
+ super(other);
+ streamIdentifier(other.streamIdentifier);
+ shardId(other.shardId);
+ }
+
+ @Override
+ public void update(Lease other) {
+ MultiStreamLease casted = validateAndCast(other);
+ super.update(casted);
+ streamIdentifier(casted.streamIdentifier);
+ shardId(casted.shardId);
+ }
+
+ public static String getLeaseKey(String streamIdentifier, String shardId) {
+ verifyNotNull(streamIdentifier, "streamIdentifier should not be null");
+ verifyNotNull(shardId, "shardId should not be null");
+ return streamIdentifier + ":" + shardId;
+ }
+
+ /**
+ * Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics.
+ *
+ * @return A deep copy of this object.
+ */
+ @Override
+ public MultiStreamLease copy() {
+ return new MultiStreamLease(this);
+ }
+
+ /**
+ * Validate and cast the lease to MultiStream lease
+ * @param lease
+ * @return MultiStreamLease
+ */
+ public static MultiStreamLease validateAndCast(Lease lease) {
+ Validate.isInstanceOf(MultiStreamLease.class, lease);
+ return (MultiStreamLease) lease;
+ }
+
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java
index cf3a1a78..62b93855 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java
@@ -15,16 +15,75 @@
package software.amazon.kinesis.leases;
-import software.amazon.awssdk.services.kinesis.model.Shard;
-
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardFilter;
+import software.amazon.kinesis.common.StreamIdentifier;
/**
*
*/
public interface ShardDetector {
+
+ /**
+ * Gets shard based on shardId.
+ *
+ * @param shardId
+ * @return Shard
+ */
Shard shard(String shardId);
+ /**
+ * List shards.
+ *
+ * @return Shards
+ */
List listShards();
+ /**
+ * List shards with shard filter.
+ *
+ * @param ShardFilter
+ * @return Shards
+ */
+ default List listShardsWithFilter(ShardFilter shardFilter) {
+ throw new UnsupportedOperationException("listShardsWithFilter not available.");
+ }
+
+ /**
+ * Gets stream identifier.
+ *
+ * @return StreamIdentifier
+ */
+ default StreamIdentifier streamIdentifier() {
+ throw new UnsupportedOperationException("StreamName not available");
+ }
+
+ /**
+ * Gets a list shards response based on the request.
+ *
+ * @param request list shards request
+ * @return ListShardsResponse which contains list shards response
+ */
+ default ListShardsResponse getListShardsResponse(ListShardsRequest request) throws Exception {
+ throw new UnsupportedOperationException("getListShardsResponse not available.");
+ }
+
+ /**
+ * Gets the children shards of a shard.
+ * @param shardId
+ * @return
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ default List getChildShards(String shardId) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException("getChildShards not available.");
+ }
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java
index 89b8f94a..aff3f6f0 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java
@@ -18,6 +18,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -36,6 +37,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ToString
public class ShardInfo {
+ private final Optional streamIdentifierSerOpt;
private final String shardId;
private final String concurrencyToken;
// Sorted list of parent shardIds.
@@ -54,11 +56,27 @@ public class ShardInfo {
* @param checkpoint
* the latest checkpoint from lease
*/
- // TODO: check what values can be null
public ShardInfo(@NonNull final String shardId,
final String concurrencyToken,
final Collection parentShardIds,
final ExtendedSequenceNumber checkpoint) {
+ this(shardId, concurrencyToken, parentShardIds, checkpoint, null);
+ }
+
+ /**
+ * Creates a new ShardInfo object that has an option to pass a serialized streamIdentifier.
+ * The checkpoint is not part of the equality, but is used for debugging output.
+ * @param shardId
+ * @param concurrencyToken
+ * @param parentShardIds
+ * @param checkpoint
+ * @param streamIdentifierSer
+ */
+ public ShardInfo(@NonNull final String shardId,
+ final String concurrencyToken,
+ final Collection parentShardIds,
+ final ExtendedSequenceNumber checkpoint,
+ final String streamIdentifierSer) {
this.shardId = shardId;
this.concurrencyToken = concurrencyToken;
this.parentShardIds = new LinkedList<>();
@@ -69,6 +87,7 @@ public class ShardInfo {
// This makes it easy to check for equality in ShardInfo.equals method.
Collections.sort(this.parentShardIds);
this.checkpoint = checkpoint;
+ this.streamIdentifierSerOpt = Optional.ofNullable(streamIdentifierSer);
}
/**
@@ -94,7 +113,8 @@ public class ShardInfo {
*/
@Override
public int hashCode() {
- return new HashCodeBuilder().append(concurrencyToken).append(parentShardIds).append(shardId).toHashCode();
+ return new HashCodeBuilder()
+ .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierSerOpt.orElse("")).toHashCode();
}
/**
@@ -118,8 +138,30 @@ public class ShardInfo {
}
ShardInfo other = (ShardInfo) obj;
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken)
- .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId).isEquals();
+ .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId)
+ .append(streamIdentifierSerOpt.orElse(""), other.streamIdentifierSerOpt.orElse("")).isEquals();
}
+ /**
+ * Utility method to derive lease key from ShardInfo.
+ * @param shardInfo
+ * @return lease key
+ */
+ public static String getLeaseKey(ShardInfo shardInfo) {
+ return getLeaseKey(shardInfo, shardInfo.shardId());
+ }
+
+ /**
+ * Utility method to derive lease key from ShardInfo and shardId to override.
+ * @param shardInfo
+ * @param shardIdOverride
+ * @return lease key
+ */
+ public static String getLeaseKey(ShardInfo shardInfo, String shardIdOverride) {
+ return shardInfo.streamIdentifierSerOpt().isPresent() ?
+ MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardIdOverride) :
+ shardIdOverride;
+ }
+
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java
index c59608b2..dd576114 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java
@@ -23,6 +23,7 @@ import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.metrics.MetricsFactory;
+import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
@@ -45,6 +46,7 @@ public class ShardSyncTask implements ConsumerTask {
@NonNull
private final InitialPositionInStreamExtended initialPosition;
private final boolean cleanupLeasesUponShardCompletion;
+ private final boolean garbageCollectLeases;
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis;
@NonNull
@@ -62,17 +64,25 @@ public class ShardSyncTask implements ConsumerTask {
public TaskResult call() {
Exception exception = null;
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION);
+ boolean shardSyncSuccess = true;
try {
- hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
- cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope);
- if (shardSyncTaskIdleTimeMillis > 0) {
+ boolean didPerformShardSync = hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
+ initialPosition, scope, ignoreUnexpectedChildShards,
+ leaseRefresher.isLeaseTableEmpty());
+
+ if (didPerformShardSync && shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}
} catch (Exception e) {
log.error("Caught exception while sync'ing Kinesis shards and leases", e);
exception = e;
+ shardSyncSuccess = false;
} finally {
+ // NOTE: This metric is reflecting if a shard sync task succeeds. Customer can use this metric to monitor if
+ // their application encounter any shard sync failures. This metric can help to detect potential shard stuck issues
+ // that are due to shard sync failures.
+ MetricsUtil.addSuccess(scope, "SyncShards", shardSyncSuccess, MetricsLevel.DETAILED);
MetricsUtil.endScope(scope);
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java
index f6db72e3..e03046a0 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java
@@ -14,9 +14,11 @@
*/
package software.amazon.kinesis.leases;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@@ -24,6 +26,7 @@ import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
+import software.amazon.kinesis.coordinator.ExecutorStateEvent;
import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsFactory;
@@ -45,6 +48,7 @@ public class ShardSyncTaskManager {
@NonNull
private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesUponShardCompletion;
+ private final boolean garbageCollectLeases;
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncIdleTimeMillis;
@NonNull
@@ -53,6 +57,10 @@ public class ShardSyncTaskManager {
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;
+ private ConsumerTask currentTask;
+ private CompletableFuture future;
+ private AtomicBoolean shardSyncRequestPending;
+ private final ReentrantLock lock;
/**
* Constructor.
@@ -77,11 +85,14 @@ public class ShardSyncTaskManager {
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
+ this.garbageCollectLeases = true;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.hierarchicalShardSyncer = new HierarchicalShardSyncer();
this.metricsFactory = metricsFactory;
+ this.shardSyncRequestPending = new AtomicBoolean(false);
+ this.lock = new ReentrantLock();
}
/**
@@ -105,21 +116,48 @@ public class ShardSyncTaskManager {
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
+ this.garbageCollectLeases = true;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.metricsFactory = metricsFactory;
+ this.shardSyncRequestPending = new AtomicBoolean(false);
+ this.lock = new ReentrantLock();
}
- private ConsumerTask currentTask;
- private Future future;
-
- public synchronized boolean syncShardAndLeaseInfo() {
- return checkAndSubmitNextTask();
+ /**
+ * Call a ShardSyncTask and return the Task Result.
+ * @return the Task Result.
+ */
+ public TaskResult callShardSyncTask() {
+ final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector,
+ leaseRefresher,
+ initialPositionInStream,
+ cleanupLeasesUponShardCompletion,
+ garbageCollectLeases,
+ ignoreUnexpectedChildShards,
+ shardSyncIdleTimeMillis,
+ hierarchicalShardSyncer,
+ metricsFactory);
+ final ConsumerTask metricCollectingTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory);
+ return metricCollectingTask.call();
}
- private synchronized boolean checkAndSubmitNextTask() {
+ /**
+ * Submit a ShardSyncTask and return if the submission is successful.
+ * @return if the casting is successful.
+ */
+ public boolean submitShardSyncTask() {
+ try {
+ lock.lock();
+ return checkAndSubmitNextTask();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private boolean checkAndSubmitNextTask() {
boolean submittedNewTask = false;
if ((future == null) || future.isCancelled() || future.isDone()) {
if ((future != null) && future.isDone()) {
@@ -140,23 +178,54 @@ public class ShardSyncTaskManager {
leaseRefresher,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
+ garbageCollectLeases,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
hierarchicalShardSyncer,
metricsFactory),
metricsFactory);
- future = executorService.submit(currentTask);
+ future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
+ .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult));
+
+ log.info(new ExecutorStateEvent(executorService).message());
+
submittedNewTask = true;
if (log.isDebugEnabled()) {
log.debug("Submitted new {} task.", currentTask.taskType());
}
} else {
if (log.isDebugEnabled()) {
- log.debug("Previous {} task still pending. Not submitting new task.", currentTask.taskType());
+ log.debug("Previous {} task still pending. Not submitting new task. "
+ + "Enqueued a request that will be executed when the current request completes.", currentTask.taskType());
}
+ shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/);
}
-
return submittedNewTask;
}
+ private void handlePendingShardSyncs(Throwable exception, TaskResult taskResult) {
+ if (exception != null || taskResult.getException() != null) {
+ log.error("Caught exception running {} task: ", currentTask.taskType(), exception != null ? exception : taskResult.getException());
+ }
+ // Acquire lock here. If shardSyncRequestPending is false in this completionStage and
+ // submitShardSyncTask is invoked, before completion stage exits (future completes)
+ // but right after the value of shardSyncRequestPending is checked, it will result in
+ // shardSyncRequestPending being set to true, but no pending futures to trigger the next
+ // ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the
+ // previous task is in this completion stage, checkAndSubmitNextTask is not invoked
+ // until this completionStage exits.
+ try {
+ lock.lock();
+ if (shardSyncRequestPending.get()) {
+ shardSyncRequestPending.set(false);
+ // reset future to null, so next call creates a new one
+ // without trying to get results from the old future.
+ future = null;
+ checkAndSubmitNextTask();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java
new file mode 100644
index 00000000..9461a18e
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/UpdateField.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package software.amazon.kinesis.leases;
+
+/**
+ * These are the special fields that will be updated only once during the lifetime of the lease.
+ * Since these are meta information that will not affect lease ownership or data durability, we allow
+ * any elected leader or worker to set these fields directly without any conditional checks.
+ * Note that though HASH_KEY_RANGE will be available during lease initialization in newer versions, we keep this
+ * for backfilling while rolling forward to newer versions.
+ */
+public enum UpdateField {
+ CHILD_SHARDS, HASH_KEY_RANGE
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
index fe31d996..78673f66 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
@@ -38,6 +38,7 @@ import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseRenewer;
import software.amazon.kinesis.leases.LeaseTaker;
+import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@@ -348,8 +349,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
@Override
public boolean updateLease(final Lease lease, final UUID concurrencyToken, final String operation,
- final String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
- return leaseRenewer.updateLease(lease, concurrencyToken, operation, shardId);
+ final String singleStreamShardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ return leaseRenewer.updateLease(lease, concurrencyToken, operation, singleStreamShardId);
}
/**
@@ -377,9 +378,19 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList());
}
+ /**
+ * Utility method to convert the basic lease or multistream lease to ShardInfo
+ * @param lease
+ * @return ShardInfo
+ */
public static ShardInfo convertLeaseToAssignment(final Lease lease) {
- return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
- lease.checkpoint());
+ if (lease instanceof MultiStreamLease) {
+ return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(),
+ lease.checkpoint(), ((MultiStreamLease) lease).streamIdentifier());
+ } else {
+ return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
+ lease.checkpoint());
+ }
}
/**
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
index c2ade429..5102bc5e 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
@@ -17,7 +17,8 @@ package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
-
+import java.util.concurrent.Executors;
+import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
@@ -25,11 +26,16 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.LeaseCleanupConfig;
+import software.amazon.kinesis.common.StreamConfig;
+import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
+import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseManagementFactory;
+import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.metrics.MetricsFactory;
@@ -44,8 +50,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull
- private final String streamName;
- @NonNull
private final DynamoDbAsyncClient dynamoDBClient;
@NonNull
private final String tableName;
@@ -54,9 +58,13 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private final ExecutorService executorService;
@NonNull
- private final InitialPositionInStreamExtended initialPositionInStream;
+ private final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer;
@NonNull
- private final HierarchicalShardSyncer hierarchicalShardSyncer;
+ private final LeaseSerializer leaseSerializer;
+ @NonNull
+ private StreamConfig streamConfig;
+
+ private Function customShardDetectorProvider;
private final long failoverTimeMillis;
private final long epsilonMillis;
@@ -77,6 +85,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
+ private final boolean isMultiStreamMode;
+ private final LeaseCleanupConfig leaseCleanupConfig;
/**
* Constructor.
@@ -228,7 +238,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
/**
* Constructor.
- *
+ *
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
@@ -309,6 +319,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param dynamoDbRequestTimeout
* @param billingMode
*/
+ @Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
@@ -321,13 +332,118 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
+
+ this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
+ workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
+ maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
+ maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
+ cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
+ hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param kinesisClient
+ * @param streamConfig
+ * @param dynamoDBClient
+ * @param tableName
+ * @param workerIdentifier
+ * @param executorService
+ * @param failoverTimeMillis
+ * @param epsilonMillis
+ * @param maxLeasesForWorker
+ * @param maxLeasesToStealAtOneTime
+ * @param maxLeaseRenewalThreads
+ * @param cleanupLeasesUponShardCompletion
+ * @param ignoreUnexpectedChildShards
+ * @param shardSyncIntervalMillis
+ * @param consistentReads
+ * @param listShardsBackoffTimeMillis
+ * @param maxListShardsRetryAttempts
+ * @param maxCacheMissesBeforeReload
+ * @param listShardsCacheAllowedAgeInSeconds
+ * @param cacheMissWarningModulus
+ * @param initialLeaseTableReadCapacity
+ * @param initialLeaseTableWriteCapacity
+ * @param deprecatedHierarchicalShardSyncer
+ * @param tableCreatorCallback
+ * @param dynamoDbRequestTimeout
+ * @param billingMode
+ */
+ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
+ final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
+ final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
+ final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
+ final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
+ final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
+ final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
+ final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
+ final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
+ final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
+ Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
+ this(kinesisClient, dynamoDBClient, tableName,
+ workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
+ maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
+ maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
+ cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
+ deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
+ null, false, LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG);
+ this.streamConfig = streamConfig;
+ }
+
+ /**
+ * Constructor.
+ * @param kinesisClient
+ * @param dynamoDBClient
+ * @param tableName
+ * @param workerIdentifier
+ * @param executorService
+ * @param failoverTimeMillis
+ * @param epsilonMillis
+ * @param maxLeasesForWorker
+ * @param maxLeasesToStealAtOneTime
+ * @param maxLeaseRenewalThreads
+ * @param cleanupLeasesUponShardCompletion
+ * @param ignoreUnexpectedChildShards
+ * @param shardSyncIntervalMillis
+ * @param consistentReads
+ * @param listShardsBackoffTimeMillis
+ * @param maxListShardsRetryAttempts
+ * @param maxCacheMissesBeforeReload
+ * @param listShardsCacheAllowedAgeInSeconds
+ * @param cacheMissWarningModulus
+ * @param initialLeaseTableReadCapacity
+ * @param initialLeaseTableWriteCapacity
+ * @param deprecatedHierarchicalShardSyncer
+ * @param tableCreatorCallback
+ * @param dynamoDbRequestTimeout
+ * @param billingMode
+ * @param leaseSerializer
+ * @param customShardDetectorProvider
+ * @param isMultiStreamMode
+ * @param leaseCleanupConfig
+ */
+ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
+ final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
+ final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
+ final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
+ final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
+ final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
+ final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
+ final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
+ final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
+ final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
+ Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer,
+ Function customShardDetectorProvider, boolean isMultiStreamMode,
+ LeaseCleanupConfig leaseCleanupConfig) {
this.kinesisClient = kinesisClient;
- this.streamName = streamName;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
- this.initialPositionInStream = initialPositionInStream;
this.failoverTimeMillis = failoverTimeMillis;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
@@ -344,10 +460,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
- this.hierarchicalShardSyncer = hierarchicalShardSyncer;
+ this.deprecatedHierarchicalShardSyncer = deprecatedHierarchicalShardSyncer;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
+ this.leaseSerializer = leaseSerializer;
+ this.customShardDetectorProvider = customShardDetectorProvider;
+ this.isMultiStreamMode = isMultiStreamMode;
+ this.leaseCleanupConfig = leaseCleanupConfig;
}
@Override
@@ -364,29 +484,77 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
metricsFactory);
}
- @Override
+ @Override @Deprecated
public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFactory metricsFactory) {
return new ShardSyncTaskManager(this.createShardDetector(),
this.createLeaseRefresher(),
- initialPositionInStream,
+ streamConfig.initialPositionInStreamExtended(),
+ cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards,
+ shardSyncIntervalMillis,
+ executorService, deprecatedHierarchicalShardSyncer,
+ metricsFactory);
+ }
+
+ /**
+ * Create ShardSyncTaskManager from the streamConfig passed
+ * @param metricsFactory
+ * @param streamConfig
+ * @return ShardSyncTaskManager
+ */
+ @Override
+ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
+ return new ShardSyncTaskManager(this.createShardDetector(streamConfig),
+ this.createLeaseRefresher(),
+ streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
- hierarchicalShardSyncer,
+ new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()),
metricsFactory);
}
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
- return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, new DynamoDBLeaseSerializer(), consistentReads,
+ return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads,
tableCreatorCallback, dynamoDbRequestTimeout, billingMode);
}
@Override
+ @Deprecated
public ShardDetector createShardDetector() {
- return new KinesisShardDetector(kinesisClient, streamName, listShardsBackoffTimeMillis,
- maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
- cacheMissWarningModulus, dynamoDbRequestTimeout);
+ return new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(),
+ listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds,
+ maxCacheMissesBeforeReload, cacheMissWarningModulus, dynamoDbRequestTimeout);
+ }
+
+ /**
+ * KinesisShardDetector supports reading from service only using streamName. Support for accountId and
+ * stream creation epoch is yet to be provided.
+ * @param streamConfig
+ * @return ShardDetector
+ */
+ @Override
+ public ShardDetector createShardDetector(StreamConfig streamConfig) {
+ return customShardDetectorProvider != null ? customShardDetectorProvider.apply(streamConfig) :
+ new KinesisShardDetector(kinesisClient, streamConfig.streamIdentifier(), listShardsBackoffTimeMillis,
+ maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
+ cacheMissWarningModulus, dynamoDbRequestTimeout);
+ }
+
+ /**
+ * LeaseCleanupManager cleans up leases in the lease table for shards which have either expired past the
+ * stream's retention period or have been completely processed.
+ * @param metricsFactory
+ * @return LeaseCleanupManager
+ */
+ @Override
+ public LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory) {
+ return new LeaseCleanupManager(createLeaseCoordinator(metricsFactory),
+ metricsFactory, Executors.newSingleThreadScheduledExecutor(),
+ cleanupLeasesUponShardCompletion, leaseCleanupConfig.leaseCleanupIntervalMillis(),
+ leaseCleanupConfig.completedLeaseCleanupIntervalMillis(),
+ leaseCleanupConfig.garbageLeaseCleanupIntervalMillis());
}
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
index 10526ea6..8002eacc 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
@@ -14,6 +14,8 @@
*/
package software.amazon.kinesis.leases.dynamodb;
+import com.google.common.collect.ImmutableMap;
+
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -21,24 +23,44 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
-import software.amazon.awssdk.services.dynamodb.model.*;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.LimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.dynamodb.model.TableStatus;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
+import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSerializer;
+import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
-import software.amazon.awssdk.services.dynamodb.model.BillingMode;
/**
* An implementation of {@link LeaseRefresher} that uses DynamoDB.
@@ -58,6 +80,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private boolean newTableCreated = false;
+ private static final String STREAM_NAME = "streamName";
+ private static final String DDB_STREAM_NAME = ":streamName";
+
/**
* Constructor.
*
@@ -264,12 +289,21 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return System.currentTimeMillis() - startTime;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException,
+ InvalidStateException, ProvisionedThroughputException {
+ return list( null, streamIdentifier);
+ }
+
/**
* {@inheritDoc}
*/
@Override
public List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
- return list(null);
+ return list(null, null);
}
/**
@@ -278,22 +312,50 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
@Override
public boolean isLeaseTableEmpty()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
- return list(1).isEmpty();
+ return list(1, 1, null).isEmpty();
}
/**
* List with the given page size. Package access for integration testing.
*
* @param limit number of items to consider at a time - used by integration tests to force paging.
+ * @param streamIdentifier streamIdentifier for multi-stream mode. Can be null.
* @return list of leases
* @throws InvalidStateException if table does not exist
* @throws DependencyException if DynamoDB scan fail in an unexpected way
* @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity
*/
- List list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ List list(Integer limit, StreamIdentifier streamIdentifier)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ return list(limit, Integer.MAX_VALUE, streamIdentifier);
+ }
+
+ /**
+ * List with the given page size. Package access for integration testing.
+ *
+ * @param limit number of items to consider at a time - used by integration tests to force paging.
+ * @param maxPages mad paginated scan calls
+ * @param streamIdentifier streamIdentifier for multi-stream mode. Can be null.
+ * @return list of leases
+ * @throws InvalidStateException if table does not exist
+ * @throws DependencyException if DynamoDB scan fail in an unexpected way
+ * @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity
+ */
+ private List list(Integer limit, Integer maxPages, StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException,
+ ProvisionedThroughputException {
+
log.debug("Listing leases from table {}", table);
ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(table);
+
+ if (streamIdentifier != null) {
+ final Map expressionAttributeValues = ImmutableMap.of(
+ DDB_STREAM_NAME, AttributeValue.builder().s(streamIdentifier.serialize()).build()
+ );
+ scanRequestBuilder = scanRequestBuilder.filterExpression(STREAM_NAME + " = " + DDB_STREAM_NAME)
+ .expressionAttributeValues(expressionAttributeValues);
+ }
+
if (limit != null) {
scanRequestBuilder = scanRequestBuilder.limit(limit);
}
@@ -315,7 +377,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
}
Map lastEvaluatedKey = scanResult.lastEvaluatedKey();
- if (CollectionUtils.isNullOrEmpty(lastEvaluatedKey)) {
+ if (CollectionUtils.isNullOrEmpty(lastEvaluatedKey) || --maxPages <= 0) {
// Signify that we're done.
scanResult = null;
log.debug("lastEvaluatedKey was null - scan finished.");
@@ -634,14 +696,40 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return true;
}
+ @Override
+ public void updateLeaseWithMetaInfo(Lease lease, UpdateField updateField)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ log.debug("Updating lease without expectation {}", lease);
+ final AWSExceptionManager exceptionManager = createExceptionManager();
+ exceptionManager.add(ConditionalCheckFailedException.class, t -> t);
+ Map updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
+ UpdateItemRequest request = UpdateItemRequest.builder().tableName(table).key(serializer.getDynamoHashKey(lease))
+ .expected(serializer.getDynamoExistentExpectation(lease.leaseKey()))
+ .attributeUpdates(updates).build();
+ try {
+ try {
+ FutureUtils.resolveOrCancelFuture(dynamoDBClient.updateItem(request), dynamoDbRequestTimeout);
+ } catch (ExecutionException e) {
+ throw exceptionManager.apply(e.getCause());
+ } catch (InterruptedException e) {
+ throw new DependencyException(e);
+ }
+ } catch (ConditionalCheckFailedException e) {
+ log.warn("Lease update failed for lease with key {} because the lease did not exist at the time of the update",
+ lease.leaseKey(), e);
+ } catch (DynamoDbException | TimeoutException e) {
+ throw convertAndRethrowExceptions("update", lease.leaseKey(), e);
+ }
+ }
+
/**
* {@inheritDoc}
*/
@Override
- public ExtendedSequenceNumber getCheckpoint(String shardId)
+ public ExtendedSequenceNumber getCheckpoint(String leaseKey)
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
ExtendedSequenceNumber checkpoint = null;
- Lease lease = getLease(shardId);
+ Lease lease = getLease(leaseKey);
if (lease != null) {
checkpoint = lease.checkpoint();
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java
index 04d987a2..e457b5ec 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java
@@ -36,9 +36,11 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseRenewer;
+import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
@@ -269,7 +271,7 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
* {@inheritDoc}
*/
@Override
- public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String operation, String shardId)
+ public boolean updateLease(Lease lease, UUID concurrencyToken, @NonNull String operation, String singleStreamShardId)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
verifyNotNull(lease, "lease cannot be null");
verifyNotNull(lease.leaseKey(), "leaseKey cannot be null");
@@ -296,8 +298,12 @@ public class DynamoDBLeaseRenewer implements LeaseRenewer {
}
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
- if (StringUtils.isNotEmpty(shardId)) {
- MetricsUtil.addShardId(scope, shardId);
+ if (lease instanceof MultiStreamLease) {
+ MetricsUtil.addStreamId(scope,
+ StreamIdentifier.multiStreamInstance(((MultiStreamLease) lease).streamIdentifier()));
+ MetricsUtil.addShardId(scope, ((MultiStreamLease) lease).shardId());
+ } else if (StringUtils.isNotEmpty(singleStreamShardId)) {
+ MetricsUtil.addShardId(scope, singleStreamShardId);
}
long startTime = System.currentTimeMillis();
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java
index b97738ca..64a7840c 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java
@@ -30,10 +30,13 @@ import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.leases.DynamoUtils;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseSerializer;
+import software.amazon.kinesis.leases.UpdateField;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
@@ -50,7 +53,11 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String CHECKPOINT_SUBSEQUENCE_NUMBER_KEY = "checkpointSubSequenceNumber";
private static final String PENDING_CHECKPOINT_SEQUENCE_KEY = "pendingCheckpoint";
private static final String PENDING_CHECKPOINT_SUBSEQUENCE_KEY = "pendingCheckpointSubSequenceNumber";
+ private static final String PENDING_CHECKPOINT_STATE_KEY = "pendingCheckpointState";
private static final String PARENT_SHARD_ID_KEY = "parentShardId";
+ private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
+ private static final String STARTING_HASH_KEY = "startingHashKey";
+ private static final String ENDING_HASH_KEY = "endingHashKey";
@Override
public Map toDynamoRecord(final Lease lease) {
@@ -69,39 +76,65 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
if (lease.parentShardIds() != null && !lease.parentShardIds().isEmpty()) {
result.put(PARENT_SHARD_ID_KEY, DynamoUtils.createAttributeValue(lease.parentShardIds()));
}
+ if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
+ result.put(CHILD_SHARD_IDS_KEY, DynamoUtils.createAttributeValue(lease.childShardIds()));
+ }
if (lease.pendingCheckpoint() != null && !lease.pendingCheckpoint().sequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().sequenceNumber()));
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.pendingCheckpoint().subSequenceNumber()));
}
+ if (lease.pendingCheckpointState() != null) {
+ result.put(PENDING_CHECKPOINT_STATE_KEY, DynamoUtils.createAttributeValue(lease.checkpoint().subSequenceNumber()));
+ }
+
+ if(lease.hashKeyRangeForLease() != null) {
+ result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey()));
+ result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey()));
+ }
+
return result;
}
@Override
public Lease fromDynamoRecord(final Map dynamoRecord) {
- Lease result = new Lease();
- result.leaseKey(DynamoUtils.safeGetString(dynamoRecord, LEASE_KEY_KEY));
- result.leaseOwner(DynamoUtils.safeGetString(dynamoRecord, LEASE_OWNER_KEY));
- result.leaseCounter(DynamoUtils.safeGetLong(dynamoRecord, LEASE_COUNTER_KEY));
+ final Lease result = new Lease();
+ return fromDynamoRecord(dynamoRecord, result);
+ }
- result.ownerSwitchesSinceCheckpoint(DynamoUtils.safeGetLong(dynamoRecord, OWNER_SWITCHES_KEY));
- result.checkpoint(
+ @Override
+ public Lease fromDynamoRecord(Map dynamoRecord, Lease leaseToUpdate) {
+ leaseToUpdate.leaseKey(DynamoUtils.safeGetString(dynamoRecord, LEASE_KEY_KEY));
+ leaseToUpdate.leaseOwner(DynamoUtils.safeGetString(dynamoRecord, LEASE_OWNER_KEY));
+ leaseToUpdate.leaseCounter(DynamoUtils.safeGetLong(dynamoRecord, LEASE_COUNTER_KEY));
+
+ leaseToUpdate.ownerSwitchesSinceCheckpoint(DynamoUtils.safeGetLong(dynamoRecord, OWNER_SWITCHES_KEY));
+ leaseToUpdate.checkpoint(
new ExtendedSequenceNumber(
DynamoUtils.safeGetString(dynamoRecord, CHECKPOINT_SEQUENCE_NUMBER_KEY),
DynamoUtils.safeGetLong(dynamoRecord, CHECKPOINT_SUBSEQUENCE_NUMBER_KEY))
);
- result.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
+ leaseToUpdate.parentShardIds(DynamoUtils.safeGetSS(dynamoRecord, PARENT_SHARD_ID_KEY));
+ leaseToUpdate.childShardIds(DynamoUtils.safeGetSS(dynamoRecord, CHILD_SHARD_IDS_KEY));
if (!Strings.isNullOrEmpty(DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY))) {
- result.pendingCheckpoint(
+ leaseToUpdate.pendingCheckpoint(
new ExtendedSequenceNumber(
DynamoUtils.safeGetString(dynamoRecord, PENDING_CHECKPOINT_SEQUENCE_KEY),
DynamoUtils.safeGetLong(dynamoRecord, PENDING_CHECKPOINT_SUBSEQUENCE_KEY))
);
}
- return result;
+ leaseToUpdate.pendingCheckpointState(DynamoUtils.safeGetByteArray(dynamoRecord, PENDING_CHECKPOINT_STATE_KEY));
+
+ final String startingHashKey, endingHashKey;
+ if (!Strings.isNullOrEmpty(startingHashKey = DynamoUtils.safeGetString(dynamoRecord, STARTING_HASH_KEY))
+ && !Strings.isNullOrEmpty(endingHashKey = DynamoUtils.safeGetString(dynamoRecord, ENDING_HASH_KEY))) {
+ leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey));
+ }
+
+ return leaseToUpdate;
}
@Override
@@ -159,6 +192,19 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
return result;
}
+ @Override
+ public Map getDynamoExistentExpectation(String leaseKey) {
+ Map result = new HashMap<>();
+
+ ExpectedAttributeValue expectedAV = ExpectedAttributeValue.builder()
+ .exists(true)
+ .value(DynamoUtils.createAttributeValue(leaseKey))
+ .build();
+ result.put(LEASE_KEY_KEY, expectedAV);
+
+ return result;
+ }
+
@Override
public Map getDynamoLeaseCounterUpdate(final Lease lease) {
return getDynamoLeaseCounterUpdate(lease.leaseCounter());
@@ -198,7 +244,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
return result;
}
- private AttributeValueUpdate putUpdate(AttributeValue attributeValue) {
+ protected AttributeValueUpdate putUpdate(AttributeValue attributeValue) {
return AttributeValueUpdate.builder().value(attributeValue).action(AttributeAction.PUT).build();
}
@@ -216,6 +262,45 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
}
+
+ if (lease.pendingCheckpointState() != null) {
+ result.put(PENDING_CHECKPOINT_STATE_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.pendingCheckpointState())));
+ } else {
+ result.put(PENDING_CHECKPOINT_STATE_KEY, AttributeValueUpdate.builder().action(AttributeAction.DELETE).build());
+ }
+
+
+ if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
+ result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
+ }
+
+ if(lease.hashKeyRangeForLease() != null) {
+ result.put(STARTING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())));
+ result.put(ENDING_HASH_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())));
+ }
+
+ return result;
+ }
+
+ @Override
+ public Map getDynamoUpdateLeaseUpdate(Lease lease,
+ UpdateField updateField) {
+ Map result = new HashMap<>();
+ switch (updateField) {
+ case CHILD_SHARDS:
+ if (!CollectionUtils.isNullOrEmpty(lease.childShardIds())) {
+ result.put(CHILD_SHARD_IDS_KEY, putUpdate(DynamoUtils.createAttributeValue(lease.childShardIds())));
+ }
+ break;
+ case HASH_KEY_RANGE:
+ if (lease.hashKeyRangeForLease() != null) {
+ result.put(STARTING_HASH_KEY, putUpdate(
+ DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedStartingHashKey())));
+ result.put(ENDING_HASH_KEY, putUpdate(
+ DynamoUtils.createAttributeValue(lease.hashKeyRangeForLease().serializedEndingHashKey())));
+ }
+ break;
+ }
return result;
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
index d79646e7..4249f32b 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
@@ -25,7 +25,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@@ -36,8 +36,8 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsFactory;
-import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsLevel;
+import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
/**
@@ -48,6 +48,7 @@ import software.amazon.kinesis.metrics.MetricsUtil;
public class DynamoDBLeaseTaker implements LeaseTaker {
private static final int TAKE_RETRIES = 3;
private static final int SCAN_RETRIES = 1;
+ private long veryOldLeaseDurationNanosMultiplier = 3;
// See note on TAKE_LEASES_DIMENSION(Callable) for why we have this callable.
private static final Callable SYSTEM_CLOCK_CALLABLE = System::nanoTime;
@@ -95,6 +96,18 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
return this;
}
+ /**
+ * Overrides the default very old lease duration nanos multiplier to increase the threshold for taking very old leases.
+ * Setting this to a higher value than 3 will increase the threshold for very old lease taking.
+ *
+ * @param veryOldLeaseDurationNanosMultipler Very old lease duration multiplier for adjusting very old lease taking.
+ * @return LeaseTaker
+ */
+ public DynamoDBLeaseTaker withVeryOldLeaseDurationNanosMultipler(long veryOldLeaseDurationNanosMultipler) {
+ this.veryOldLeaseDurationNanosMultiplier = veryOldLeaseDurationNanosMultipler;
+ return this;
+ }
+
/**
* Max leases to steal from a more loaded Worker at one time (for load balancing).
* Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
@@ -329,31 +342,39 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
Set leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
+ List veryOldLeases = new ArrayList<>();
+
+ int numLeases = 0;
+ int numWorkers = 0;
+ int numLeasesToReachTarget = 0;
+ int leaseSpillover = 0;
+ int veryOldLeaseCount = 0;
try {
- int numLeases = allLeases.size();
- int numWorkers = leaseCounts.size();
+ numLeases = allLeases.size();
+ numWorkers = leaseCounts.size();
if (numLeases == 0) {
// If there are no leases, I shouldn't try to take any.
return leasesToTake;
}
+
int target;
if (numWorkers >= numLeases) {
// If we have n leases and n or more workers, each worker can have up to 1 lease, including myself.
target = 1;
} else {
- /*
- * numWorkers must be < numLeases.
- *
- * Our target for each worker is numLeases / numWorkers (+1 if numWorkers doesn't evenly divide numLeases)
- */
+ /*
+ * numWorkers must be < numLeases.
+ *
+ * Our target for each worker is numLeases / numWorkers (+1 if numWorkers doesn't evenly divide numLeases)
+ */
target = numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1);
// Spill over is the number of leases this worker should have claimed, but did not because it would
// exceed the max allowed for this worker.
- int leaseSpillover = Math.max(0, target - maxLeasesForWorker);
+ leaseSpillover = Math.max(0, target - maxLeasesForWorker);
if (target > maxLeasesForWorker) {
log.warn(
"Worker {} target is {} leases and maxLeasesForWorker is {}. Resetting target to {},"
@@ -362,11 +383,29 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
workerIdentifier, target, maxLeasesForWorker, maxLeasesForWorker, leaseSpillover);
target = maxLeasesForWorker;
}
- scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
}
int myCount = leaseCounts.get(workerIdentifier);
- int numLeasesToReachTarget = target - myCount;
+ numLeasesToReachTarget = target - myCount;
+
+ int currentLeaseCount = leaseCounts.get(workerIdentifier);
+ // If there are leases that have been expired for an extended period of
+ // time, take them with priority, disregarding the target (computed
+ // later) but obeying the maximum limit per worker.
+ veryOldLeases = allLeases.values().stream()
+ .filter(lease -> System.nanoTime() - lease.lastCounterIncrementNanos()
+ > veryOldLeaseDurationNanosMultiplier * leaseDurationNanos)
+ .collect(Collectors.toList());
+
+ if (!veryOldLeases.isEmpty()) {
+ Collections.shuffle(veryOldLeases);
+ veryOldLeaseCount = Math.max(0, Math.min(maxLeasesForWorker - currentLeaseCount, veryOldLeases.size()));
+ HashSet result = new HashSet<>(veryOldLeases.subList(0, veryOldLeaseCount));
+ if (veryOldLeaseCount > 0) {
+ log.info("Taking leases that have been expired for a long time: {}", result);
+ }
+ return result;
+ }
if (numLeasesToReachTarget <= 0) {
// If we don't need anything, return the empty set.
@@ -376,7 +415,6 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
// Shuffle expiredLeases so workers don't all try to contend for the same leases.
Collections.shuffle(expiredLeases);
- int originalExpiredLeasesSize = expiredLeases.size();
if (expiredLeases.size() > 0) {
// If we have expired leases, get up to leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
@@ -397,16 +435,19 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
log.info(
"Worker {} saw {} total leases, {} available leases, {} "
+ "workers. Target is {} leases, I have {} leases, I will take {} leases",
- workerIdentifier, numLeases, originalExpiredLeasesSize, numWorkers, target, myCount,
+ workerIdentifier, numLeases, expiredLeases.size(), numWorkers, target, myCount,
leasesToTake.size());
}
- scope.addData("TotalLeases", numLeases, StandardUnit.COUNT, MetricsLevel.DETAILED);
- scope.addData("ExpiredLeases", originalExpiredLeasesSize, StandardUnit.COUNT, MetricsLevel.SUMMARY);
- scope.addData("NumWorkers", numWorkers, StandardUnit.COUNT, MetricsLevel.SUMMARY);
- scope.addData("NeededLeases", numLeasesToReachTarget, StandardUnit.COUNT, MetricsLevel.DETAILED);
- scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
} finally {
+ scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
+ scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
+ scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
+ scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
+ scope.addData("NumWorkers", numWorkers, StandardUnit.COUNT, MetricsLevel.SUMMARY);
+ scope.addData("TotalLeases", numLeases, StandardUnit.COUNT, MetricsLevel.DETAILED);
+ scope.addData("VeryOldLeases", veryOldLeaseCount, StandardUnit.COUNT, MetricsLevel.SUMMARY);
+
MetricsUtil.endScope(scope);
}
@@ -499,10 +540,13 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
*/
private Map computeLeaseCounts(List expiredLeases) {
Map leaseCounts = new HashMap<>();
+ // The set will give much faster lookup than the original list, an
+ // important optimization when the list is large
+ Set expiredLeasesSet = new HashSet<>(expiredLeases);
// Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired.
for (Lease lease : allLeases.values()) {
- if (!expiredLeases.contains(lease)) {
+ if (!expiredLeasesSet.contains(lease)) {
String leaseOwner = lease.leaseOwner();
Integer oldCount = leaseCounts.get(leaseOwner);
if (oldCount == null) {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java
new file mode 100644
index 00000000..78c9c6c4
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBMultiStreamLeaseSerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.leases.dynamodb;
+
+import lombok.NoArgsConstructor;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
+import software.amazon.kinesis.leases.DynamoUtils;
+import software.amazon.kinesis.leases.Lease;
+import software.amazon.kinesis.leases.MultiStreamLease;
+
+import java.util.Map;
+
+import static software.amazon.kinesis.leases.MultiStreamLease.validateAndCast;
+
+@NoArgsConstructor
+public class DynamoDBMultiStreamLeaseSerializer extends DynamoDBLeaseSerializer {
+
+ // Keeping the stream id as "streamName" for legacy reasons.
+ private static final String STREAM_ID_KEY = "streamName";
+ private static final String SHARD_ID_KEY = "shardId";
+
+ @Override
+ public Map toDynamoRecord(Lease lease) {
+ final MultiStreamLease multiStreamLease = validateAndCast(lease);
+ final Map result = super.toDynamoRecord(multiStreamLease);
+ result.put(STREAM_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier()));
+ result.put(SHARD_ID_KEY, DynamoUtils.createAttributeValue(multiStreamLease.shardId()));
+ return result;
+ }
+
+ @Override
+ public MultiStreamLease fromDynamoRecord(Map dynamoRecord) {
+ final MultiStreamLease multiStreamLease = (MultiStreamLease) super
+ .fromDynamoRecord(dynamoRecord, new MultiStreamLease());
+ multiStreamLease.streamIdentifier(DynamoUtils.safeGetString(dynamoRecord, STREAM_ID_KEY));
+ multiStreamLease.shardId(DynamoUtils.safeGetString(dynamoRecord, SHARD_ID_KEY));
+ return multiStreamLease;
+ }
+
+
+ @Override
+ public Map getDynamoUpdateLeaseUpdate(Lease lease) {
+ final MultiStreamLease multiStreamLease = validateAndCast(lease);
+ final Map result = super.getDynamoUpdateLeaseUpdate(multiStreamLease);
+ result.put(STREAM_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.streamIdentifier())));
+ result.put(SHARD_ID_KEY, putUpdate(DynamoUtils.createAttributeValue(multiStreamLease.shardId())));
+ return result;
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java
new file mode 100644
index 00000000..ba97ab08
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/CustomerApplicationException.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package software.amazon.kinesis.leases.exceptions;
+
+/**
+ * Exception type for all exceptions thrown by the customer implemented code.
+ */
+public class CustomerApplicationException extends Exception {
+
+ public CustomerApplicationException(Throwable e) { super(e);}
+
+ public CustomerApplicationException(String message, Throwable e) { super(message, e);}
+
+ public CustomerApplicationException(String message) { super(message);}
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java
new file mode 100644
index 00000000..2d3d0c2f
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.leases.exceptions;
+
+import lombok.EqualsAndHashCode;
+import lombok.Value;
+import lombok.experimental.Accessors;
+import software.amazon.kinesis.common.StreamIdentifier;
+import software.amazon.kinesis.leases.Lease;
+import software.amazon.kinesis.leases.ShardDetector;
+import software.amazon.kinesis.leases.ShardInfo;
+
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class for cleaning up leases.
+ */
+@Accessors(fluent = true)
+@Value
+@EqualsAndHashCode(exclude = {"queueEntryTime"})
+public class LeasePendingDeletion {
+ private final StreamIdentifier streamIdentifier;
+ private final Lease lease;
+ private final ShardInfo shardInfo;
+ private final ShardDetector shardDetector;
+
+ /**
+ * Discovers the child shards for this lease.
+ * @return
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ public Set getChildShardsFromService() throws InterruptedException, ExecutionException, TimeoutException {
+ return shardDetector.getChildShards(shardInfo.shardId()).stream().map(c -> c.shardId()).collect(Collectors.toSet());
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java
index 4e9245f6..f7ec12c5 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java
@@ -19,6 +19,7 @@ import software.amazon.kinesis.metrics.MetricsScope;
@Deprecated
public class ShardSyncer {
private static final HierarchicalShardSyncer HIERARCHICAL_SHARD_SYNCER = new HierarchicalShardSyncer();
+ private static final boolean garbageCollectLeases = true;
/**
* NOTE: This method is deprecated and will be removed in a future release.
@@ -26,7 +27,6 @@ public class ShardSyncer {
* @param shardDetector
* @param leaseRefresher
* @param initialPosition
- * @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param scope
* @throws DependencyException
@@ -37,10 +37,9 @@ public class ShardSyncer {
@Deprecated
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
- final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
- final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
- KinesisClientLibIOException {
- HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
- cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
+ final boolean ignoreUnexpectedChildShards, final MetricsScope scope)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException {
+ HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
+ scope, ignoreUnexpectedChildShards, leaseRefresher.isLeaseTableEmpty());
}
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java
index 4ea4212e..5f1ee18c 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTask.java
@@ -54,28 +54,29 @@ public class BlockOnParentShardTask implements ConsumerTask {
@Override
public TaskResult call() {
Exception exception = null;
-
+ final String shardInfoId = ShardInfo.getLeaseKey(shardInfo);
try {
boolean blockedOnParentShard = false;
for (String shardId : shardInfo.parentShardIds()) {
- Lease lease = leaseRefresher.getLease(shardId);
+ final String leaseKey = ShardInfo.getLeaseKey(shardInfo, shardId);
+ final Lease lease = leaseRefresher.getLease(leaseKey);
if (lease != null) {
ExtendedSequenceNumber checkpoint = lease.checkpoint();
if ((checkpoint == null) || (!checkpoint.equals(ExtendedSequenceNumber.SHARD_END))) {
- log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardId, checkpoint);
+ log.debug("Shard {} is not yet done. Its current checkpoint is {}", shardInfoId, checkpoint);
blockedOnParentShard = true;
exception = new BlockedOnParentShardException("Parent shard not yet done");
break;
} else {
- log.debug("Shard {} has been completely processed.", shardId);
+ log.debug("Shard {} has been completely processed.", shardInfoId);
}
} else {
- log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardId);
+ log.info("No lease found for shard {}. Not blocking on completion of this shard.", shardInfoId);
}
}
if (!blockedOnParentShard) {
- log.info("No need to block on parents {} of shard {}", shardInfo.parentShardIds(), shardInfo.shardId());
+ log.info("No need to block on parents {} of shard {}", shardInfo.parentShardIds(), shardInfoId);
return new TaskResult(null);
}
} catch (Exception e) {
@@ -85,7 +86,7 @@ public class BlockOnParentShardTask implements ConsumerTask {
try {
Thread.sleep(parentShardPollIntervalMillis);
} catch (InterruptedException e) {
- log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfo.shardId(), e);
+ log.error("Sleep interrupted when waiting on parent shard(s) of {}", shardInfoId, e);
}
return new TaskResult(exception);
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
index bb1788b2..4d894d94 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
@@ -146,7 +146,7 @@ class ConsumerStates {
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
- return ShardConsumerState.SHUTDOWN_COMPLETE.consumerState();
+ return ShardConsumerState.SHUTTING_DOWN.consumerState();
}
@Override
@@ -496,7 +496,10 @@ class ConsumerStates {
argument.taskBackoffTimeMillis(),
argument.recordsPublisher(),
argument.hierarchicalShardSyncer(),
- argument.metricsFactory());
+ argument.metricsFactory(),
+ input == null ? null : input.childShards(),
+ argument.streamIdentifier(),
+ argument.leaseCleanupManager());
}
@Override
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java
index fdb0e947..4108dd9b 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java
@@ -21,6 +21,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.Checkpoint;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.metrics.MetricsFactory;
@@ -75,9 +76,10 @@ public class InitializeTask implements ConsumerTask {
try {
log.debug("Initializing ShardId {}", shardInfo);
- Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.shardId());
+ final String leaseKey = ShardInfo.getLeaseKey(shardInfo);
+ Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(leaseKey);
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.checkpoint();
- log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", shardInfo.shardId(), initialCheckpoint,
+ log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", leaseKey, initialCheckpoint,
initialPositionInStream);
cache.start(initialCheckpoint, initialPositionInStream);
@@ -90,6 +92,7 @@ public class InitializeTask implements ConsumerTask {
.shardId(shardInfo.shardId())
.extendedSequenceNumber(initialCheckpoint)
.pendingCheckpointSequenceNumber(initialCheckpointObject.pendingCheckpoint())
+ .pendingCheckpointState(initialCheckpointObject.pendingCheckpointState())
.build();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory,
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java
index 6c223650..6c52e0de 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java
@@ -23,6 +23,7 @@ import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
+import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@@ -60,6 +61,7 @@ public class ProcessTask implements ConsumerTask {
private final ProcessRecordsInput processRecordsInput;
private final MetricsFactory metricsFactory;
private final AggregatorUtil aggregatorUtil;
+ private final String shardInfoId;
public ProcessTask(@NonNull ShardInfo shardInfo,
@NonNull ShardRecordProcessor shardRecordProcessor,
@@ -74,6 +76,7 @@ public class ProcessTask implements ConsumerTask {
@NonNull AggregatorUtil aggregatorUtil,
@NonNull MetricsFactory metricsFactory) {
this.shardInfo = shardInfo;
+ this.shardInfoId = ShardInfo.getLeaseKey(shardInfo);
this.shardRecordProcessor = shardRecordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.backoffTimeMillis = backoffTimeMillis;
@@ -106,6 +109,8 @@ public class ProcessTask implements ConsumerTask {
@Override
public TaskResult call() {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
+ shardInfo.streamIdentifierSerOpt()
+ .ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId)));
MetricsUtil.addShardId(scope, shardInfo.shardId());
long startTimeMillis = System.currentTimeMillis();
boolean success = false;
@@ -121,7 +126,7 @@ public class ProcessTask implements ConsumerTask {
}
if (processRecordsInput.isAtShardEnd() && processRecordsInput.records().isEmpty()) {
- log.info("Reached end of shard {} and have no records to process", shardInfo.shardId());
+ log.info("Reached end of shard {} and have no records to process", shardInfoId);
return new TaskResult(null, true);
}
@@ -142,13 +147,13 @@ public class ProcessTask implements ConsumerTask {
}
success = true;
} catch (RuntimeException e) {
- log.error("ShardId {}: Caught exception: ", shardInfo.shardId(), e);
+ log.error("ShardId {}: Caught exception: ", shardInfoId, e);
exception = e;
backoff();
}
if (processRecordsInput.isAtShardEnd()) {
- log.info("Reached end of shard {}, and processed {} records", shardInfo.shardId(), processRecordsInput.records().size());
+ log.info("Reached end of shard {}, and processed {} records", shardInfoId, processRecordsInput.records().size());
return new TaskResult(null, true);
}
return new TaskResult(exception);
@@ -174,7 +179,7 @@ public class ProcessTask implements ConsumerTask {
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
- log.debug("{}: Sleep was interrupted", shardInfo.shardId(), ie);
+ log.debug("{}: Sleep was interrupted", shardInfoId, ie);
}
}
@@ -188,20 +193,22 @@ public class ProcessTask implements ConsumerTask {
*/
private void callProcessRecords(ProcessRecordsInput input, List records) {
log.debug("Calling application processRecords() with {} records from {}", records.size(),
- shardInfo.shardId());
+ shardInfoId);
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
.checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
+ shardInfo.streamIdentifierSerOpt()
+ .ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId)));
MetricsUtil.addShardId(scope, shardInfo.shardId());
final long startTime = System.currentTimeMillis();
try {
shardRecordProcessor.processRecords(processRecordsInput);
} catch (Exception e) {
log.error("ShardId {}: Application processRecords() threw an exception when processing shard ",
- shardInfo.shardId(), e);
- log.error("ShardId {}: Skipping over the following data records: {}", shardInfo.shardId(), records);
+ shardInfoId, e);
+ log.error("ShardId {}: Skipping over the following data records: {}", shardInfoId, records);
} finally {
MetricsUtil.addLatency(scope, RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, startTime, MetricsLevel.SUMMARY);
MetricsUtil.endScope(scope);
@@ -226,17 +233,17 @@ public class ProcessTask implements ConsumerTask {
* the time when the task started
*/
private void handleNoRecords(long startTimeMillis) {
- log.debug("Kinesis didn't return any records for shard {}", shardInfo.shardId());
+ log.debug("Kinesis didn't return any records for shard {}", shardInfoId);
long sleepTimeMillis = idleTimeInMilliseconds - (System.currentTimeMillis() - startTimeMillis);
if (sleepTimeMillis > 0) {
sleepTimeMillis = Math.max(sleepTimeMillis, idleTimeInMilliseconds);
try {
log.debug("Sleeping for {} ms since there were no new records in shard {}", sleepTimeMillis,
- shardInfo.shardId());
+ shardInfoId);
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
- log.debug("ShardId {}: Sleep was interrupted", shardInfo.shardId());
+ log.debug("ShardId {}: Sleep was interrupted", shardInfoId);
}
}
}
@@ -273,8 +280,8 @@ public class ProcessTask implements ConsumerTask {
if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) {
recordIterator.remove();
- log.debug("removing record with ESN {} because the ESN is <= checkpoint ({})", extendedSequenceNumber,
- lastCheckpointValue);
+ log.debug("{} : removing record with ESN {} because the ESN is <= checkpoint ({})", shardInfoId,
+ extendedSequenceNumber, lastCheckpointValue);
continue;
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
index 99a680bf..b6e7c068 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
@@ -16,6 +16,7 @@ package software.amazon.kinesis.lifecycle;
import java.time.Duration;
import java.time.Instant;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -32,6 +33,7 @@ import lombok.Getter;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.ShardInfo;
@@ -62,6 +64,7 @@ public class ShardConsumer {
private final Function taskMetricsDecorator;
private final int bufferSize;
private final TaskExecutionListener taskExecutionListener;
+ private final String streamIdentifier;
private ConsumerTask currentTask;
private TaskOutcome taskOutcome;
@@ -85,6 +88,8 @@ public class ShardConsumer {
private final ShardConsumerSubscriber subscriber;
+ private ProcessRecordsInput shardEndProcessRecordsInput;
+
@Deprecated
public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo,
Optional logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument,
@@ -124,6 +129,7 @@ public class ShardConsumer {
this.recordsPublisher = recordsPublisher;
this.executorService = executorService;
this.shardInfo = shardInfo;
+ this.streamIdentifier = shardInfo.streamIdentifierSerOpt().orElse("single_stream_mode");
this.shardConsumerArgument = shardConsumerArgument;
this.logWarningForTaskAfterMillis = logWarningForTaskAfterMillis;
this.taskExecutionListener = taskExecutionListener;
@@ -146,6 +152,7 @@ public class ShardConsumer {
processData(input);
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
markForShutdown(ShutdownReason.SHARD_END);
+ shardEndProcessRecordsInput = input;
subscription.cancel();
return;
}
@@ -208,8 +215,8 @@ public class ShardConsumer {
}
Throwable dispatchFailure = subscriber.getAndResetDispatchFailure();
if (dispatchFailure != null) {
- log.warn("Exception occurred while dispatching incoming data. The incoming data has been skipped",
- dispatchFailure);
+ log.warn("{} : Exception occurred while dispatching incoming data. The incoming data has been skipped",
+ streamIdentifier, dispatchFailure);
return dispatchFailure;
}
@@ -238,7 +245,7 @@ public class ShardConsumer {
Instant now = Instant.now();
Duration timeSince = Duration.between(subscriber.lastDataArrival(), now);
if (timeSince.toMillis() > value) {
- log.warn("Last time data arrived: {} ({})", lastDataArrival, timeSince);
+ log.warn("{} : Last time data arrived: {} ({})", streamIdentifier, lastDataArrival, timeSince);
}
}
});
@@ -250,11 +257,11 @@ public class ShardConsumer {
if (taken != null) {
String message = longRunningTaskMessage(taken);
if (log.isDebugEnabled()) {
- log.debug("{} Not submitting new task.", message);
+ log.debug("{} : {} Not submitting new task.", streamIdentifier, message);
}
logWarningForTaskAfterMillis.ifPresent(value -> {
if (taken.toMillis() > value) {
- log.warn(message);
+ log.warn("{} : {}", streamIdentifier, message);
}
});
}
@@ -303,7 +310,7 @@ public class ShardConsumer {
return true;
}
- executeTask(null);
+ executeTask(shardEndProcessRecordsInput);
return false;
}
}, executorService);
@@ -358,7 +365,7 @@ public class ShardConsumer {
nextState = currentState.failureTransition();
break;
default:
- log.error("No handler for outcome of {}", outcome.name());
+ log.error("{} : No handler for outcome of {}", streamIdentifier, outcome.name());
nextState = currentState.failureTransition();
break;
}
@@ -382,9 +389,9 @@ public class ShardConsumer {
Exception taskException = taskResult.getException();
if (taskException instanceof BlockedOnParentShardException) {
// No need to log the stack trace for this exception (it is very specific).
- log.debug("Shard {} is blocked on completion of parent shard.", shardInfo.shardId());
+ log.debug("{} : Shard {} is blocked on completion of parent shard.", streamIdentifier, shardInfo.shardId());
} else {
- log.debug("Caught exception running {} task: ", currentTask.taskType(), taskResult.getException());
+ log.debug("{} : Caught exception running {} task: ", streamIdentifier, currentTask.taskType(), taskResult.getException());
}
}
}
@@ -411,10 +418,10 @@ public class ShardConsumer {
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
public boolean leaseLost() {
- log.debug("Shutdown({}): Lease lost triggered.", shardInfo.shardId());
+ log.debug("{} : Shutdown({}): Lease lost triggered.", streamIdentifier, shardInfo.shardId());
if (subscriber != null) {
subscriber.cancel();
- log.debug("Shutdown({}): Subscriber cancelled.", shardInfo.shardId());
+ log.debug("{} : Shutdown({}): Subscriber cancelled.", streamIdentifier, shardInfo.shardId());
}
markForShutdown(ShutdownReason.LEASE_LOST);
return isShutdown();
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
index 4f1db733..0f18891c 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
@@ -21,8 +21,9 @@ import lombok.experimental.Accessors;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.StreamIdentifier;
+import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
-import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
@@ -41,7 +42,7 @@ public class ShardConsumerArgument {
@NonNull
private final ShardInfo shardInfo;
@NonNull
- private final String streamName;
+ private final StreamIdentifier streamIdentifier;
@NonNull
private final LeaseCoordinator leaseCoordinator;
@NonNull
@@ -71,4 +72,5 @@ public class ShardConsumerArgument {
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;
+ private final LeaseCleanupManager leaseCleanupManager;
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java
index cb70024d..685a76d2 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.java
@@ -24,6 +24,7 @@ import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
+import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
@@ -40,8 +41,8 @@ class ShardConsumerSubscriber implements Subscriber {
private final int bufferSize;
private final ShardConsumer shardConsumer;
private final int readTimeoutsToIgnoreBeforeWarning;
+ private final String shardInfoId;
private volatile int readTimeoutSinceLastRead = 0;
-
@VisibleForTesting
final Object lockObject = new Object();
// This holds the last time an attempt of request to upstream service was made including the first try to
@@ -70,6 +71,7 @@ class ShardConsumerSubscriber implements Subscriber