diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 70f417e6..be484a73 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -59,6 +59,8 @@ import software.amazon.kinesis.metrics.MetricsUtil; public class DynamoDBLeaseCoordinator implements LeaseCoordinator { // Time to wait for in-flight Runnables to finish when calling .stop(); private static final long STOP_WAIT_TIME_MILLIS = 2000L; + private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() @@ -70,8 +72,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { private final long takerIntervalMillis; private final ExecutorService leaseRenewalThreadpool; private final LeaseRefresher leaseRefresher; - private final long initialLeaseTableReadCapacity; - private final long initialLeaseTableWriteCapacity; + private long initialLeaseTableReadCapacity; + private long initialLeaseTableWriteCapacity; protected final MetricsFactory metricsFactory; private final Object shutdownLock = new Object(); @@ -81,6 +83,40 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { private volatile boolean running = false; + /** + * Constructor. + * + *

NOTE: This constructor is deprecated and will be removed in a future release.

+ * + * @param leaseRefresher + * LeaseRefresher instance to use + * @param workerIdentifier + * Identifies the worker (e.g. useful to track lease ownership) + * @param leaseDurationMillis + * Duration of a lease + * @param epsilonMillis + * Allow for some variance when calculating lease expirations + * @param maxLeasesForWorker + * Max leases this Worker can handle at a time + * @param maxLeasesToStealAtOneTime + * Steal up to these many leases at a time (for load balancing) + * @param metricsFactory + * Used to publish metrics about lease operations + */ + @Deprecated + public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, + final String workerIdentifier, + final long leaseDurationMillis, + final long epsilonMillis, + final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, + final int maxLeaseRenewerThreadCount, + final MetricsFactory metricsFactory) { + this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, + DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); + } + /** * Constructor. * @@ -342,13 +378,33 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { lease.checkpoint()); } + /** + * {@inheritDoc} + * + *

NOTE: This method is deprecated. Please set the initial capacity through the constructor.

+ */ @Override + @Deprecated public DynamoDBLeaseCoordinator initialLeaseTableReadCapacity(long readCapacity) { - throw new UnsupportedOperationException("Please set read capacity using the constructor"); + if (readCapacity <= 0) { + throw new IllegalArgumentException("readCapacity should be >= 1"); + } + initialLeaseTableReadCapacity = readCapacity; + return this; } + /** + * {@inheritDoc} + * + *

NOTE: This method is deprecated. Please set the initial capacity through the constructor.

+ */ @Override + @Deprecated public DynamoDBLeaseCoordinator initialLeaseTableWriteCapacity(long writeCapacity) { - throw new UnsupportedOperationException("Please set write capacity using the constructor"); + if (writeCapacity <= 0) { + throw new IllegalArgumentException("writeCapacity should be >= 1"); + } + initialLeaseTableWriteCapacity = writeCapacity; + return this; } }