discoverNewLeases() throws ProvisionedThroughputException, InvalidStateException, DependencyException;
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
index 2d4e041c..77a45a04 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
@@ -16,7 +16,9 @@
package software.amazon.kinesis.leases;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@@ -25,7 +27,9 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Builder;
import lombok.Data;
+import lombok.Getter;
import lombok.NonNull;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
@@ -34,6 +38,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.kinesis.common.DdbTableConfig;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.LeaseCleanupConfig;
@@ -42,6 +47,7 @@ import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
+import software.amazon.kinesis.worker.metric.WorkerMetric;
/**
* Used by the KCL to configure lease management.
@@ -209,6 +215,9 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
+ private WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig =
+ new WorkerUtilizationAwareAssignmentConfig();
+
/**
* Whether to enable deletion protection on the DynamoDB lease table created by KCL. This does not update
* already existing tables.
@@ -276,14 +285,17 @@ public class LeaseManagementConfig {
}
public LeaseManagementConfig(
- String tableName,
- DynamoDbAsyncClient dynamoDBClient,
- KinesisAsyncClient kinesisClient,
- String workerIdentifier) {
+ final String tableName,
+ final String applicationName,
+ final DynamoDbAsyncClient dynamoDBClient,
+ final KinesisAsyncClient kinesisClient,
+ final String workerIdentifier) {
this.tableName = tableName;
this.dynamoDBClient = dynamoDBClient;
this.kinesisClient = kinesisClient;
this.workerIdentifier = workerIdentifier;
+ this.workerUtilizationAwareAssignmentConfig.workerMetricsTableConfig =
+ new WorkerMetricsTableConfig(applicationName);
}
/**
@@ -361,6 +373,53 @@ public class LeaseManagementConfig {
return hierarchicalShardSyncer;
}
+ /**
+ * Configuration class for controlling the graceful handoff of leases.
+ * This configuration allows tuning of the shutdown behavior during lease transfers.
+ *
+ * It provides settings to control the timeout period for waiting on the record processor
+ * to shut down and an option to enable or disable graceful lease handoff.
+ *
+ */
+ @Builder
+ @Getter
+ @Accessors(fluent = true)
+ public static class GracefulLeaseHandoffConfig {
+ /**
+ * The minimum amount of time (in milliseconds) to wait for the current shard's RecordProcessor
+ * to gracefully shut down before forcefully transferring the lease to the next owner.
+ *
+ * If each call to {@code processRecords} is expected to run longer than the default value,
+ * it makes sense to set this to a higher value to ensure the RecordProcessor has enough
+ * time to complete its processing.
+ *
+ *
+ * Default value is 30,000 milliseconds (30 seconds).
+ *
+ */
+ @Builder.Default
+ private long gracefulLeaseHandoffTimeoutMillis = 30_000L;
+ /**
+ * Flag to enable or disable the graceful lease handoff mechanism.
+ *
+ * When set to {@code true}, the KCL will attempt to gracefully transfer leases by
+ * allowing the shard's RecordProcessor sufficient time to complete processing before
+ * handing off the lease to another worker. When {@code false}, the lease will be
+ * handed off without waiting for the RecordProcessor to shut down gracefully. Note
+ * that checkpointing is expected to be implemented inside {@code shutdownRequested}
+ * for this feature to work end to end.
+ *
+ *
+ * Default value is {@code true}.
+ *
+ */
+ @Builder.Default
+ private boolean isGracefulLeaseHandoffEnabled = true;
+ }
+
+ private GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig =
+ GracefulLeaseHandoffConfig.builder().build();
+
@Deprecated
public LeaseManagementFactory leaseManagementFactory() {
if (leaseManagementFactory == null) {
@@ -440,7 +499,9 @@ public class LeaseManagementConfig {
leaseSerializer,
customShardDetectorProvider(),
isMultiStreamingMode,
- leaseCleanupConfig());
+ leaseCleanupConfig(),
+ workerUtilizationAwareAssignmentConfig(),
+ gracefulLeaseHandoffConfig);
}
return leaseManagementFactory;
}
@@ -454,4 +515,89 @@ public class LeaseManagementConfig {
this.leaseManagementFactory = leaseManagementFactory;
return this;
}
+
+ @Data
+ @Accessors(fluent = true)
+ public static class WorkerUtilizationAwareAssignmentConfig {
+ /**
+ * This defines the frequency of capturing worker metric stats in memory. Default is 1s
+ */
+ private long inMemoryWorkerMetricsCaptureFrequencyMillis =
+ Duration.ofSeconds(1L).toMillis();
+ /**
+ * This defines the frequency of reporting worker metric stats to storage. Default is 30s
+ */
+ private long workerMetricsReporterFreqInMillis = Duration.ofSeconds(30).toMillis();
+ /**
+ * These are the no. of metrics that are persisted in storage in WorkerMetricStats ddb table.
+ */
+ private int noOfPersistedMetricsPerWorkerMetrics = 10;
+ /**
+ * Option to disable workerMetrics to use in lease balancing.
+ */
+ private boolean disableWorkerMetrics = false;
+ /**
+ * List of workerMetrics for the application.
+ */
+ private List workerMetricList = new ArrayList<>();
+ /**
+ * Max throughput per host KBps, default is unlimited.
+ */
+ private double maxThroughputPerHostKBps = Double.MAX_VALUE;
+ /**
+ * Percentage of value to achieve critical dampening during this case
+ */
+ private int dampeningPercentage = 60;
+ /**
+ * Percentage value used to trigger reBalance. If fleet has workers which are have metrics value more or less
+ * than 20% of fleet level average then reBalance is triggered.
+ * Leases are taken from workers with metrics value more than fleet level average. The load to take from these
+ * workers is determined by evaluating how far they are with respect to fleet level average.
+ */
+ private int reBalanceThresholdPercentage = 10;
+
+ /**
+ * The allowThroughputOvershoot flag determines whether leases should still be taken even if
+ * it causes the total assigned throughput to exceed the desired throughput to take for re-balance.
+ * Enabling this flag provides more flexibility for the LeaseAssignmentManager to explore additional
+ * assignment possibilities, which can lead to faster throughput convergence.
+ */
+ private boolean allowThroughputOvershoot = true;
+
+ /**
+ * Duration after which workerMetrics entry from WorkerMetricStats table will be cleaned up. When an entry's
+ * lastUpdateTime is older than staleWorkerMetricsEntryCleanupDuration from current time, entry will be removed
+ * from the table.
+ */
+ private Duration staleWorkerMetricsEntryCleanupDuration = Duration.ofDays(1);
+
+ /**
+ * configuration to configure how to create the WorkerMetricStats table, such as table name,
+ * billing mode, provisioned capacity. If no table name is specified, the table name will
+ * default to applicationName-WorkerMetricStats. If no billing more is chosen, default is
+ * On-Demand.
+ */
+ private WorkerMetricsTableConfig workerMetricsTableConfig;
+
+ /**
+ * Frequency to perform worker variance balancing frequency. This value is used with respect to the LAM freq,
+ * that is every third (as default) iteration of LAM the worker variance balancing will be performed.
+ * Setting it to 1 will make varianceBalancing run on every iteration of LAM and 2 on every 2nd iteration
+ * and so on.
+ */
+ private int varianceBalancingFrequency = 3;
+
+ /**
+ * Alpha value used for calculating exponential moving average of worker's metrics values. Selecting
+ * higher alpha value gives more weightage to recent value and thus low smoothing effect on computed average
+ * and selecting smaller alpha values gives more weightage to past value and high smoothing effect.
+ */
+ private double workerMetricsEMAAlpha = 0.5;
+ }
+
+ public static class WorkerMetricsTableConfig extends DdbTableConfig {
+ public WorkerMetricsTableConfig(final String applicationName) {
+ super(applicationName, "WorkerMetricStats");
+ }
+ }
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
index 9ed77a53..6f786bc9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
@@ -15,9 +15,12 @@
package software.amazon.kinesis.leases;
+import java.util.concurrent.ConcurrentMap;
+
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
+import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.metrics.MetricsFactory;
/**
@@ -26,6 +29,11 @@ import software.amazon.kinesis.metrics.MetricsFactory;
public interface LeaseManagementFactory {
LeaseCoordinator createLeaseCoordinator(MetricsFactory metricsFactory);
+ default LeaseCoordinator createLeaseCoordinator(
+ MetricsFactory metricsFactory, ConcurrentMap shardInfoShardConsumerMap) {
+ throw new UnsupportedOperationException();
+ }
+
ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory);
default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
index c38d442a..fc71621d 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
@@ -15,6 +15,9 @@
package software.amazon.kinesis.leases;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.DependencyException;
@@ -75,6 +78,37 @@ public interface LeaseRefresher {
*/
boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException;
+ /**
+ * Creates the LeaseOwnerToLeaseKey index on the lease table if it doesn't exist and returns the status of index.
+ *
+ * @return indexStatus status of the index.
+ * @throws DependencyException if storage's describe API fails in an unexpected way
+ */
+ default String createLeaseOwnerToLeaseKeyIndexIfNotExists() throws DependencyException {
+ return null;
+ }
+
+ /**
+ * Blocks until the index exists by polling storage till either the index is ACTIVE or else timeout has
+ * happened.
+ *
+ * @param secondsBetweenPolls time to wait between polls in seconds
+ * @param timeoutSeconds total time to wait in seconds
+ *
+ * @return true if index on the table exists and is ACTIVE, false if timeout was reached
+ */
+ default boolean waitUntilLeaseOwnerToLeaseKeyIndexExists(
+ final long secondsBetweenPolls, final long timeoutSeconds) {
+ return false;
+ }
+
+ /**
+ * Check if leaseOwner GSI is ACTIVE
+ * @return true if index is active, false otherwise
+ * @throws DependencyException if storage's describe API fails in an unexpected way
+ */
+ boolean isLeaseOwnerToLeaseKeyIndexActive() throws DependencyException;
+
/**
* List all leases for a given stream synchronously.
*
@@ -87,6 +121,24 @@ public interface LeaseRefresher {
List listLeasesForStream(StreamIdentifier streamIdentifier)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
+ /**
+ * List all leases for a given workerIdentifier synchronously.
+ * Default implementation calls listLeases() and filters the results.
+ *
+ * @throws DependencyException if DynamoDB scan fails in an unexpected way
+ * @throws InvalidStateException if lease table does not exist
+ * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
+ *
+ * @return list of leases
+ */
+ default List listLeaseKeysForWorker(final String workerIdentifier)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ return listLeases().stream()
+ .filter(lease -> lease.leaseOwner().equals(workerIdentifier))
+ .map(Lease::leaseKey)
+ .collect(Collectors.toList());
+ }
+
/**
* List all objects in table synchronously.
*
@@ -98,6 +150,23 @@ public interface LeaseRefresher {
*/
List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
+ /**
+ * List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey
+ * that failed deserialize separately.
+ *
+ * @param threadPool threadpool to use for parallel scan
+ * @param parallelismFactor no. of parallel scans
+ * @return Pair of List of leases from the storage and List of items failed to deserialize
+ * @throws DependencyException if DynamoDB scan fails in an unexpected way
+ * @throws InvalidStateException if lease table does not exist
+ * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
+ */
+ default Map.Entry, List> listLeasesParallely(
+ final ExecutorService threadPool, final int parallelismFactor)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ throw new UnsupportedOperationException("listLeasesParallely is not implemented");
+ }
+
/**
* Create a new lease. Conditional on a lease not already existing with this shardId.
*
@@ -154,6 +223,47 @@ public interface LeaseRefresher {
boolean takeLease(Lease lease, String owner)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
+ /**
+ * Assigns given lease to newOwner owner by incrementing its leaseCounter and setting its owner field. Conditional
+ * on the leaseOwner in DynamoDB matching the leaseOwner of the input lease. Mutates the leaseCounter and owner of
+ * the passed-in lease object after updating DynamoDB.
+ *
+ * @param lease the lease to be assigned
+ * @param newOwner the new owner
+ *
+ * @return true if lease was successfully assigned, false otherwise
+ *
+ * @throws InvalidStateException if lease table does not exist
+ * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
+ * @throws DependencyException if DynamoDB update fails in an unexpected way
+ */
+ default boolean assignLease(final Lease lease, final String newOwner)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+
+ throw new UnsupportedOperationException("assignLease is not implemented");
+ }
+
+ /**
+ * Initiates a graceful handoff of the given lease to the specified new owner, allowing the current owner
+ * to complete its processing before transferring ownership.
+ *
+ * This method updates the lease with the new owner information but ensures that the current owner
+ * is given time to gracefully finish its work (e.g., processing records) before the lease is reassigned.
+ *
+ *
+ * @param lease the lease to be assigned
+ * @param newOwner the new owner
+ * @return true if a graceful handoff was successfully initiated
+ * @throws InvalidStateException if lease table does not exist
+ * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
+ * @throws DependencyException if DynamoDB update fails in an unexpected way
+ */
+ default boolean initiateGracefulLeaseHandoff(final Lease lease, final String newOwner)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+
+ throw new UnsupportedOperationException("assignLeaseWithWait is not implemented");
+ }
+
/**
* Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of
* the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB.
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java
index 5d7bea63..3c4692a9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java
@@ -15,6 +15,7 @@
package software.amazon.kinesis.leases;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
@@ -100,6 +101,15 @@ public interface LeaseSerializer {
*/
Map getDynamoTakeLeaseUpdate(Lease lease, String newOwner);
+ /**
+ * @param lease lease that needs to be assigned
+ * @param newOwner newLeaseOwner
+ * @return the attribute value map that takes a lease for a new owner
+ */
+ default Map getDynamoAssignLeaseUpdate(Lease lease, String newOwner) {
+ throw new UnsupportedOperationException("getDynamoAssignLeaseUpdate is not implemented");
+ }
+
/**
* @param lease
* @return the attribute value map that voids a lease
@@ -127,8 +137,22 @@ public interface LeaseSerializer {
*/
Collection getKeySchema();
+ default Collection getWorkerIdToLeaseKeyIndexKeySchema() {
+ return Collections.EMPTY_LIST;
+ }
+
+ default Collection getWorkerIdToLeaseKeyIndexAttributeDefinitions() {
+ return Collections.EMPTY_LIST;
+ }
+
/**
* @return attribute definitions for creating a DynamoDB table to store leases
*/
Collection getAttributeDefinitions();
+
+ /**
+ * @param lease
+ * @return the attribute value map that includes lease throughput
+ */
+ Map getDynamoLeaseThroughputKbpsUpdate(Lease lease);
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseStatsRecorder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseStatsRecorder.java
new file mode 100644
index 00000000..dcb5d6de
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseStatsRecorder.java
@@ -0,0 +1,158 @@
+package software.amazon.kinesis.leases;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import software.amazon.awssdk.annotations.ThreadSafe;
+import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+import software.amazon.kinesis.utils.ExponentialMovingAverage;
+
+import static java.util.Objects.isNull;
+
+/**
+ * This class records the stats for the leases.
+ * The stats are recorded in a thread safe queue, and the throughput is calculated by summing up the bytes and dividing
+ * by interval in seconds.
+ * This class is thread safe and backed by thread safe data structures.
+ */
+@RequiredArgsConstructor
+@KinesisClientInternalApi
+@ThreadSafe
+public class LeaseStatsRecorder {
+
+ /**
+ * This default alpha is chosen based on the testing so far between simple average and moving average with 0.5.
+ * In the future, if one value does not fit all use cases, inject this via config.
+ */
+ private static final double DEFAULT_ALPHA = 0.5;
+
+ public static final int BYTES_PER_KB = 1024;
+
+ private final Long renewerFrequencyInMillis;
+ private final Map> leaseStatsMap = new ConcurrentHashMap<>();
+ private final Map leaseKeyToExponentialMovingAverageMap =
+ new ConcurrentHashMap<>();
+ private final Callable timeProviderInMillis;
+
+ /**
+ * This method provides happens-before semantics (i.e., the action (access or removal) from a thread happens
+ * before the action from subsequent thread) for the stats recording in multithreaded environment.
+ */
+ public void recordStats(@NonNull final LeaseStats leaseStats) {
+ final Queue leaseStatsQueue =
+ leaseStatsMap.computeIfAbsent(leaseStats.getLeaseKey(), lease -> new ConcurrentLinkedQueue<>());
+ leaseStatsQueue.add(leaseStats);
+ }
+
+ /**
+ * Calculates the throughput in KBps for the given leaseKey.
+ * Method first clears the items that are older than {@link #renewerFrequencyInMillis} from the queue and then
+ * calculates the throughput per second during {@link #renewerFrequencyInMillis} interval and then returns the
+ * ExponentialMovingAverage of the throughput. If method is called in quick succession with or without new stats
+ * the result can be different as ExponentialMovingAverage decays old values on every new call.
+ * This method is thread safe.
+ * @param leaseKey leaseKey for which stats are required
+ * @return throughput in Kbps, returns null if there is no stats available for the leaseKey.
+ */
+ public Double getThroughputKBps(final String leaseKey) {
+ final Queue leaseStatsQueue = leaseStatsMap.get(leaseKey);
+
+ if (isNull(leaseStatsQueue)) {
+ // This means there is no entry for this leaseKey yet
+ return null;
+ }
+
+ filterExpiredEntries(leaseStatsQueue);
+
+ // Convert bytes into KB and divide by interval in second to get throughput per second.
+ final ExponentialMovingAverage exponentialMovingAverage = leaseKeyToExponentialMovingAverageMap.computeIfAbsent(
+ leaseKey, leaseId -> new ExponentialMovingAverage(DEFAULT_ALPHA));
+
+ // Specifically dividing by 1000.0 rather than using Duration class to get seconds, because Duration class
+ // implementation rounds off to seconds and precision is lost.
+ final double frequency = renewerFrequencyInMillis / 1000.0;
+ final double throughput = readQueue(leaseStatsQueue).stream()
+ .mapToDouble(LeaseStats::getBytes)
+ .sum()
+ / BYTES_PER_KB
+ / frequency;
+ exponentialMovingAverage.add(throughput);
+ return exponentialMovingAverage.getValue();
+ }
+
+ /**
+ * Gets the currentTimeMillis and then iterates over the queue to get the stats with creation time less than
+ * currentTimeMillis.
+ * This is specifically done to avoid potential race between with high-frequency put thread blocking get thread.
+ */
+ private Queue readQueue(final Queue leaseStatsQueue) {
+ final long currentTimeMillis = getCurrenTimeInMillis();
+ final Queue response = new LinkedList<>();
+ for (LeaseStats leaseStats : leaseStatsQueue) {
+ if (leaseStats.creationTimeMillis > currentTimeMillis) {
+ break;
+ }
+ response.add(leaseStats);
+ }
+ return response;
+ }
+
+ private long getCurrenTimeInMillis() {
+ try {
+ return timeProviderInMillis.call();
+ } catch (final Exception e) {
+ // Fallback to using the System.currentTimeMillis if failed.
+ return System.currentTimeMillis();
+ }
+ }
+
+ private void filterExpiredEntries(final Queue leaseStatsQueue) {
+ final long currentTime = getCurrenTimeInMillis();
+ while (!leaseStatsQueue.isEmpty()) {
+ final LeaseStats leaseStats = leaseStatsQueue.peek();
+ if (isNull(leaseStats) || currentTime - leaseStats.getCreationTimeMillis() < renewerFrequencyInMillis) {
+ break;
+ }
+ leaseStatsQueue.poll();
+ }
+ }
+
+ /**
+ * Clear the in-memory stats for the lease when a lease is reassigned (due to shut down or lease stealing)
+ * @param leaseKey leaseKey, for which stats are supposed to be clear.
+ */
+ public void dropLeaseStats(final String leaseKey) {
+ leaseStatsMap.remove(leaseKey);
+ leaseKeyToExponentialMovingAverageMap.remove(leaseKey);
+ }
+
+ @Builder
+ @Getter
+ @ToString
+ @KinesisClientInternalApi
+ public static final class LeaseStats {
+ /**
+ * Lease key for which this leaseStats object is created.
+ */
+ private final String leaseKey;
+ /**
+ * Bytes that are processed for a lease
+ */
+ private final long bytes;
+ /**
+ * Wall time in epoch millis at which this leaseStats object was created. This time is used to determine the
+ * expiry of the lease stats.
+ */
+ @Builder.Default
+ private final long creationTimeMillis = System.currentTimeMillis();
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
index bef76ef0..7eb4c4f1 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
@@ -19,6 +19,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
@@ -30,13 +31,17 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+import software.amazon.kinesis.coordinator.MigrationAdaptiveLeaseAssignmentModeProvider;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
+import software.amazon.kinesis.leases.LeaseDiscoverer;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseRenewer;
+import software.amazon.kinesis.leases.LeaseStatsRecorder;
import software.amazon.kinesis.leases.LeaseTaker;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardInfo;
@@ -44,6 +49,8 @@ import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
+import software.amazon.kinesis.lifecycle.LeaseGracefulShutdownHandler;
+import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
@@ -70,115 +77,34 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
.setNameFormat("LeaseRenewer-%04d")
.setDaemon(true)
.build();
+ private static final ThreadFactory LEASE_DISCOVERY_THREAD_FACTORY = new ThreadFactoryBuilder()
+ .setNameFormat("LeaseDiscovery-%04d")
+ .setDaemon(true)
+ .build();
private final LeaseRenewer leaseRenewer;
private final LeaseTaker leaseTaker;
+ private final LeaseDiscoverer leaseDiscoverer;
private final long renewerIntervalMillis;
private final long takerIntervalMillis;
+ private final long leaseDiscovererIntervalMillis;
private final ExecutorService leaseRenewalThreadpool;
+ private final ExecutorService leaseDiscoveryThreadPool;
private final LeaseRefresher leaseRefresher;
+ private final LeaseStatsRecorder leaseStatsRecorder;
+ private final LeaseGracefulShutdownHandler leaseGracefulShutdownHandler;
private long initialLeaseTableReadCapacity;
private long initialLeaseTableWriteCapacity;
protected final MetricsFactory metricsFactory;
private final Object shutdownLock = new Object();
-
+ private final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig;
private ScheduledExecutorService leaseCoordinatorThreadPool;
+ private ScheduledFuture> leaseDiscoveryFuture;
private ScheduledFuture> takerFuture;
private volatile boolean running = false;
- /**
- * Constructor.
- *
- * NOTE: This constructor is deprecated and will be removed in a future release.
- *
- * @param leaseRefresher
- * LeaseRefresher instance to use
- * @param workerIdentifier
- * Identifies the worker (e.g. useful to track lease ownership)
- * @param leaseDurationMillis
- * Duration of a lease
- * @param epsilonMillis
- * Allow for some variance when calculating lease expirations
- * @param maxLeasesForWorker
- * Max leases this Worker can handle at a time
- * @param maxLeasesToStealAtOneTime
- * Steal up to these many leases at a time (for load balancing)
- * @param metricsFactory
- * Used to publish metrics about lease operations
- */
- @Deprecated
- public DynamoDBLeaseCoordinator(
- final LeaseRefresher leaseRefresher,
- final String workerIdentifier,
- final long leaseDurationMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewerThreadCount,
- final MetricsFactory metricsFactory) {
- this(
- leaseRefresher,
- workerIdentifier,
- leaseDurationMillis,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewerThreadCount,
- TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
- TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
- metricsFactory);
- }
-
- /**
- * Constructor.
- *
- * @param leaseRefresher
- * LeaseRefresher instance to use
- * @param workerIdentifier
- * Identifies the worker (e.g. useful to track lease ownership)
- * @param leaseDurationMillis
- * Duration of a lease
- * @param epsilonMillis
- * Allow for some variance when calculating lease expirations
- * @param maxLeasesForWorker
- * Max leases this Worker can handle at a time
- * @param maxLeasesToStealAtOneTime
- * Steal up to these many leases at a time (for load balancing)
- * @param initialLeaseTableReadCapacity
- * Initial dynamodb lease table read iops if creating the lease table
- * @param initialLeaseTableWriteCapacity
- * Initial dynamodb lease table write iops if creating the lease table
- * @param metricsFactory
- * Used to publish metrics about lease operations
- */
- @Deprecated
- public DynamoDBLeaseCoordinator(
- final LeaseRefresher leaseRefresher,
- final String workerIdentifier,
- final long leaseDurationMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewerThreadCount,
- final long initialLeaseTableReadCapacity,
- final long initialLeaseTableWriteCapacity,
- final MetricsFactory metricsFactory) {
- this(
- leaseRefresher,
- workerIdentifier,
- leaseDurationMillis,
- LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewerThreadCount,
- TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
- TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
- metricsFactory);
- }
-
/**
* Constructor.
*
@@ -214,17 +140,35 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final int maxLeaseRenewerThreadCount,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
- final MetricsFactory metricsFactory) {
+ final MetricsFactory metricsFactory,
+ final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
+ final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
+ final ConcurrentMap shardInfoShardConsumerMap) {
this.leaseRefresher = leaseRefresher;
- this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
+ this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY);
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
.withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime)
.withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment);
- this.leaseRenewer = new DynamoDBLeaseRenewer(
- leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
+ // Should run once every leaseDurationMillis to identify new leases before expiry.
+ this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis;
+ this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis);
+ this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create(
+ gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this);
+ this.leaseRenewer = new DynamoDBLeaseRenewer(
+ leaseRefresher,
+ workerIdentifier,
+ leaseDurationMillis,
+ leaseRenewalThreadpool,
+ metricsFactory,
+ leaseStatsRecorder,
+ leaseGracefulShutdownHandler::enqueueShutdown);
+ this.leaseDiscoveryThreadPool =
+ createExecutorService(maxLeaseRenewerThreadCount, LEASE_DISCOVERY_THREAD_FACTORY);
+ this.leaseDiscoverer = new DynamoDBLeaseDiscoverer(
+ this.leaseRefresher, this.leaseRenewer, metricsFactory, workerIdentifier, leaseDiscoveryThreadPool);
if (initialLeaseTableReadCapacity <= 0) {
throw new IllegalArgumentException("readCapacity should be >= 1");
}
@@ -234,6 +178,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
}
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.metricsFactory = metricsFactory;
+ this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
log.info(
"With failover time {} ms and epsilon {} ms, LeaseCoordinator will renew leases every {} ms, take"
@@ -246,11 +191,49 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
maxLeasesToStealAtOneTime);
}
- private class TakerRunnable implements Runnable {
+ @RequiredArgsConstructor
+ private class LeaseDiscoveryRunnable implements Runnable {
+ private final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider;
@Override
public void run() {
try {
+ // LeaseDiscoverer is run in WORKER_UTILIZATION_AWARE_ASSIGNMENT mode only
+ synchronized (shutdownLock) {
+ if (!leaseAssignmentModeProvider
+ .getLeaseAssignmentMode()
+ .equals(
+ MigrationAdaptiveLeaseAssignmentModeProvider.LeaseAssignmentMode
+ .WORKER_UTILIZATION_AWARE_ASSIGNMENT)) {
+ return;
+ }
+ if (running) {
+ leaseRenewer.addLeasesToRenew(leaseDiscoverer.discoverNewLeases());
+ }
+ }
+ } catch (Exception e) {
+ log.error("Failed to execute lease discovery", e);
+ }
+ }
+ }
+
+ @RequiredArgsConstructor
+ private class TakerRunnable implements Runnable {
+ private final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider;
+
+ @Override
+ public void run() {
+ try {
+ // LeaseTaker is run in DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT mode only
+ synchronized (shutdownLock) {
+ if (!leaseAssignmentModeProvider
+ .getLeaseAssignmentMode()
+ .equals(
+ MigrationAdaptiveLeaseAssignmentModeProvider.LeaseAssignmentMode
+ .DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT)) {
+ return;
+ }
+ }
runLeaseTaker();
} catch (LeasingException e) {
log.error("LeasingException encountered in lease taking thread", e);
@@ -290,18 +273,35 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
}
@Override
- public void start() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ public void start(final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
leaseRenewer.initialize();
+ // At max, we need 3 threads - lease renewer, lease taker, lease discoverer - to run without contention.
+ leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(3, LEASE_COORDINATOR_THREAD_FACTORY);
- // 2 because we know we'll have at most 2 concurrent tasks at a time.
- leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY);
+ // During migration to KCLv3.x from KCLv2.x, lease assignment mode can change dynamically, so
+ // both lease assignment algorithms will be started but only one will execute based on
+ // leaseAssignmentModeProvider.getLeaseAssignmentMode(). However for new applications starting in
+ // KCLv3.x or applications successfully migrated to KCLv3.x, lease assignment mode will not
+ // change dynamically and will always be WORKER_UTILIZATION_AWARE_ASSIGNMENT, therefore
+ // don't initialize KCLv2.x lease assignment algorithm components that are not needed.
+ if (leaseAssignmentModeProvider.dynamicModeChangeSupportNeeded()) {
+ // Taker runs with fixed DELAY because we want it to run slower in the event of performance degradation.
+ takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(
+ new TakerRunnable(leaseAssignmentModeProvider), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS);
+ }
- // Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation.
- takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(
- new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS);
- // Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degredation.
+ leaseDiscoveryFuture = leaseCoordinatorThreadPool.scheduleAtFixedRate(
+ new LeaseDiscoveryRunnable(leaseAssignmentModeProvider),
+ 0L,
+ leaseDiscovererIntervalMillis,
+ TimeUnit.MILLISECONDS);
+
+ // Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degradation.
leaseCoordinatorThreadPool.scheduleAtFixedRate(
new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS);
+
+ leaseGracefulShutdownHandler.start();
running = true;
}
@@ -383,6 +383,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
}
leaseRenewalThreadpool.shutdownNow();
+ leaseCoordinatorThreadPool.shutdownNow();
+ leaseGracefulShutdownHandler.stop();
synchronized (shutdownLock) {
leaseRenewer.clearCurrentlyHeldLeases();
running = false;
@@ -393,6 +395,10 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
public void stopLeaseTaker() {
if (takerFuture != null) {
takerFuture.cancel(false);
+ leaseDiscoveryFuture.cancel(false);
+ // the method is called in worker graceful shutdown. We want to stop any further lease shutdown
+ // so we don't interrupt worker shutdown.
+ leaseGracefulShutdownHandler.stop();
}
}
@@ -418,20 +424,15 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
}
/**
- * Returns executor service that should be used for lease renewal.
+ * Returns executor service for given ThreadFactory.
* @param maximumPoolSize Maximum allowed thread pool size
- * @return Executor service that should be used for lease renewal.
+ * @return Executor service
*/
- private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) {
+ private static ExecutorService createExecutorService(final int maximumPoolSize, final ThreadFactory threadFactory) {
int coreLeaseCount = Math.max(maximumPoolSize / 4, 2);
return new ThreadPoolExecutor(
- coreLeaseCount,
- maximumPoolSize,
- 60,
- TimeUnit.SECONDS,
- new LinkedTransferQueue<>(),
- LEASE_RENEWAL_THREAD_FACTORY);
+ coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS, new LinkedTransferQueue<>(), threadFactory);
}
@Override
@@ -472,6 +473,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
* {@inheritDoc}
*
* NOTE: This method is deprecated. Please set the initial capacity through the constructor.
+ *
+ * This is a method of the public lease coordinator interface.
*/
@Override
@Deprecated
@@ -487,6 +490,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
* {@inheritDoc}
*
* NOTE: This method is deprecated. Please set the initial capacity through the constructor.
+ *
+ * This is a method of the public lease coordinator interface.
*/
@Override
@Deprecated
@@ -497,4 +502,9 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
initialLeaseTableWriteCapacity = writeCapacity;
return this;
}
+
+ @Override
+ public LeaseStatsRecorder leaseStatsRecorder() {
+ return leaseStatsRecorder;
+ }
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseDiscoverer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseDiscoverer.java
new file mode 100644
index 00000000..ce5605ee
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseDiscoverer.java
@@ -0,0 +1,120 @@
+package software.amazon.kinesis.leases.dynamodb;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.kinesis.leases.Lease;
+import software.amazon.kinesis.leases.LeaseDiscoverer;
+import software.amazon.kinesis.leases.LeaseRefresher;
+import software.amazon.kinesis.leases.LeaseRenewer;
+import software.amazon.kinesis.leases.exceptions.DependencyException;
+import software.amazon.kinesis.leases.exceptions.InvalidStateException;
+import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
+import software.amazon.kinesis.metrics.MetricsFactory;
+import software.amazon.kinesis.metrics.MetricsLevel;
+import software.amazon.kinesis.metrics.MetricsScope;
+import software.amazon.kinesis.metrics.MetricsUtil;
+
+import static java.util.Objects.isNull;
+
+/**
+ * An implementation of {@link LeaseDiscoverer}, it uses {@link LeaseRefresher} to query
+ * {@link DynamoDBLeaseRefresher#LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME } and find the leases assigned
+ * to current worker and then filter and returns the leases that have not started processing (looks at
+ * {@link LeaseRenewer#getCurrentlyHeldLeases()} to find out which leases are currently held leases).
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class DynamoDBLeaseDiscoverer implements LeaseDiscoverer {
+
+ private final LeaseRefresher leaseRefresher;
+ private final LeaseRenewer leaseRenewer;
+ private final MetricsFactory metricsFactory;
+ private final String workerIdentifier;
+ private final ExecutorService executorService;
+
+ @Override
+ public List discoverNewLeases()
+ throws ProvisionedThroughputException, InvalidStateException, DependencyException {
+ final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, "LeaseDiscovery");
+ long startTime = System.currentTimeMillis();
+ boolean success = false;
+ try {
+ final Set currentHeldLeaseKeys =
+ leaseRenewer.getCurrentlyHeldLeases().keySet();
+
+ final long listLeaseKeysForWorkerStartTime = System.currentTimeMillis();
+ final List leaseKeys = leaseRefresher.listLeaseKeysForWorker(workerIdentifier);
+ MetricsUtil.addLatency(
+ metricsScope, "ListLeaseKeysForWorker", listLeaseKeysForWorkerStartTime, MetricsLevel.DETAILED);
+
+ final List newLeaseKeys = leaseKeys.stream()
+ .filter(leaseKey -> !currentHeldLeaseKeys.contains(leaseKey))
+ .collect(Collectors.toList());
+
+ final long fetchNewLeasesStartTime = System.currentTimeMillis();
+ final List> completableFutures = newLeaseKeys.stream()
+ .map(leaseKey ->
+ CompletableFuture.supplyAsync(() -> fetchLease(leaseKey, metricsScope), executorService))
+ .collect(Collectors.toList());
+
+ final List newLeases = completableFutures.stream()
+ .map(CompletableFuture::join)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ log.info(
+ "New leases assigned to worker : {}, count : {}, leases : {}",
+ workerIdentifier,
+ newLeases.size(),
+ newLeases.stream().map(Lease::leaseKey).collect(Collectors.toList()));
+
+ MetricsUtil.addLatency(metricsScope, "FetchNewLeases", fetchNewLeasesStartTime, MetricsLevel.DETAILED);
+
+ success = true;
+ MetricsUtil.addCount(metricsScope, "NewLeasesDiscovered", newLeases.size(), MetricsLevel.DETAILED);
+ return newLeases;
+ } finally {
+ MetricsUtil.addWorkerIdentifier(metricsScope, workerIdentifier);
+ MetricsUtil.addSuccessAndLatency(metricsScope, success, startTime, MetricsLevel.SUMMARY);
+ MetricsUtil.endScope(metricsScope);
+ }
+ }
+
+ private Lease fetchLease(final String leaseKey, final MetricsScope metricsScope) {
+ try {
+ final Lease lease = leaseRefresher.getLease(leaseKey);
+ if (isNull(lease)) {
+ return null;
+ }
+ // GSI is eventually consistent thus, validate that the fetched lease is indeed assigned to this
+ // worker, if not just pass in this run.
+ if (!lease.leaseOwner().equals(workerIdentifier)) {
+ MetricsUtil.addCount(metricsScope, "OwnerMismatch", 1, MetricsLevel.DETAILED);
+ return null;
+ }
+ // if checkpointOwner is not null, it means that the lease is still pending shutdown for the last owner.
+ // Don't add the lease to the in-memory map yet.
+ if (lease.checkpointOwner() != null) {
+ return null;
+ }
+ // when a new lease is discovered, set the lastCounterIncrementNanos to current time as the time
+ // when it has become visible, on next renewer interval this will be updated by LeaseRenewer to
+ // correct time.
+ lease.lastCounterIncrementNanos(System.nanoTime());
+ return lease;
+ } catch (final Exception e) {
+ // if getLease on some lease key fail, continue and fetch other leases, the one failed will
+ // be fetched in the next iteration or will be reassigned if stayed idle for long.
+ MetricsUtil.addCount(metricsScope, "GetLease:Error", 1, MetricsLevel.SUMMARY);
+ log.error("GetLease failed for leaseKey : {}", leaseKey, e);
+ return null;
+ }
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
index e5435bfc..1d6923d9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
@@ -17,18 +17,21 @@ package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
-import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
+import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+import software.amazon.kinesis.common.DdbTableConfig;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
@@ -42,12 +45,15 @@ import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseManagementFactory;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.ShardDetector;
+import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
+import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.metrics.MetricsFactory;
/**
*
*/
+@Slf4j
@Data
@KinesisClientInternalApi
public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@@ -73,6 +79,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@NonNull
private final LeaseSerializer leaseSerializer;
+ private final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig;
+
@NonNull
private StreamConfig streamConfig;
@@ -103,434 +111,11 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final Collection tags;
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
+ private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
/**
* Constructor.
- *
- * NOTE: This constructor is deprecated and will be removed in a future release.
- *
- * @param kinesisClient
- * @param streamName
- * @param dynamoDBClient
- * @param tableName
- * @param workerIdentifier
- * @param executorService
- * @param initialPositionInStream
- * @param failoverTimeMillis
- * @param epsilonMillis
- * @param maxLeasesForWorker
- * @param maxLeasesToStealAtOneTime
- * @param maxLeaseRenewalThreads
- * @param cleanupLeasesUponShardCompletion
- * @param ignoreUnexpectedChildShards
- * @param shardSyncIntervalMillis
- * @param consistentReads
- * @param listShardsBackoffTimeMillis
- * @param maxListShardsRetryAttempts
- * @param maxCacheMissesBeforeReload
- * @param listShardsCacheAllowedAgeInSeconds
- * @param cacheMissWarningModulus
- */
- @Deprecated
- public DynamoDBLeaseManagementFactory(
- final KinesisAsyncClient kinesisClient,
- final String streamName,
- final DynamoDbAsyncClient dynamoDBClient,
- final String tableName,
- final String workerIdentifier,
- final ExecutorService executorService,
- final InitialPositionInStreamExtended initialPositionInStream,
- final long failoverTimeMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewalThreads,
- final boolean cleanupLeasesUponShardCompletion,
- final boolean ignoreUnexpectedChildShards,
- final long shardSyncIntervalMillis,
- final boolean consistentReads,
- final long listShardsBackoffTimeMillis,
- final int maxListShardsRetryAttempts,
- final int maxCacheMissesBeforeReload,
- final long listShardsCacheAllowedAgeInSeconds,
- final int cacheMissWarningModulus) {
- this(
- kinesisClient,
- streamName,
- dynamoDBClient,
- tableName,
- workerIdentifier,
- executorService,
- initialPositionInStream,
- failoverTimeMillis,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewalThreads,
- cleanupLeasesUponShardCompletion,
- ignoreUnexpectedChildShards,
- shardSyncIntervalMillis,
- consistentReads,
- listShardsBackoffTimeMillis,
- maxListShardsRetryAttempts,
- maxCacheMissesBeforeReload,
- listShardsCacheAllowedAgeInSeconds,
- cacheMissWarningModulus,
- TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
- TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY);
- }
-
- /**
- * Constructor.
- *
- *
- * NOTE: This constructor is deprecated and will be removed in a future release.
- *
- *
- * @param kinesisClient
- * @param streamName
- * @param dynamoDBClient
- * @param tableName
- * @param workerIdentifier
- * @param executorService
- * @param initialPositionInStream
- * @param failoverTimeMillis
- * @param epsilonMillis
- * @param maxLeasesForWorker
- * @param maxLeasesToStealAtOneTime
- * @param maxLeaseRenewalThreads
- * @param cleanupLeasesUponShardCompletion
- * @param ignoreUnexpectedChildShards
- * @param shardSyncIntervalMillis
- * @param consistentReads
- * @param listShardsBackoffTimeMillis
- * @param maxListShardsRetryAttempts
- * @param maxCacheMissesBeforeReload
- * @param listShardsCacheAllowedAgeInSeconds
- * @param cacheMissWarningModulus
- * @param initialLeaseTableReadCapacity
- * @param initialLeaseTableWriteCapacity
- */
- @Deprecated
- public DynamoDBLeaseManagementFactory(
- final KinesisAsyncClient kinesisClient,
- final String streamName,
- final DynamoDbAsyncClient dynamoDBClient,
- final String tableName,
- final String workerIdentifier,
- final ExecutorService executorService,
- final InitialPositionInStreamExtended initialPositionInStream,
- final long failoverTimeMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewalThreads,
- final boolean cleanupLeasesUponShardCompletion,
- final boolean ignoreUnexpectedChildShards,
- final long shardSyncIntervalMillis,
- final boolean consistentReads,
- final long listShardsBackoffTimeMillis,
- final int maxListShardsRetryAttempts,
- final int maxCacheMissesBeforeReload,
- final long listShardsCacheAllowedAgeInSeconds,
- final int cacheMissWarningModulus,
- final long initialLeaseTableReadCapacity,
- final long initialLeaseTableWriteCapacity) {
- this(
- kinesisClient,
- streamName,
- dynamoDBClient,
- tableName,
- workerIdentifier,
- executorService,
- initialPositionInStream,
- failoverTimeMillis,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewalThreads,
- cleanupLeasesUponShardCompletion,
- ignoreUnexpectedChildShards,
- shardSyncIntervalMillis,
- consistentReads,
- listShardsBackoffTimeMillis,
- maxListShardsRetryAttempts,
- maxCacheMissesBeforeReload,
- listShardsCacheAllowedAgeInSeconds,
- cacheMissWarningModulus,
- initialLeaseTableReadCapacity,
- initialLeaseTableWriteCapacity,
- new HierarchicalShardSyncer(),
- TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK,
- LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
- }
-
- /**
- * Constructor.
- *
- * @param kinesisClient
- * @param streamName
- * @param dynamoDBClient
- * @param tableName
- * @param workerIdentifier
- * @param executorService
- * @param initialPositionInStream
- * @param failoverTimeMillis
- * @param epsilonMillis
- * @param maxLeasesForWorker
- * @param maxLeasesToStealAtOneTime
- * @param maxLeaseRenewalThreads
- * @param cleanupLeasesUponShardCompletion
- * @param ignoreUnexpectedChildShards
- * @param shardSyncIntervalMillis
- * @param consistentReads
- * @param listShardsBackoffTimeMillis
- * @param maxListShardsRetryAttempts
- * @param maxCacheMissesBeforeReload
- * @param listShardsCacheAllowedAgeInSeconds
- * @param cacheMissWarningModulus
- * @param initialLeaseTableReadCapacity
- * @param initialLeaseTableWriteCapacity
- * @param hierarchicalShardSyncer
- * @param tableCreatorCallback
- */
- @Deprecated
- public DynamoDBLeaseManagementFactory(
- final KinesisAsyncClient kinesisClient,
- final String streamName,
- final DynamoDbAsyncClient dynamoDBClient,
- final String tableName,
- final String workerIdentifier,
- final ExecutorService executorService,
- final InitialPositionInStreamExtended initialPositionInStream,
- final long failoverTimeMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewalThreads,
- final boolean cleanupLeasesUponShardCompletion,
- final boolean ignoreUnexpectedChildShards,
- final long shardSyncIntervalMillis,
- final boolean consistentReads,
- final long listShardsBackoffTimeMillis,
- final int maxListShardsRetryAttempts,
- final int maxCacheMissesBeforeReload,
- final long listShardsCacheAllowedAgeInSeconds,
- final int cacheMissWarningModulus,
- final long initialLeaseTableReadCapacity,
- final long initialLeaseTableWriteCapacity,
- final HierarchicalShardSyncer hierarchicalShardSyncer,
- final TableCreatorCallback tableCreatorCallback) {
- this(
- kinesisClient,
- streamName,
- dynamoDBClient,
- tableName,
- workerIdentifier,
- executorService,
- initialPositionInStream,
- failoverTimeMillis,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewalThreads,
- cleanupLeasesUponShardCompletion,
- ignoreUnexpectedChildShards,
- shardSyncIntervalMillis,
- consistentReads,
- listShardsBackoffTimeMillis,
- maxListShardsRetryAttempts,
- maxCacheMissesBeforeReload,
- listShardsCacheAllowedAgeInSeconds,
- cacheMissWarningModulus,
- initialLeaseTableReadCapacity,
- initialLeaseTableWriteCapacity,
- hierarchicalShardSyncer,
- tableCreatorCallback,
- LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
- }
-
- /**
- * Constructor.
- *
- * @param kinesisClient
- * @param streamName
- * @param dynamoDBClient
- * @param tableName
- * @param workerIdentifier
- * @param executorService
- * @param initialPositionInStream
- * @param failoverTimeMillis
- * @param epsilonMillis
- * @param maxLeasesForWorker
- * @param maxLeasesToStealAtOneTime
- * @param maxLeaseRenewalThreads
- * @param cleanupLeasesUponShardCompletion
- * @param ignoreUnexpectedChildShards
- * @param shardSyncIntervalMillis
- * @param consistentReads
- * @param listShardsBackoffTimeMillis
- * @param maxListShardsRetryAttempts
- * @param maxCacheMissesBeforeReload
- * @param listShardsCacheAllowedAgeInSeconds
- * @param cacheMissWarningModulus
- * @param initialLeaseTableReadCapacity
- * @param initialLeaseTableWriteCapacity
- * @param hierarchicalShardSyncer
- * @param tableCreatorCallback
- * @param dynamoDbRequestTimeout
- */
- @Deprecated
- public DynamoDBLeaseManagementFactory(
- final KinesisAsyncClient kinesisClient,
- final String streamName,
- final DynamoDbAsyncClient dynamoDBClient,
- final String tableName,
- final String workerIdentifier,
- final ExecutorService executorService,
- final InitialPositionInStreamExtended initialPositionInStream,
- final long failoverTimeMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewalThreads,
- final boolean cleanupLeasesUponShardCompletion,
- final boolean ignoreUnexpectedChildShards,
- final long shardSyncIntervalMillis,
- final boolean consistentReads,
- final long listShardsBackoffTimeMillis,
- final int maxListShardsRetryAttempts,
- final int maxCacheMissesBeforeReload,
- final long listShardsCacheAllowedAgeInSeconds,
- final int cacheMissWarningModulus,
- final long initialLeaseTableReadCapacity,
- final long initialLeaseTableWriteCapacity,
- final HierarchicalShardSyncer hierarchicalShardSyncer,
- final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout) {
- this(
- kinesisClient,
- streamName,
- dynamoDBClient,
- tableName,
- workerIdentifier,
- executorService,
- initialPositionInStream,
- failoverTimeMillis,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewalThreads,
- cleanupLeasesUponShardCompletion,
- ignoreUnexpectedChildShards,
- shardSyncIntervalMillis,
- consistentReads,
- listShardsBackoffTimeMillis,
- maxListShardsRetryAttempts,
- maxCacheMissesBeforeReload,
- listShardsCacheAllowedAgeInSeconds,
- cacheMissWarningModulus,
- initialLeaseTableReadCapacity,
- initialLeaseTableWriteCapacity,
- hierarchicalShardSyncer,
- tableCreatorCallback,
- dynamoDbRequestTimeout,
- BillingMode.PAY_PER_REQUEST);
- }
-
- /**
- * Constructor.
- *
- * @param kinesisClient
- * @param streamName
- * @param dynamoDBClient
- * @param tableName
- * @param workerIdentifier
- * @param executorService
- * @param initialPositionInStream
- * @param failoverTimeMillis
- * @param epsilonMillis
- * @param maxLeasesForWorker
- * @param maxLeasesToStealAtOneTime
- * @param maxLeaseRenewalThreads
- * @param cleanupLeasesUponShardCompletion
- * @param ignoreUnexpectedChildShards
- * @param shardSyncIntervalMillis
- * @param consistentReads
- * @param listShardsBackoffTimeMillis
- * @param maxListShardsRetryAttempts
- * @param maxCacheMissesBeforeReload
- * @param listShardsCacheAllowedAgeInSeconds
- * @param cacheMissWarningModulus
- * @param initialLeaseTableReadCapacity
- * @param initialLeaseTableWriteCapacity
- * @param hierarchicalShardSyncer
- * @param tableCreatorCallback
- * @param dynamoDbRequestTimeout
- * @param billingMode
- */
- @Deprecated
- public DynamoDBLeaseManagementFactory(
- final KinesisAsyncClient kinesisClient,
- final String streamName,
- final DynamoDbAsyncClient dynamoDBClient,
- final String tableName,
- final String workerIdentifier,
- final ExecutorService executorService,
- final InitialPositionInStreamExtended initialPositionInStream,
- final long failoverTimeMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewalThreads,
- final boolean cleanupLeasesUponShardCompletion,
- final boolean ignoreUnexpectedChildShards,
- final long shardSyncIntervalMillis,
- final boolean consistentReads,
- final long listShardsBackoffTimeMillis,
- final int maxListShardsRetryAttempts,
- final int maxCacheMissesBeforeReload,
- final long listShardsCacheAllowedAgeInSeconds,
- final int cacheMissWarningModulus,
- final long initialLeaseTableReadCapacity,
- final long initialLeaseTableWriteCapacity,
- final HierarchicalShardSyncer hierarchicalShardSyncer,
- final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout,
- BillingMode billingMode) {
-
- this(
- kinesisClient,
- new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream),
- dynamoDBClient,
- tableName,
- workerIdentifier,
- executorService,
- failoverTimeMillis,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewalThreads,
- cleanupLeasesUponShardCompletion,
- ignoreUnexpectedChildShards,
- shardSyncIntervalMillis,
- consistentReads,
- listShardsBackoffTimeMillis,
- maxListShardsRetryAttempts,
- maxCacheMissesBeforeReload,
- listShardsCacheAllowedAgeInSeconds,
- cacheMissWarningModulus,
- initialLeaseTableReadCapacity,
- initialLeaseTableWriteCapacity,
- hierarchicalShardSyncer,
- tableCreatorCallback,
- dynamoDbRequestTimeout,
- billingMode,
- new DynamoDBLeaseSerializer());
- }
-
- /**
- * Constructor.
+ * @deprecated this is used by the deprecated method in LeaseManagementConfig to construct the LeaseManagement factory
*
* @param kinesisClient
* @param streamName
@@ -592,291 +177,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
BillingMode billingMode,
Collection tags) {
- this(
- kinesisClient,
- new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream),
- dynamoDBClient,
- tableName,
- workerIdentifier,
- executorService,
- failoverTimeMillis,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewalThreads,
- cleanupLeasesUponShardCompletion,
- ignoreUnexpectedChildShards,
- shardSyncIntervalMillis,
- consistentReads,
- listShardsBackoffTimeMillis,
- maxListShardsRetryAttempts,
- maxCacheMissesBeforeReload,
- listShardsCacheAllowedAgeInSeconds,
- cacheMissWarningModulus,
- initialLeaseTableReadCapacity,
- initialLeaseTableWriteCapacity,
- hierarchicalShardSyncer,
- tableCreatorCallback,
- dynamoDbRequestTimeout,
- billingMode,
- new DynamoDBLeaseSerializer());
- }
-
- /**
- * Constructor.
- *
- * @param kinesisClient
- * @param streamConfig
- * @param dynamoDBClient
- * @param tableName
- * @param workerIdentifier
- * @param executorService
- * @param failoverTimeMillis
- * @param epsilonMillis
- * @param maxLeasesForWorker
- * @param maxLeasesToStealAtOneTime
- * @param maxLeaseRenewalThreads
- * @param cleanupLeasesUponShardCompletion
- * @param ignoreUnexpectedChildShards
- * @param shardSyncIntervalMillis
- * @param consistentReads
- * @param listShardsBackoffTimeMillis
- * @param maxListShardsRetryAttempts
- * @param maxCacheMissesBeforeReload
- * @param listShardsCacheAllowedAgeInSeconds
- * @param cacheMissWarningModulus
- * @param initialLeaseTableReadCapacity
- * @param initialLeaseTableWriteCapacity
- * @param deprecatedHierarchicalShardSyncer
- * @param tableCreatorCallback
- * @param dynamoDbRequestTimeout
- * @param billingMode
- */
- @Deprecated
- private DynamoDBLeaseManagementFactory(
- final KinesisAsyncClient kinesisClient,
- final StreamConfig streamConfig,
- final DynamoDbAsyncClient dynamoDBClient,
- final String tableName,
- final String workerIdentifier,
- final ExecutorService executorService,
- final long failoverTimeMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewalThreads,
- final boolean cleanupLeasesUponShardCompletion,
- final boolean ignoreUnexpectedChildShards,
- final long shardSyncIntervalMillis,
- final boolean consistentReads,
- final long listShardsBackoffTimeMillis,
- final int maxListShardsRetryAttempts,
- final int maxCacheMissesBeforeReload,
- final long listShardsCacheAllowedAgeInSeconds,
- final int cacheMissWarningModulus,
- final long initialLeaseTableReadCapacity,
- final long initialLeaseTableWriteCapacity,
- final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer,
- final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout,
- BillingMode billingMode,
- LeaseSerializer leaseSerializer) {
- this(
- kinesisClient,
- streamConfig,
- dynamoDBClient,
- tableName,
- workerIdentifier,
- executorService,
- failoverTimeMillis,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewalThreads,
- cleanupLeasesUponShardCompletion,
- ignoreUnexpectedChildShards,
- shardSyncIntervalMillis,
- consistentReads,
- listShardsBackoffTimeMillis,
- maxListShardsRetryAttempts,
- maxCacheMissesBeforeReload,
- listShardsCacheAllowedAgeInSeconds,
- cacheMissWarningModulus,
- initialLeaseTableReadCapacity,
- initialLeaseTableWriteCapacity,
- deprecatedHierarchicalShardSyncer,
- tableCreatorCallback,
- dynamoDbRequestTimeout,
- billingMode,
- LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED,
- DefaultSdkAutoConstructList.getInstance(),
- leaseSerializer);
- }
-
- /**
- * Constructor.
- *
- * @param kinesisClient
- * @param streamConfig
- * @param dynamoDBClient
- * @param tableName
- * @param workerIdentifier
- * @param executorService
- * @param failoverTimeMillis
- * @param epsilonMillis
- * @param maxLeasesForWorker
- * @param maxLeasesToStealAtOneTime
- * @param maxLeaseRenewalThreads
- * @param cleanupLeasesUponShardCompletion
- * @param ignoreUnexpectedChildShards
- * @param shardSyncIntervalMillis
- * @param consistentReads
- * @param listShardsBackoffTimeMillis
- * @param maxListShardsRetryAttempts
- * @param maxCacheMissesBeforeReload
- * @param listShardsCacheAllowedAgeInSeconds
- * @param cacheMissWarningModulus
- * @param initialLeaseTableReadCapacity
- * @param initialLeaseTableWriteCapacity
- * @param deprecatedHierarchicalShardSyncer
- * @param tableCreatorCallback
- * @param dynamoDbRequestTimeout
- * @param billingMode
- * @param leaseTableDeletionProtectionEnabled
- * @param tags
- */
- @Deprecated
- private DynamoDBLeaseManagementFactory(
- final KinesisAsyncClient kinesisClient,
- final StreamConfig streamConfig,
- final DynamoDbAsyncClient dynamoDBClient,
- final String tableName,
- final String workerIdentifier,
- final ExecutorService executorService,
- final long failoverTimeMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewalThreads,
- final boolean cleanupLeasesUponShardCompletion,
- final boolean ignoreUnexpectedChildShards,
- final long shardSyncIntervalMillis,
- final boolean consistentReads,
- final long listShardsBackoffTimeMillis,
- final int maxListShardsRetryAttempts,
- final int maxCacheMissesBeforeReload,
- final long listShardsCacheAllowedAgeInSeconds,
- final int cacheMissWarningModulus,
- final long initialLeaseTableReadCapacity,
- final long initialLeaseTableWriteCapacity,
- final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer,
- final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout,
- BillingMode billingMode,
- final boolean leaseTableDeletionProtectionEnabled,
- Collection tags,
- LeaseSerializer leaseSerializer) {
- this(
- kinesisClient,
- dynamoDBClient,
- tableName,
- workerIdentifier,
- executorService,
- failoverTimeMillis,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewalThreads,
- cleanupLeasesUponShardCompletion,
- ignoreUnexpectedChildShards,
- shardSyncIntervalMillis,
- consistentReads,
- listShardsBackoffTimeMillis,
- maxListShardsRetryAttempts,
- maxCacheMissesBeforeReload,
- listShardsCacheAllowedAgeInSeconds,
- cacheMissWarningModulus,
- initialLeaseTableReadCapacity,
- initialLeaseTableWriteCapacity,
- deprecatedHierarchicalShardSyncer,
- tableCreatorCallback,
- dynamoDbRequestTimeout,
- billingMode,
- leaseTableDeletionProtectionEnabled,
- tags,
- leaseSerializer,
- null,
- false,
- LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG);
- this.streamConfig = streamConfig;
- }
-
- /**
- * Constructor.
- * @param kinesisClient
- * @param dynamoDBClient
- * @param tableName
- * @param workerIdentifier
- * @param executorService
- * @param failoverTimeMillis
- * @param epsilonMillis
- * @param maxLeasesForWorker
- * @param maxLeasesToStealAtOneTime
- * @param maxLeaseRenewalThreads
- * @param cleanupLeasesUponShardCompletion
- * @param ignoreUnexpectedChildShards
- * @param shardSyncIntervalMillis
- * @param consistentReads
- * @param listShardsBackoffTimeMillis
- * @param maxListShardsRetryAttempts
- * @param maxCacheMissesBeforeReload
- * @param listShardsCacheAllowedAgeInSeconds
- * @param cacheMissWarningModulus
- * @param initialLeaseTableReadCapacity
- * @param initialLeaseTableWriteCapacity
- * @param deprecatedHierarchicalShardSyncer
- * @param tableCreatorCallback
- * @param dynamoDbRequestTimeout
- * @param billingMode
- * @param leaseTableDeletionProtectionEnabled
- * @param leaseSerializer
- * @param customShardDetectorProvider
- * @param isMultiStreamMode
- * @param leaseCleanupConfig
- */
- @Deprecated
- public DynamoDBLeaseManagementFactory(
- final KinesisAsyncClient kinesisClient,
- final DynamoDbAsyncClient dynamoDBClient,
- final String tableName,
- final String workerIdentifier,
- final ExecutorService executorService,
- final long failoverTimeMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewalThreads,
- final boolean cleanupLeasesUponShardCompletion,
- final boolean ignoreUnexpectedChildShards,
- final long shardSyncIntervalMillis,
- final boolean consistentReads,
- final long listShardsBackoffTimeMillis,
- final int maxListShardsRetryAttempts,
- final int maxCacheMissesBeforeReload,
- final long listShardsCacheAllowedAgeInSeconds,
- final int cacheMissWarningModulus,
- final long initialLeaseTableReadCapacity,
- final long initialLeaseTableWriteCapacity,
- final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer,
- final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout,
- BillingMode billingMode,
- final boolean leaseTableDeletionProtectionEnabled,
- Collection tags,
- LeaseSerializer leaseSerializer,
- Function customShardDetectorProvider,
- boolean isMultiStreamMode,
- LeaseCleanupConfig leaseCleanupConfig) {
this(
kinesisClient,
dynamoDBClient,
@@ -900,16 +200,21 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
cacheMissWarningModulus,
initialLeaseTableReadCapacity,
initialLeaseTableWriteCapacity,
- deprecatedHierarchicalShardSyncer,
+ hierarchicalShardSyncer,
tableCreatorCallback,
dynamoDbRequestTimeout,
billingMode,
- leaseTableDeletionProtectionEnabled,
+ LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED,
+ LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED,
tags,
- leaseSerializer,
- customShardDetectorProvider,
- isMultiStreamMode,
- leaseCleanupConfig);
+ new DynamoDBLeaseSerializer(),
+ null,
+ false,
+ LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG,
+ new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(),
+ LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build());
+ this.streamConfig =
+ new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream);
}
/**
@@ -947,75 +252,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param leaseCleanupConfig
*/
@Deprecated
- public DynamoDBLeaseManagementFactory(
- final KinesisAsyncClient kinesisClient,
- final DynamoDbAsyncClient dynamoDBClient,
- final String tableName,
- final String workerIdentifier,
- final ExecutorService executorService,
- final long failoverTimeMillis,
- final boolean enablePriorityLeaseAssignment,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewalThreads,
- final boolean cleanupLeasesUponShardCompletion,
- final boolean ignoreUnexpectedChildShards,
- final long shardSyncIntervalMillis,
- final boolean consistentReads,
- final long listShardsBackoffTimeMillis,
- final int maxListShardsRetryAttempts,
- final int maxCacheMissesBeforeReload,
- final long listShardsCacheAllowedAgeInSeconds,
- final int cacheMissWarningModulus,
- final long initialLeaseTableReadCapacity,
- final long initialLeaseTableWriteCapacity,
- final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer,
- final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout,
- BillingMode billingMode,
- final boolean leaseTableDeletionProtectionEnabled,
- Collection tags,
- LeaseSerializer leaseSerializer,
- Function customShardDetectorProvider,
- boolean isMultiStreamMode,
- LeaseCleanupConfig leaseCleanupConfig) {
- this(
- kinesisClient,
- dynamoDBClient,
- tableName,
- workerIdentifier,
- executorService,
- failoverTimeMillis,
- enablePriorityLeaseAssignment,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewalThreads,
- cleanupLeasesUponShardCompletion,
- ignoreUnexpectedChildShards,
- shardSyncIntervalMillis,
- consistentReads,
- listShardsBackoffTimeMillis,
- maxListShardsRetryAttempts,
- maxCacheMissesBeforeReload,
- listShardsCacheAllowedAgeInSeconds,
- cacheMissWarningModulus,
- initialLeaseTableReadCapacity,
- initialLeaseTableWriteCapacity,
- deprecatedHierarchicalShardSyncer,
- tableCreatorCallback,
- dynamoDbRequestTimeout,
- billingMode,
- leaseTableDeletionProtectionEnabled,
- LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED,
- tags,
- leaseSerializer,
- customShardDetectorProvider,
- isMultiStreamMode,
- leaseCleanupConfig);
- }
-
public DynamoDBLeaseManagementFactory(
final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient,
@@ -1049,7 +285,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
LeaseSerializer leaseSerializer,
Function customShardDetectorProvider,
boolean isMultiStreamMode,
- LeaseCleanupConfig leaseCleanupConfig) {
+ LeaseCleanupConfig leaseCleanupConfig,
+ final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
+ final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
@@ -1083,10 +321,19 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.isMultiStreamMode = isMultiStreamMode;
this.leaseCleanupConfig = leaseCleanupConfig;
this.tags = tags;
+ this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
+ this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
}
@Override
public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) {
+ return createLeaseCoordinator(metricsFactory, new ConcurrentHashMap<>());
+ }
+
+ @Override
+ public LeaseCoordinator createLeaseCoordinator(
+ @NonNull final MetricsFactory metricsFactory,
+ @NonNull final ConcurrentMap shardInfoShardConsumerMap) {
return new DynamoDBLeaseCoordinator(
this.createLeaseRefresher(),
workerIdentifier,
@@ -1098,9 +345,15 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
maxLeaseRenewalThreads,
initialLeaseTableReadCapacity,
initialLeaseTableWriteCapacity,
- metricsFactory);
+ metricsFactory,
+ workerUtilizationAwareAssignmentConfig,
+ gracefulLeaseHandoffConfig,
+ shardInfoShardConsumerMap);
}
+ /**
+ * Even though this is deprecated, this is a method part of the public interface in LeaseManagementFactory
+ */
@Override
@Deprecated
public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFactory metricsFactory) {
@@ -1155,6 +408,10 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
+ final DdbTableConfig ddbTableConfig = new DdbTableConfig();
+ ddbTableConfig.billingMode(billingMode);
+ ddbTableConfig.readCapacity(initialLeaseTableReadCapacity);
+ ddbTableConfig.writeCapacity(initialLeaseTableWriteCapacity);
return new DynamoDBLeaseRefresher(
tableName,
dynamoDBClient,
@@ -1162,12 +419,15 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
consistentReads,
tableCreatorCallback,
dynamoDbRequestTimeout,
- billingMode,
+ ddbTableConfig,
leaseTableDeletionProtectionEnabled,
leaseTablePitrEnabled,
tags);
}
+ /**
+ * Even though this is deprecated, this is a method part of the public interface in LeaseManagementFactory
+ */
@Override
@Deprecated
public ShardDetector createShardDetector() {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
index 123f4068..593e40da 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
@@ -15,48 +15,68 @@
package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.ImmutableMap;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
-import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.CreateGlobalSecondaryIndexAction;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndexDescription;
+import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndexUpdate;
+import software.amazon.awssdk.services.dynamodb.model.IndexStatus;
import software.amazon.awssdk.services.dynamodb.model.LimitExceededException;
+import software.amazon.awssdk.services.dynamodb.model.Projection;
+import software.amazon.awssdk.services.dynamodb.model.ProjectionType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.UpdateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateTableResponse;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+import software.amazon.kinesis.common.DdbTableConfig;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.StreamIdentifier;
+import software.amazon.kinesis.leases.DynamoUtils;
import software.amazon.kinesis.leases.Lease;
-import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.UpdateField;
@@ -66,12 +86,19 @@ import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
+import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
+import static software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer.CHECKPOINT_OWNER;
+import static software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer.LEASE_KEY_KEY;
+import static software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer.LEASE_OWNER_KEY;
+
/**
* An implementation of {@link LeaseRefresher} that uses DynamoDB.
*/
@Slf4j
@KinesisClientInternalApi
public class DynamoDBLeaseRefresher implements LeaseRefresher {
+ static final String LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME = "LeaseOwnerToLeaseKeyIndex";
protected final String table;
protected final DynamoDbAsyncClient dynamoDBClient;
@@ -80,7 +107,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private final TableCreatorCallback tableCreatorCallback;
private final Duration dynamoDbRequestTimeout;
- private final BillingMode billingMode;
+ private final DdbTableConfig ddbTableConfig;
private final boolean leaseTableDeletionProtectionEnabled;
private final boolean leaseTablePitrEnabled;
private final Collection tags;
@@ -90,50 +117,15 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private static final String STREAM_NAME = "streamName";
private static final String DDB_STREAM_NAME = ":streamName";
- /**
- * Constructor.
- *
- *
- * NOTE: This constructor is deprecated and will be removed in a future release.
- *
- *
- * @param table
- * @param dynamoDBClient
- * @param serializer
- * @param consistentReads
- */
- @Deprecated
- public DynamoDBLeaseRefresher(
- final String table,
- final DynamoDbAsyncClient dynamoDBClient,
- final LeaseSerializer serializer,
- final boolean consistentReads) {
- this(table, dynamoDBClient, serializer, consistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
- }
+ private static final String DDB_LEASE_OWNER = ":" + LEASE_OWNER_KEY;
- /**
- * Constructor.
- *
- * @param table
- * @param dynamoDBClient
- * @param serializer
- * @param consistentReads
- * @param tableCreatorCallback
- */
- @Deprecated
- public DynamoDBLeaseRefresher(
- final String table,
- final DynamoDbAsyncClient dynamoDBClient,
- final LeaseSerializer serializer,
- final boolean consistentReads,
- @NonNull final TableCreatorCallback tableCreatorCallback) {
- this(
- table,
- dynamoDBClient,
- serializer,
- consistentReads,
- tableCreatorCallback,
- LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
+ private static final String LEASE_OWNER_INDEX_QUERY_CONDITIONAL_EXPRESSION =
+ String.format("%s = %s", LEASE_OWNER_KEY, DDB_LEASE_OWNER);
+
+ private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) {
+ final DdbTableConfig tableConfig = new DdbTableConfig();
+ tableConfig.billingMode(billingMode);
+ return tableConfig;
}
/**
@@ -144,72 +136,11 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
* @param consistentReads
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
- */
- @Deprecated
- public DynamoDBLeaseRefresher(
- final String table,
- final DynamoDbAsyncClient dynamoDBClient,
- final LeaseSerializer serializer,
- final boolean consistentReads,
- @NonNull final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout) {
- this(
- table,
- dynamoDBClient,
- serializer,
- consistentReads,
- tableCreatorCallback,
- dynamoDbRequestTimeout,
- BillingMode.PAY_PER_REQUEST,
- LeaseManagementConfig.DEFAULT_LEASE_TABLE_DELETION_PROTECTION_ENABLED);
- }
-
- /**
- * Constructor.
- * @param table
- * @param dynamoDBClient
- * @param serializer
- * @param consistentReads
- * @param tableCreatorCallback
- * @param dynamoDbRequestTimeout
- * @param billingMode
- * @param leaseTableDeletionProtectionEnabled
- */
- @Deprecated
- public DynamoDBLeaseRefresher(
- final String table,
- final DynamoDbAsyncClient dynamoDBClient,
- final LeaseSerializer serializer,
- final boolean consistentReads,
- @NonNull final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout,
- final BillingMode billingMode,
- final boolean leaseTableDeletionProtectionEnabled) {
- this(
- table,
- dynamoDBClient,
- serializer,
- consistentReads,
- tableCreatorCallback,
- dynamoDbRequestTimeout,
- billingMode,
- leaseTableDeletionProtectionEnabled,
- DefaultSdkAutoConstructList.getInstance());
- }
-
- /**
- * Constructor.
- * @param table
- * @param dynamoDBClient
- * @param serializer
- * @param consistentReads
- * @param tableCreatorCallback
- * @param dynamoDbRequestTimeout
- * @param billingMode
+ * @param ddbTableConfig
* @param leaseTableDeletionProtectionEnabled
+ * @param leaseTablePitrEnabled
* @param tags
*/
- @Deprecated
public DynamoDBLeaseRefresher(
final String table,
final DynamoDbAsyncClient dynamoDBClient,
@@ -217,41 +148,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
final boolean consistentReads,
@NonNull final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout,
- final BillingMode billingMode,
- final boolean leaseTableDeletionProtectionEnabled,
- final Collection tags) {
- this(
- table,
- dynamoDBClient,
- serializer,
- consistentReads,
- tableCreatorCallback,
- dynamoDbRequestTimeout,
- billingMode,
- leaseTableDeletionProtectionEnabled,
- LeaseManagementConfig.DEFAULT_LEASE_TABLE_PITR_ENABLED,
- tags);
- }
-
- /**
- * Constructor.
- * @param table
- * @param dynamoDBClient
- * @param serializer
- * @param consistentReads
- * @param tableCreatorCallback
- * @param dynamoDbRequestTimeout
- * @param billingMode
- * @param leaseTableDeletionProtectionEnabled
- */
- public DynamoDBLeaseRefresher(
- final String table,
- final DynamoDbAsyncClient dynamoDBClient,
- final LeaseSerializer serializer,
- final boolean consistentReads,
- @NonNull final TableCreatorCallback tableCreatorCallback,
- Duration dynamoDbRequestTimeout,
- final BillingMode billingMode,
+ final DdbTableConfig ddbTableConfig,
final boolean leaseTableDeletionProtectionEnabled,
final boolean leaseTablePitrEnabled,
final Collection tags) {
@@ -261,7 +158,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
this.consistentReads = consistentReads;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
- this.billingMode = billingMode;
+ this.ddbTableConfig = ddbTableConfig;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.tags = tags;
@@ -269,18 +166,16 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
/**
* {@inheritDoc}
+ * This method always creates table in PROVISIONED mode and with RCU and WCU provided as method args
*/
@Override
public boolean createLeaseTableIfNotExists(@NonNull final Long readCapacity, @NonNull final Long writeCapacity)
throws ProvisionedThroughputException, DependencyException {
- final CreateTableRequest.Builder builder = createTableRequestBuilder();
- if (BillingMode.PROVISIONED.equals(billingMode)) {
- ProvisionedThroughput throughput = ProvisionedThroughput.builder()
- .readCapacityUnits(readCapacity)
- .writeCapacityUnits(writeCapacity)
- .build();
- builder.provisionedThroughput(throughput);
- }
+
+ final DdbTableConfig overriddenTableConfig = createDdbTableConfigFromBillingMode(BillingMode.PROVISIONED);
+ overriddenTableConfig.readCapacity(readCapacity);
+ overriddenTableConfig.writeCapacity(writeCapacity);
+ final CreateTableRequest.Builder builder = createTableRequestBuilder(overriddenTableConfig);
return createTableIfNotExists(builder.build());
}
@@ -289,15 +184,14 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
*/
@Override
public boolean createLeaseTableIfNotExists() throws ProvisionedThroughputException, DependencyException {
- final CreateTableRequest request = createTableRequestBuilder().build();
+ final CreateTableRequest request =
+ createTableRequestBuilder(ddbTableConfig).build();
boolean tableExists = createTableIfNotExists(request);
-
if (leaseTablePitrEnabled) {
enablePitr();
log.info("Enabled PITR on table {}", table);
}
-
return tableExists;
}
@@ -323,7 +217,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private boolean createTableIfNotExists(CreateTableRequest request)
throws ProvisionedThroughputException, DependencyException {
try {
- if (tableStatus() != null) {
+ if (describeLeaseTable() != null) {
return newTableCreated;
}
} catch (DependencyException de) {
@@ -367,7 +261,12 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
}
private TableStatus tableStatus() throws DependencyException {
- DescribeTableRequest request =
+ final DescribeTableResponse response = describeLeaseTable();
+ return nonNull(response) ? response.table().tableStatus() : null;
+ }
+
+ private DescribeTableResponse describeLeaseTable() throws DependencyException {
+ final DescribeTableRequest request =
DescribeTableRequest.builder().tableName(table).build();
final AWSExceptionManager exceptionManager = createExceptionManager();
@@ -394,7 +293,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
TableStatus tableStatus = result.table().tableStatus();
log.debug("Lease table exists and is in status {}", tableStatus);
- return tableStatus;
+ return result;
}
@Override
@@ -405,6 +304,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
if (sleepTimeRemaining <= 0) {
return false;
}
+ log.info("Waiting for Lease table creation...");
long timeToSleepMillis = Math.min(TimeUnit.SECONDS.toMillis(secondsBetweenPolls), sleepTimeRemaining);
@@ -419,6 +319,139 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return true;
}
+ private static boolean isTableInPayPerRequestMode(final DescribeTableResponse describeTableResponse) {
+ if (nonNull(describeTableResponse)
+ && nonNull(describeTableResponse.table().billingModeSummary())
+ && describeTableResponse
+ .table()
+ .billingModeSummary()
+ .billingMode()
+ .equals(BillingMode.PAY_PER_REQUEST)) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String createLeaseOwnerToLeaseKeyIndexIfNotExists() throws DependencyException {
+ final DescribeTableResponse describeTableResponse = describeLeaseTable();
+ ProvisionedThroughput provisionedThroughput = null;
+ if (nonNull(describeTableResponse)) {
+ // If table already on PAY_PER_REQUEST then setting null provisionedThroughput creates the GSI in
+ // PAY_PER_REQUEST mode
+ if (!isTableInPayPerRequestMode(describeTableResponse)) {
+ /*
+ * Whatever is configured at the base table use that as WCU and RCU for the GSI. If this is new
+ * application created with provision mode, the set WCU and RCU will be same as that of what application
+ * provided, if this is old application where application provided WCU and RCU is no longer what is set
+ * on base table then we honor the capacity of base table. This is to avoid setting WCU and RCU very
+ * less on GSI and cause issues with base table. Customers are expected to tune in GSI WCU and RCU
+ * themselves after creation as they deem fit.
+ */
+ provisionedThroughput = ProvisionedThroughput.builder()
+ .readCapacityUnits(describeTableResponse
+ .table()
+ .provisionedThroughput()
+ .readCapacityUnits())
+ .writeCapacityUnits(describeTableResponse
+ .table()
+ .provisionedThroughput()
+ .writeCapacityUnits())
+ .build();
+ }
+
+ final IndexStatus indexStatus = getIndexStatusFromDescribeTableResponse(
+ describeTableResponse.table(), LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME);
+ if (nonNull(indexStatus)) {
+ log.info(
+ "Lease table GSI {} already exists with status {}",
+ LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME,
+ indexStatus);
+
+ // indexStatus is nonNull that means index already exists, return the status of index.
+ return indexStatus.toString();
+ }
+ }
+ final UpdateTableRequest updateTableRequest = UpdateTableRequest.builder()
+ .tableName(table)
+ .attributeDefinitions(serializer.getWorkerIdToLeaseKeyIndexAttributeDefinitions())
+ .globalSecondaryIndexUpdates(GlobalSecondaryIndexUpdate.builder()
+ .create(CreateGlobalSecondaryIndexAction.builder()
+ .indexName(LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME)
+ .keySchema(serializer.getWorkerIdToLeaseKeyIndexKeySchema())
+ .projection(Projection.builder()
+ .projectionType(ProjectionType.KEYS_ONLY)
+ .build())
+ .provisionedThroughput(provisionedThroughput)
+ .build())
+ .build())
+ .build();
+
+ try {
+ log.info("Creating Lease table GSI {}", LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME);
+ final UpdateTableResponse response = FutureUtils.resolveOrCancelFuture(
+ dynamoDBClient.updateTable(updateTableRequest), dynamoDbRequestTimeout);
+ return getIndexStatusFromDescribeTableResponse(
+ response.tableDescription(), LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME)
+ .toString();
+ } catch (ExecutionException e) {
+ throw new DependencyException(nonNull(e.getCause()) ? e.getCause() : e);
+ } catch (InterruptedException | TimeoutException e) {
+ throw new DependencyException(e);
+ }
+ }
+
+ private IndexStatus getIndexStatusFromDescribeTableResponse(
+ final TableDescription tableDescription, final String indexName) {
+ if (isNull(tableDescription)) {
+ return null;
+ }
+ return tableDescription.globalSecondaryIndexes().stream()
+ .filter(index -> index.indexName().equals(indexName))
+ .findFirst()
+ .map(GlobalSecondaryIndexDescription::indexStatus)
+ .orElse(null);
+ }
+
+ @Override
+ public boolean waitUntilLeaseOwnerToLeaseKeyIndexExists(final long secondsBetweenPolls, final long timeoutSeconds) {
+ final long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime
+ < Duration.ofSeconds(timeoutSeconds).toMillis()) {
+ try {
+ if (isLeaseOwnerToLeaseKeyIndexActive()) {
+ return true;
+ }
+ } catch (final Exception e) {
+ log.warn("Failed to fetch {} status", LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME, e);
+ }
+ try {
+ log.info("GSI status is not active, trying again in {}s", secondsBetweenPolls);
+ Thread.sleep(Duration.ofSeconds(secondsBetweenPolls).toMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ log.info("GSI status was not active, after {}s", timeoutSeconds);
+ return false;
+ }
+
+ @Override
+ public boolean isLeaseOwnerToLeaseKeyIndexActive() throws DependencyException {
+ final DescribeTableResponse describeTableResponse = describeLeaseTable();
+ if (isNull(describeTableResponse)) {
+ return false;
+ }
+ final IndexStatus indexStatus = getIndexStatusFromDescribeTableResponse(
+ describeTableResponse.table(), LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME);
+ log.debug(
+ "Lease table GSI {} status {}",
+ LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME,
+ indexStatus == null ? "does not exist" : indexStatus);
+
+ return indexStatus == IndexStatus.ACTIVE;
+ }
+
/**
* Exposed for testing purposes.
*
@@ -447,6 +480,64 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return list(null, streamIdentifier);
}
+ /**
+ * {@inheritDoc}
+ *
+ * This method throws InvalidStateException in case of
+ * {@link DynamoDBLeaseRefresher#LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME} does not exists.
+ * If index creation is not done and want to listLeases for a worker,
+ * use {@link DynamoDBLeaseRefresher#listLeases()} and filter on that to list leases.
+ */
+ @Override
+ public List listLeaseKeysForWorker(final String workerIdentifier)
+ throws DependencyException, InvalidStateException {
+ QueryRequest queryRequest = QueryRequest.builder()
+ .indexName(LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME)
+ .keyConditionExpression(LEASE_OWNER_INDEX_QUERY_CONDITIONAL_EXPRESSION)
+ .expressionAttributeValues(ImmutableMap.of(
+ DDB_LEASE_OWNER,
+ AttributeValue.builder().s(workerIdentifier).build()))
+ .tableName(table)
+ .build();
+
+ final AWSExceptionManager exceptionManager = createExceptionManager();
+ exceptionManager.add(ResourceNotFoundException.class, t -> t);
+
+ try {
+ try {
+ final List result = new ArrayList<>();
+
+ QueryResponse queryResponse =
+ FutureUtils.resolveOrCancelFuture(dynamoDBClient.query(queryRequest), dynamoDbRequestTimeout);
+
+ while (queryResponse != null) {
+ for (Map item : queryResponse.items()) {
+ result.add(item.get(LEASE_KEY_KEY).s());
+ }
+ final Map lastEvaluatedKey = queryResponse.lastEvaluatedKey();
+ if (CollectionUtils.isNullOrEmpty(lastEvaluatedKey)) {
+ // Signify that we're done.
+ queryResponse = null;
+ } else {
+ // Make another request, picking up where we left off.
+ queryRequest = queryRequest.toBuilder()
+ .exclusiveStartKey(lastEvaluatedKey)
+ .build();
+ queryResponse = FutureUtils.resolveOrCancelFuture(
+ dynamoDBClient.query(queryRequest), dynamoDbRequestTimeout);
+ }
+ }
+ return result;
+ } catch (final ExecutionException e) {
+ throw exceptionManager.apply(e.getCause());
+ }
+ } catch (final ResourceNotFoundException e) {
+ throw new InvalidStateException(LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME + " does not exists.", e);
+ } catch (final Exception e) {
+ throw new DependencyException(e);
+ }
+ }
+
/**
* {@inheritDoc}
*/
@@ -455,8 +546,87 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return list(null, null);
}
+ @Override
+ public Map.Entry, List> listLeasesParallely(
+ final ExecutorService parallelScanExecutorService, final int parallelScanTotalSegment)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ final List leaseItemFailedDeserialize = new ArrayList<>();
+ final List response = new ArrayList<>();
+ final List>>> futures = new ArrayList<>();
+ for (int i = 0; i < parallelScanTotalSegment; ++i) {
+ final int segmentNumber = i;
+ futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, parallelScanTotalSegment)));
+ }
+ try {
+ for (final Future>> future : futures) {
+ for (final Map item : future.get()) {
+ try {
+ response.add(serializer.fromDynamoRecord(item));
+ } catch (final Exception e) {
+ // If one or more leases failed to deserialize for some reason (e.g. corrupted lease etc
+ // do not fail all list call. Capture failed deserialize item and return to caller.
+ log.error("Failed to deserialize lease", e);
+ // If a item exists in DDB then "leaseKey" should be always present as its primaryKey
+ leaseItemFailedDeserialize.add(item.get(LEASE_KEY_KEY).s());
+ }
+ }
+ }
+ } catch (final ExecutionException e) {
+ final Throwable throwable = e.getCause() != null ? e.getCause() : e;
+ if (throwable instanceof ResourceNotFoundException) {
+ throw new InvalidStateException("Cannot scan lease table " + table + " because it does not exist.", e);
+ } else if (throwable instanceof ProvisionedThroughputException) {
+ throw new ProvisionedThroughputException(e);
+ } else {
+ throw new DependencyException(e);
+ }
+ } catch (final InterruptedException e) {
+ throw new DependencyException(e);
+ }
+ return new AbstractMap.SimpleEntry<>(response, leaseItemFailedDeserialize);
+ }
+
+ private List