From 51ec96bf9a7d2b2c0a6542b1bb302dcce56bfd5a Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Thu, 9 Aug 2018 15:14:40 -0700 Subject: [PATCH 1/3] Fixing IOPs issue with lease table * Making DynamoDBLeaseCoordinator take IOPs configuration in the constructor * InitialLeaseTableReadCapacity and InitialLeaseTableWriteCapacity for the DynamoDBLeaseCoordinator class throws UnsupportedException --- .../kinesis/leases/LeaseManagementConfig.java | 10 ++-- .../dynamodb/DynamoDBLeaseCoordinator.java | 58 +++++++++++-------- .../DynamoDBLeaseManagementFactory.java | 4 ++ .../leases/LeaseCoordinatorExerciser.java | 8 ++- ...namoDBLeaseCoordinatorIntegrationTest.java | 5 +- 5 files changed, 55 insertions(+), 30 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 551872a7..ccaa2ec9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -21,15 +21,15 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.kinesis.common.InitialPositionInStream; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -216,7 +216,9 @@ public class LeaseManagementConfig { maxListShardsRetryAttempts(), maxCacheMissesBeforeReload(), listShardsCacheAllowedAgeInSeconds(), - cacheMissWarningModulus()); + cacheMissWarningModulus(), + initialLeaseTableReadCapacity(), + initialLeaseTableWriteCapacity()); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 2ae70669..70f417e6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -44,8 +44,8 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; /** @@ -59,8 +59,6 @@ import software.amazon.kinesis.metrics.MetricsUtil; public class DynamoDBLeaseCoordinator implements LeaseCoordinator { // Time to wait for in-flight Runnables to finish when calling .stop(); private static final long STOP_WAIT_TIME_MILLIS = 2000L; - private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; - private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() @@ -72,11 +70,12 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { private final long takerIntervalMillis; private final ExecutorService leaseRenewalThreadpool; private final LeaseRefresher leaseRefresher; - private final Object shutdownLock = new Object(); + private final long initialLeaseTableReadCapacity; + private final long initialLeaseTableWriteCapacity; protected final MetricsFactory metricsFactory; - private long initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; - private long initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; + private final Object shutdownLock = new Object(); + private ScheduledExecutorService leaseCoordinatorThreadPool; private ScheduledFuture takerFuture; @@ -85,13 +84,24 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { /** * Constructor. * - * @param leaseRefresher LeaseRefresher instance to use - * @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership) - * @param leaseDurationMillis Duration of a lease - * @param epsilonMillis Allow for some variance when calculating lease expirations - * @param maxLeasesForWorker Max leases this Worker can handle at a time - * @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing) - * @param metricsFactory Used to publish metrics about lease operations + * @param leaseRefresher + * LeaseRefresher instance to use + * @param workerIdentifier + * Identifies the worker (e.g. useful to track lease ownership) + * @param leaseDurationMillis + * Duration of a lease + * @param epsilonMillis + * Allow for some variance when calculating lease expirations + * @param maxLeasesForWorker + * Max leases this Worker can handle at a time + * @param maxLeasesToStealAtOneTime + * Steal up to these many leases at a time (for load balancing) + * @param initialLeaseTableReadCapacity + * Initial dynamodb lease table read iops if creating the lease table + * @param initialLeaseTableWriteCapacity + * Initial dynamodb lease table write iops if creating the lease table + * @param metricsFactory + * Used to publish metrics about lease operations */ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, final String workerIdentifier, @@ -100,6 +110,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewerThreadCount, + final long initialLeaseTableReadCapacity, + final long initialLeaseTableWriteCapacity, final MetricsFactory metricsFactory) { this.leaseRefresher = leaseRefresher; this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount); @@ -110,6 +122,14 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory); this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis; this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; + if (initialLeaseTableReadCapacity <= 0) { + throw new IllegalArgumentException("readCapacity should be >= 1"); + } + this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; + if (initialLeaseTableWriteCapacity <= 0) { + throw new IllegalArgumentException("writeCapacity should be >= 1"); + } + this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; this.metricsFactory = metricsFactory; log.info("With failover time {} ms and epsilon {} ms, LeaseCoordinator will renew leases every {} ms, take" @@ -324,19 +344,11 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { @Override public DynamoDBLeaseCoordinator initialLeaseTableReadCapacity(long readCapacity) { - if (readCapacity <= 0) { - throw new IllegalArgumentException("readCapacity should be >= 1"); - } - this.initialLeaseTableReadCapacity = readCapacity; - return this; + throw new UnsupportedOperationException("Please set read capacity using the constructor"); } @Override public DynamoDBLeaseCoordinator initialLeaseTableWriteCapacity(long writeCapacity) { - if (writeCapacity <= 0) { - throw new IllegalArgumentException("writeCapacity should be >= 1"); - } - this.initialLeaseTableWriteCapacity = writeCapacity; - return this; + throw new UnsupportedOperationException("Please set write capacity using the constructor"); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index cce48b07..41bf2d27 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -64,6 +64,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final int maxCacheMissesBeforeReload; private final long listShardsCacheAllowedAgeInSeconds; private final int cacheMissWarningModulus; + private final long initialLeaseTableReadCapacity; + private final long initialLeaseTableWriteCapacity; @Override public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) { @@ -74,6 +76,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, + initialLeaseTableReadCapacity, + initialLeaseTableWriteCapacity, metricsFactory); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java index 77bb06dd..c3703473 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java @@ -50,6 +50,8 @@ public class LeaseCoordinatorExerciser { private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; private static final MetricsLevel METRICS_LEVEL = MetricsLevel.DETAILED; private static final int FLUSH_SIZE = 200; + private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 50L; public static void main(String[] args) throws InterruptedException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { @@ -65,7 +67,8 @@ public class LeaseCoordinatorExerciser { LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient, new DynamoDBLeaseSerializer(), true); - if (leaseRefresher.createLeaseTableIfNotExists(10L, 50L)) { + if (leaseRefresher.createLeaseTableIfNotExists(INITIAL_LEASE_TABLE_READ_CAPACITY, + INITIAL_LEASE_TABLE_WRITE_CAPACITY)) { log.info("Waiting for newly created lease table"); if (!leaseRefresher.waitUntilLeaseTableExists(10, 300)) { log.error("Table was not created in time"); @@ -83,7 +86,8 @@ public class LeaseCoordinatorExerciser { LeaseCoordinator coord = new DynamoDBLeaseCoordinator(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, - MAX_LEASE_RENEWER_THREAD_COUNT, metricsFactory); + MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY, + INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); coordinators.add(coord); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index 7f005f0c..40543dce 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -56,6 +56,9 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; + private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; + private static DynamoDBLeaseRefresher leaseRefresher; private static DynamoDBCheckpointer dynamoDBCheckpointer; @@ -93,7 +96,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { leaseRefresher.deleteAll(); coordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, - metricsFactory); + INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher); dynamoDBCheckpointer.operation(OPERATION); From ca88ee9bc63b25830dbba9a46cecdcb30cf8e2c2 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 10 Aug 2018 10:08:59 -0700 Subject: [PATCH 2/3] Reverting some changes: * Reverting the constructor, and adding chained constructor * Reverting support for initial iops methods * Adding deprecated tags and notes to javadoc --- .../dynamodb/DynamoDBLeaseCoordinator.java | 64 +++++++++++++++++-- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 70f417e6..be484a73 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -59,6 +59,8 @@ import software.amazon.kinesis.metrics.MetricsUtil; public class DynamoDBLeaseCoordinator implements LeaseCoordinator { // Time to wait for in-flight Runnables to finish when calling .stop(); private static final long STOP_WAIT_TIME_MILLIS = 2000L; + private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() @@ -70,8 +72,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { private final long takerIntervalMillis; private final ExecutorService leaseRenewalThreadpool; private final LeaseRefresher leaseRefresher; - private final long initialLeaseTableReadCapacity; - private final long initialLeaseTableWriteCapacity; + private long initialLeaseTableReadCapacity; + private long initialLeaseTableWriteCapacity; protected final MetricsFactory metricsFactory; private final Object shutdownLock = new Object(); @@ -81,6 +83,40 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { private volatile boolean running = false; + /** + * Constructor. + * + *

NOTE: This constructor is deprecated and will be removed in a future release.

+ * + * @param leaseRefresher + * LeaseRefresher instance to use + * @param workerIdentifier + * Identifies the worker (e.g. useful to track lease ownership) + * @param leaseDurationMillis + * Duration of a lease + * @param epsilonMillis + * Allow for some variance when calculating lease expirations + * @param maxLeasesForWorker + * Max leases this Worker can handle at a time + * @param maxLeasesToStealAtOneTime + * Steal up to these many leases at a time (for load balancing) + * @param metricsFactory + * Used to publish metrics about lease operations + */ + @Deprecated + public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, + final String workerIdentifier, + final long leaseDurationMillis, + final long epsilonMillis, + final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, + final int maxLeaseRenewerThreadCount, + final MetricsFactory metricsFactory) { + this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, + DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); + } + /** * Constructor. * @@ -342,13 +378,33 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { lease.checkpoint()); } + /** + * {@inheritDoc} + * + *

NOTE: This method is deprecated. Please set the initial capacity through the constructor.

+ */ @Override + @Deprecated public DynamoDBLeaseCoordinator initialLeaseTableReadCapacity(long readCapacity) { - throw new UnsupportedOperationException("Please set read capacity using the constructor"); + if (readCapacity <= 0) { + throw new IllegalArgumentException("readCapacity should be >= 1"); + } + initialLeaseTableReadCapacity = readCapacity; + return this; } + /** + * {@inheritDoc} + * + *

NOTE: This method is deprecated. Please set the initial capacity through the constructor.

+ */ @Override + @Deprecated public DynamoDBLeaseCoordinator initialLeaseTableWriteCapacity(long writeCapacity) { - throw new UnsupportedOperationException("Please set write capacity using the constructor"); + if (writeCapacity <= 0) { + throw new IllegalArgumentException("writeCapacity should be >= 1"); + } + initialLeaseTableWriteCapacity = writeCapacity; + return this; } } From f1cbf15075fd410dc7fc203af3a1cd6f2b562b4d Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 10 Aug 2018 14:57:58 -0700 Subject: [PATCH 3/3] Introducing changes to avoid breaking changes * Introducing chained constructors in DynamoDBLeaseManagementFactory * Introducing TableConstants to maintain Default IOPS in one place --- .../dynamodb/DynamoDBLeaseCoordinator.java | 7 +- .../DynamoDBLeaseManagementFactory.java | 108 ++++++++++++++++++ .../leases/dynamodb/TableConstants.java | 29 +++++ 3 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableConstants.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index be484a73..12ca3a01 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -59,8 +59,6 @@ import software.amazon.kinesis.metrics.MetricsUtil; public class DynamoDBLeaseCoordinator implements LeaseCoordinator { // Time to wait for in-flight Runnables to finish when calling .stop(); private static final long STOP_WAIT_TIME_MILLIS = 2000L; - private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; - private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() @@ -113,8 +111,9 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final int maxLeaseRenewerThreadCount, final MetricsFactory metricsFactory) { this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, - maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, - DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); + maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, + TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, + TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 41bf2d27..3c20c591 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -67,6 +67,114 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final long initialLeaseTableReadCapacity; private final long initialLeaseTableWriteCapacity; + /** + * Constructor. + * + *

NOTE: This constructor is deprecated and will be removed in a future release.

+ * + * @param kinesisClient + * @param streamName + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param initialPositionInStream + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + */ + @Deprecated + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, + final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus) { + this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, + initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, + TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY); + } + + /** + * Constructor. + * + * @param kinesisClient + * @param streamName + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param initialPositionInStream + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + */ + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, + final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) { + this.kinesisClient = kinesisClient; + this.streamName = streamName; + this.dynamoDBClient = dynamoDBClient; + this.tableName = tableName; + this.workerIdentifier = workerIdentifier; + this.executorService = executorService; + this.initialPositionInStream = initialPositionInStream; + this.failoverTimeMillis = failoverTimeMillis; + this.epsilonMillis = epsilonMillis; + this.maxLeasesForWorker = maxLeasesForWorker; + this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; + this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; + this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; + this.shardSyncIntervalMillis = shardSyncIntervalMillis; + this.consistentReads = consistentReads; + this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis; + this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; + this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload; + this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds; + this.cacheMissWarningModulus = cacheMissWarningModulus; + this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; + this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; + } + @Override public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) { return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableConstants.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableConstants.java new file mode 100644 index 00000000..3848c2f0 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableConstants.java @@ -0,0 +1,29 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.kinesis.leases.dynamodb; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +/** + * This class is just a holder for initial lease table IOPs units. This class will be removed in a future release. + */ +@Deprecated +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class TableConstants { + public static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + public static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; +}