Merge pull request #360 from sahilpalvia/dynamodb-iops-fix

Ensure that lease tables are created with the specified IOPs values.
This commit is contained in:
Justin Pfifer 2018-08-13 07:50:21 -07:00 committed by GitHub
commit 6973152f60
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 239 additions and 22 deletions

View file

@ -21,15 +21,15 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Data; import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory;
@ -216,7 +216,9 @@ public class LeaseManagementConfig {
maxListShardsRetryAttempts(), maxListShardsRetryAttempts(),
maxCacheMissesBeforeReload(), maxCacheMissesBeforeReload(),
listShardsCacheAllowedAgeInSeconds(), listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus()); cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity());
} }
return leaseManagementFactory; return leaseManagementFactory;
} }

View file

@ -44,8 +44,8 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.metrics.MetricsUtil;
/** /**
@ -59,8 +59,6 @@ import software.amazon.kinesis.metrics.MetricsUtil;
public class DynamoDBLeaseCoordinator implements LeaseCoordinator { public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
// Time to wait for in-flight Runnables to finish when calling .stop(); // Time to wait for in-flight Runnables to finish when calling .stop();
private static final long STOP_WAIT_TIME_MILLIS = 2000L; private static final long STOP_WAIT_TIME_MILLIS = 2000L;
private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build();
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder()
@ -72,11 +70,12 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
private final long takerIntervalMillis; private final long takerIntervalMillis;
private final ExecutorService leaseRenewalThreadpool; private final ExecutorService leaseRenewalThreadpool;
private final LeaseRefresher leaseRefresher; private final LeaseRefresher leaseRefresher;
private final Object shutdownLock = new Object(); private long initialLeaseTableReadCapacity;
private long initialLeaseTableWriteCapacity;
protected final MetricsFactory metricsFactory; protected final MetricsFactory metricsFactory;
private long initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; private final Object shutdownLock = new Object();
private long initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
private ScheduledExecutorService leaseCoordinatorThreadPool; private ScheduledExecutorService leaseCoordinatorThreadPool;
private ScheduledFuture<?> takerFuture; private ScheduledFuture<?> takerFuture;
@ -85,13 +84,59 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
/** /**
* Constructor. * Constructor.
* *
* @param leaseRefresher LeaseRefresher instance to use * <p>NOTE: This constructor is deprecated and will be removed in a future release.</p>
* @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership) *
* @param leaseDurationMillis Duration of a lease * @param leaseRefresher
* @param epsilonMillis Allow for some variance when calculating lease expirations * LeaseRefresher instance to use
* @param maxLeasesForWorker Max leases this Worker can handle at a time * @param workerIdentifier
* @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing) * Identifies the worker (e.g. useful to track lease ownership)
* @param metricsFactory Used to publish metrics about lease operations * @param leaseDurationMillis
* Duration of a lease
* @param epsilonMillis
* Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker
* Max leases this Worker can handle at a time
* @param maxLeasesToStealAtOneTime
* Steal up to these many leases at a time (for load balancing)
* @param metricsFactory
* Used to publish metrics about lease operations
*/
@Deprecated
public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier,
final long leaseDurationMillis,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewerThreadCount,
final MetricsFactory metricsFactory) {
this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}
/**
* Constructor.
*
* @param leaseRefresher
* LeaseRefresher instance to use
* @param workerIdentifier
* Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis
* Duration of a lease
* @param epsilonMillis
* Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker
* Max leases this Worker can handle at a time
* @param maxLeasesToStealAtOneTime
* Steal up to these many leases at a time (for load balancing)
* @param initialLeaseTableReadCapacity
* Initial dynamodb lease table read iops if creating the lease table
* @param initialLeaseTableWriteCapacity
* Initial dynamodb lease table write iops if creating the lease table
* @param metricsFactory
* Used to publish metrics about lease operations
*/ */
public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher, public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier, final String workerIdentifier,
@ -100,6 +145,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final int maxLeasesForWorker, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewerThreadCount, final int maxLeaseRenewerThreadCount,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final MetricsFactory metricsFactory) { final MetricsFactory metricsFactory) {
this.leaseRefresher = leaseRefresher; this.leaseRefresher = leaseRefresher;
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount); this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
@ -110,6 +157,14 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory); leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis; this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis;
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
if (initialLeaseTableReadCapacity <= 0) {
throw new IllegalArgumentException("readCapacity should be >= 1");
}
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
if (initialLeaseTableWriteCapacity <= 0) {
throw new IllegalArgumentException("writeCapacity should be >= 1");
}
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.metricsFactory = metricsFactory; this.metricsFactory = metricsFactory;
log.info("With failover time {} ms and epsilon {} ms, LeaseCoordinator will renew leases every {} ms, take" log.info("With failover time {} ms and epsilon {} ms, LeaseCoordinator will renew leases every {} ms, take"
@ -322,21 +377,33 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
lease.checkpoint()); lease.checkpoint());
} }
/**
* {@inheritDoc}
*
* <p>NOTE: This method is deprecated. Please set the initial capacity through the constructor.</p>
*/
@Override @Override
@Deprecated
public DynamoDBLeaseCoordinator initialLeaseTableReadCapacity(long readCapacity) { public DynamoDBLeaseCoordinator initialLeaseTableReadCapacity(long readCapacity) {
if (readCapacity <= 0) { if (readCapacity <= 0) {
throw new IllegalArgumentException("readCapacity should be >= 1"); throw new IllegalArgumentException("readCapacity should be >= 1");
} }
this.initialLeaseTableReadCapacity = readCapacity; initialLeaseTableReadCapacity = readCapacity;
return this; return this;
} }
/**
* {@inheritDoc}
*
* <p>NOTE: This method is deprecated. Please set the initial capacity through the constructor.</p>
*/
@Override @Override
@Deprecated
public DynamoDBLeaseCoordinator initialLeaseTableWriteCapacity(long writeCapacity) { public DynamoDBLeaseCoordinator initialLeaseTableWriteCapacity(long writeCapacity) {
if (writeCapacity <= 0) { if (writeCapacity <= 0) {
throw new IllegalArgumentException("writeCapacity should be >= 1"); throw new IllegalArgumentException("writeCapacity should be >= 1");
} }
this.initialLeaseTableWriteCapacity = writeCapacity; initialLeaseTableWriteCapacity = writeCapacity;
return this; return this;
} }
} }

View file

@ -64,6 +64,116 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final int maxCacheMissesBeforeReload; private final int maxCacheMissesBeforeReload;
private final long listShardsCacheAllowedAgeInSeconds; private final long listShardsCacheAllowedAgeInSeconds;
private final int cacheMissWarningModulus; private final int cacheMissWarningModulus;
private final long initialLeaseTableReadCapacity;
private final long initialLeaseTableWriteCapacity;
/**
* Constructor.
*
* <p>NOTE: This constructor is deprecated and will be removed in a future release.</p>
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param initialPositionInStream
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
*/
@Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY);
}
/**
* Constructor.
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param initialPositionInStream
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.initialPositionInStream = initialPositionInStream;
this.failoverTimeMillis = failoverTimeMillis;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
this.maxLeaseRenewalThreads = maxLeaseRenewalThreads;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.consistentReads = consistentReads;
this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
}
@Override @Override
public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) { public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) {
@ -74,6 +184,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
maxLeasesForWorker, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeasesToStealAtOneTime,
maxLeaseRenewalThreads, maxLeaseRenewalThreads,
initialLeaseTableReadCapacity,
initialLeaseTableWriteCapacity,
metricsFactory); metricsFactory);
} }

View file

@ -0,0 +1,29 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.leases.dynamodb;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
/**
* This class is just a holder for initial lease table IOPs units. This class will be removed in a future release.
*/
@Deprecated
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class TableConstants {
public static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
public static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
}

View file

@ -50,6 +50,8 @@ public class LeaseCoordinatorExerciser {
private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20;
private static final MetricsLevel METRICS_LEVEL = MetricsLevel.DETAILED; private static final MetricsLevel METRICS_LEVEL = MetricsLevel.DETAILED;
private static final int FLUSH_SIZE = 200; private static final int FLUSH_SIZE = 200;
private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 50L;
public static void main(String[] args) throws InterruptedException, DependencyException, InvalidStateException, public static void main(String[] args) throws InterruptedException, DependencyException, InvalidStateException,
ProvisionedThroughputException, IOException { ProvisionedThroughputException, IOException {
@ -65,7 +67,8 @@ public class LeaseCoordinatorExerciser {
LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient, LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient,
new DynamoDBLeaseSerializer(), true); new DynamoDBLeaseSerializer(), true);
if (leaseRefresher.createLeaseTableIfNotExists(10L, 50L)) { if (leaseRefresher.createLeaseTableIfNotExists(INITIAL_LEASE_TABLE_READ_CAPACITY,
INITIAL_LEASE_TABLE_WRITE_CAPACITY)) {
log.info("Waiting for newly created lease table"); log.info("Waiting for newly created lease table");
if (!leaseRefresher.waitUntilLeaseTableExists(10, 300)) { if (!leaseRefresher.waitUntilLeaseTableExists(10, 300)) {
log.error("Table was not created in time"); log.error("Table was not created in time");
@ -83,7 +86,8 @@ public class LeaseCoordinatorExerciser {
LeaseCoordinator coord = new DynamoDBLeaseCoordinator(leaseRefresher, workerIdentifier, leaseDurationMillis, LeaseCoordinator coord = new DynamoDBLeaseCoordinator(leaseRefresher, workerIdentifier, leaseDurationMillis,
epsilonMillis, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, epsilonMillis, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME,
MAX_LEASE_RENEWER_THREAD_COUNT, metricsFactory); MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY,
INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
coordinators.add(coord); coordinators.add(coord);
} }

View file

@ -56,6 +56,9 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20;
private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
private static DynamoDBLeaseRefresher leaseRefresher; private static DynamoDBLeaseRefresher leaseRefresher;
private static DynamoDBCheckpointer dynamoDBCheckpointer; private static DynamoDBCheckpointer dynamoDBCheckpointer;
@ -93,7 +96,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
leaseRefresher.deleteAll(); leaseRefresher.deleteAll();
coordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, coordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
metricsFactory); INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher); dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher);
dynamoDBCheckpointer.operation(OPERATION); dynamoDBCheckpointer.operation(OPERATION);