Fixing IOPs issue with lease table

* Making DynamoDBLeaseCoordinator take IOPs configuration in the constructor
* InitialLeaseTableReadCapacity and InitialLeaseTableWriteCapacity for the DynamoDBLeaseCoordinator class throws UnsupportedException
This commit is contained in:
Sahil Palvia 2018-08-09 15:14:40 -07:00
parent 0b267037ea
commit 51ec96bf9a
5 changed files with 55 additions and 30 deletions

View file

@ -21,15 +21,15 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
@ -216,7 +216,9 @@ public class LeaseManagementConfig {
maxListShardsRetryAttempts(),
maxCacheMissesBeforeReload(),
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus());
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity());
}
return leaseManagementFactory;
}

View file

@ -44,8 +44,8 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
/**
@ -59,8 +59,6 @@ import software.amazon.kinesis.metrics.MetricsUtil;
public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
// Time to wait for in-flight Runnables to finish when calling .stop();
private static final long STOP_WAIT_TIME_MILLIS = 2000L;
private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build();
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder()
@ -72,11 +70,12 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
private final long takerIntervalMillis;
private final ExecutorService leaseRenewalThreadpool;
private final LeaseRefresher leaseRefresher;
private final Object shutdownLock = new Object();
private final long initialLeaseTableReadCapacity;
private final long initialLeaseTableWriteCapacity;
protected final MetricsFactory metricsFactory;
private long initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
private long initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
private final Object shutdownLock = new Object();
private ScheduledExecutorService leaseCoordinatorThreadPool;
private ScheduledFuture<?> takerFuture;
@ -85,13 +84,24 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
/**
* Constructor.
*
* @param leaseRefresher LeaseRefresher instance to use
* @param workerIdentifier Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis Duration of a lease
* @param epsilonMillis Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker Max leases this Worker can handle at a time
* @param maxLeasesToStealAtOneTime Steal up to these many leases at a time (for load balancing)
* @param metricsFactory Used to publish metrics about lease operations
* @param leaseRefresher
* LeaseRefresher instance to use
* @param workerIdentifier
* Identifies the worker (e.g. useful to track lease ownership)
* @param leaseDurationMillis
* Duration of a lease
* @param epsilonMillis
* Allow for some variance when calculating lease expirations
* @param maxLeasesForWorker
* Max leases this Worker can handle at a time
* @param maxLeasesToStealAtOneTime
* Steal up to these many leases at a time (for load balancing)
* @param initialLeaseTableReadCapacity
* Initial dynamodb lease table read iops if creating the lease table
* @param initialLeaseTableWriteCapacity
* Initial dynamodb lease table write iops if creating the lease table
* @param metricsFactory
* Used to publish metrics about lease operations
*/
public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier,
@ -100,6 +110,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewerThreadCount,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final MetricsFactory metricsFactory) {
this.leaseRefresher = leaseRefresher;
this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
@ -110,6 +122,14 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = leaseDurationMillis / 3 - epsilonMillis;
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
if (initialLeaseTableReadCapacity <= 0) {
throw new IllegalArgumentException("readCapacity should be >= 1");
}
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
if (initialLeaseTableWriteCapacity <= 0) {
throw new IllegalArgumentException("writeCapacity should be >= 1");
}
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.metricsFactory = metricsFactory;
log.info("With failover time {} ms and epsilon {} ms, LeaseCoordinator will renew leases every {} ms, take"
@ -324,19 +344,11 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
@Override
public DynamoDBLeaseCoordinator initialLeaseTableReadCapacity(long readCapacity) {
if (readCapacity <= 0) {
throw new IllegalArgumentException("readCapacity should be >= 1");
}
this.initialLeaseTableReadCapacity = readCapacity;
return this;
throw new UnsupportedOperationException("Please set read capacity using the constructor");
}
@Override
public DynamoDBLeaseCoordinator initialLeaseTableWriteCapacity(long writeCapacity) {
if (writeCapacity <= 0) {
throw new IllegalArgumentException("writeCapacity should be >= 1");
}
this.initialLeaseTableWriteCapacity = writeCapacity;
return this;
throw new UnsupportedOperationException("Please set write capacity using the constructor");
}
}

View file

@ -64,6 +64,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final int maxCacheMissesBeforeReload;
private final long listShardsCacheAllowedAgeInSeconds;
private final int cacheMissWarningModulus;
private final long initialLeaseTableReadCapacity;
private final long initialLeaseTableWriteCapacity;
@Override
public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) {
@ -74,6 +76,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
maxLeaseRenewalThreads,
initialLeaseTableReadCapacity,
initialLeaseTableWriteCapacity,
metricsFactory);
}

View file

@ -50,6 +50,8 @@ public class LeaseCoordinatorExerciser {
private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20;
private static final MetricsLevel METRICS_LEVEL = MetricsLevel.DETAILED;
private static final int FLUSH_SIZE = 200;
private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 50L;
public static void main(String[] args) throws InterruptedException, DependencyException, InvalidStateException,
ProvisionedThroughputException, IOException {
@ -65,7 +67,8 @@ public class LeaseCoordinatorExerciser {
LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher("nagl_ShardProgress", dynamoDBClient,
new DynamoDBLeaseSerializer(), true);
if (leaseRefresher.createLeaseTableIfNotExists(10L, 50L)) {
if (leaseRefresher.createLeaseTableIfNotExists(INITIAL_LEASE_TABLE_READ_CAPACITY,
INITIAL_LEASE_TABLE_WRITE_CAPACITY)) {
log.info("Waiting for newly created lease table");
if (!leaseRefresher.waitUntilLeaseTableExists(10, 300)) {
log.error("Table was not created in time");
@ -83,7 +86,8 @@ public class LeaseCoordinatorExerciser {
LeaseCoordinator coord = new DynamoDBLeaseCoordinator(leaseRefresher, workerIdentifier, leaseDurationMillis,
epsilonMillis, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME,
MAX_LEASE_RENEWER_THREAD_COUNT, metricsFactory);
MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY,
INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
coordinators.add(coord);
}

View file

@ -56,6 +56,9 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20;
private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
private static DynamoDBLeaseRefresher leaseRefresher;
private static DynamoDBCheckpointer dynamoDBCheckpointer;
@ -93,7 +96,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
leaseRefresher.deleteAll();
coordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
metricsFactory);
INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher);
dynamoDBCheckpointer.operation(OPERATION);