diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 551872a7..ccaa2ec9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -21,15 +21,15 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.kinesis.common.InitialPositionInStream; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -216,7 +216,9 @@ public class LeaseManagementConfig { maxListShardsRetryAttempts(), maxCacheMissesBeforeReload(), listShardsCacheAllowedAgeInSeconds(), - cacheMissWarningModulus()); + cacheMissWarningModulus(), + initialLeaseTableReadCapacity(), + initialLeaseTableWriteCapacity()); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 2ae70669..70f417e6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -44,8 +44,8 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; /** @@ -59,8 +59,6 @@ import software.amazon.kinesis.metrics.MetricsUtil; public class DynamoDBLeaseCoordinator implements LeaseCoordinator { // Time to wait for in-flight Runnables to finish when calling .stop(); private static final long STOP_WAIT_TIME_MILLIS = 2000L; - private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; - private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() @@ -72,11 +70,12 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { private final long takerIntervalMillis; private final ExecutorService leaseRenewalThreadpool; private final LeaseRefresher leaseRefresher; - private final Object shutdownLock = new Object(); + private final long initialLeaseTableReadCapacity; + private final long initialLeaseTableWriteCapacity; protected final MetricsFactory metricsFactory; - private long initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; - private long initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; + private final Object shutdownLock = new Object(); + private ScheduledExecutorService leaseCoordinatorThreadPool; private ScheduledFuture takerFuture; @@ -85,13 +84,24 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { /** * Constructor. * - * @param leaseRefresher LeaseRefresher instance to use - * @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership) - * @param leaseDurationMillis Duration of a lease - * @param epsilonMillis Allow for some variance when calculating lease expirations - * @param maxLeasesForWorker Max leases this Worker can handle at a time - * @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing) - * @param metricsFactory Used to publish metrics about lease operations + * @param leaseRefresher + * LeaseRefresher instance to use + * @param workerIdentifier + * Identifies the worker (e.g. useful to track lease ownership) + * @param leaseDurationMillis + * Duration of a lease + * @param epsilonMillis + * Allow for some variance when calculating lease expirations + * @param maxLeasesForWorker + * Max leases this Worker can handle at a time + * @param maxLeasesToStealAtOneTime + * Steal up to these many leases at a time (for load balancing) + * @param initialLeaseTableReadCapacity + * Initial dynamodb lease table read iops if creating the lease table + * @param initialLeaseTableWriteCapacity + * Initial dynamodb lease table write iops if creating the lease table + * @param metricsFactory + * Used to publish metrics about lease operations */ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, final String workerIdentifier, @@ -100,6 +110,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewerThreadCount, + final long initialLeaseTableReadCapacity, + final long initialLeaseTableWriteCapacity, final MetricsFactory metricsFactory) { this.leaseRefresher = leaseRefresher; this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount); @@ -110,6 +122,14 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory); this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis; this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; + if (initialLeaseTableReadCapacity <= 0) { + throw new IllegalArgumentException("readCapacity should be >= 1"); + } + this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; + if (initialLeaseTableWriteCapacity <= 0) { + throw new IllegalArgumentException("writeCapacity should be >= 1"); + } + this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; this.metricsFactory = metricsFactory; log.info("With failover time {} ms and epsilon {} ms, LeaseCoordinator will renew leases every {} ms, take" @@ -324,19 +344,11 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { @Override public DynamoDBLeaseCoordinator initialLeaseTableReadCapacity(long readCapacity) { - if (readCapacity <= 0) { - throw new IllegalArgumentException("readCapacity should be >= 1"); - } - this.initialLeaseTableReadCapacity = readCapacity; - return this; + throw new UnsupportedOperationException("Please set read capacity using the constructor"); } @Override public DynamoDBLeaseCoordinator initialLeaseTableWriteCapacity(long writeCapacity) { - if (writeCapacity <= 0) { - throw new IllegalArgumentException("writeCapacity should be >= 1"); - } - this.initialLeaseTableWriteCapacity = writeCapacity; - return this; + throw new UnsupportedOperationException("Please set write capacity using the constructor"); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index cce48b07..41bf2d27 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -64,6 +64,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final int maxCacheMissesBeforeReload; private final long listShardsCacheAllowedAgeInSeconds; private final int cacheMissWarningModulus; + private final long initialLeaseTableReadCapacity; + private final long initialLeaseTableWriteCapacity; @Override public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) { @@ -74,6 +76,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, + initialLeaseTableReadCapacity, + initialLeaseTableWriteCapacity, metricsFactory); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java index 77bb06dd..c3703473 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java @@ -50,6 +50,8 @@ public class LeaseCoordinatorExerciser { private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; private static final MetricsLevel METRICS_LEVEL = MetricsLevel.DETAILED; private static final int FLUSH_SIZE = 200; + private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 50L; public static void main(String[] args) throws InterruptedException, DependencyException, InvalidStateException, ProvisionedThroughputException, IOException { @@ -65,7 +67,8 @@ public class LeaseCoordinatorExerciser { LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient, new DynamoDBLeaseSerializer(), true); - if (leaseRefresher.createLeaseTableIfNotExists(10L, 50L)) { + if (leaseRefresher.createLeaseTableIfNotExists(INITIAL_LEASE_TABLE_READ_CAPACITY, + INITIAL_LEASE_TABLE_WRITE_CAPACITY)) { log.info("Waiting for newly created lease table"); if (!leaseRefresher.waitUntilLeaseTableExists(10, 300)) { log.error("Table was not created in time"); @@ -83,7 +86,8 @@ public class LeaseCoordinatorExerciser { LeaseCoordinator coord = new DynamoDBLeaseCoordinator(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, - MAX_LEASE_RENEWER_THREAD_COUNT, metricsFactory); + MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY, + INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); coordinators.add(coord); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index 7f005f0c..40543dce 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -56,6 +56,9 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; + private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; + private static DynamoDBLeaseRefresher leaseRefresher; private static DynamoDBCheckpointer dynamoDBCheckpointer; @@ -93,7 +96,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { leaseRefresher.deleteAll(); coordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, - metricsFactory); + INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher); dynamoDBCheckpointer.operation(OPERATION);