Introducing changes to avoid breaking changes
* Introducing chained constructors in DynamoDBLeaseManagementFactory * Introducing TableConstants to maintain Default IOPS in one place
This commit is contained in:
parent
ca88ee9bc6
commit
f1cbf15075
3 changed files with 140 additions and 4 deletions
|
|
@ -59,8 +59,6 @@ import software.amazon.kinesis.metrics.MetricsUtil;
|
||||||
public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
||||||
// Time to wait for in-flight Runnables to finish when calling .stop();
|
// Time to wait for in-flight Runnables to finish when calling .stop();
|
||||||
private static final long STOP_WAIT_TIME_MILLIS = 2000L;
|
private static final long STOP_WAIT_TIME_MILLIS = 2000L;
|
||||||
private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
|
|
||||||
private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
|
|
||||||
private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder()
|
private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder()
|
||||||
.setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build();
|
.setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build();
|
||||||
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder()
|
private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder()
|
||||||
|
|
@ -113,8 +111,9 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
|
||||||
final int maxLeaseRenewerThreadCount,
|
final int maxLeaseRenewerThreadCount,
|
||||||
final MetricsFactory metricsFactory) {
|
final MetricsFactory metricsFactory) {
|
||||||
this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
|
this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
|
||||||
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
|
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
|
||||||
DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
|
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
|
||||||
|
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -67,6 +67,114 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
||||||
private final long initialLeaseTableReadCapacity;
|
private final long initialLeaseTableReadCapacity;
|
||||||
private final long initialLeaseTableWriteCapacity;
|
private final long initialLeaseTableWriteCapacity;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
*
|
||||||
|
* <p>NOTE: This constructor is deprecated and will be removed in a future release.</p>
|
||||||
|
*
|
||||||
|
* @param kinesisClient
|
||||||
|
* @param streamName
|
||||||
|
* @param dynamoDBClient
|
||||||
|
* @param tableName
|
||||||
|
* @param workerIdentifier
|
||||||
|
* @param executorService
|
||||||
|
* @param initialPositionInStream
|
||||||
|
* @param failoverTimeMillis
|
||||||
|
* @param epsilonMillis
|
||||||
|
* @param maxLeasesForWorker
|
||||||
|
* @param maxLeasesToStealAtOneTime
|
||||||
|
* @param maxLeaseRenewalThreads
|
||||||
|
* @param cleanupLeasesUponShardCompletion
|
||||||
|
* @param ignoreUnexpectedChildShards
|
||||||
|
* @param shardSyncIntervalMillis
|
||||||
|
* @param consistentReads
|
||||||
|
* @param listShardsBackoffTimeMillis
|
||||||
|
* @param maxListShardsRetryAttempts
|
||||||
|
* @param maxCacheMissesBeforeReload
|
||||||
|
* @param listShardsCacheAllowedAgeInSeconds
|
||||||
|
* @param cacheMissWarningModulus
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
|
||||||
|
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
|
||||||
|
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
|
||||||
|
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
|
||||||
|
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
|
||||||
|
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
|
||||||
|
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
|
||||||
|
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
|
||||||
|
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus) {
|
||||||
|
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
|
||||||
|
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
|
||||||
|
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
|
||||||
|
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
|
||||||
|
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
|
||||||
|
cacheMissWarningModulus, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
|
||||||
|
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
*
|
||||||
|
* @param kinesisClient
|
||||||
|
* @param streamName
|
||||||
|
* @param dynamoDBClient
|
||||||
|
* @param tableName
|
||||||
|
* @param workerIdentifier
|
||||||
|
* @param executorService
|
||||||
|
* @param initialPositionInStream
|
||||||
|
* @param failoverTimeMillis
|
||||||
|
* @param epsilonMillis
|
||||||
|
* @param maxLeasesForWorker
|
||||||
|
* @param maxLeasesToStealAtOneTime
|
||||||
|
* @param maxLeaseRenewalThreads
|
||||||
|
* @param cleanupLeasesUponShardCompletion
|
||||||
|
* @param ignoreUnexpectedChildShards
|
||||||
|
* @param shardSyncIntervalMillis
|
||||||
|
* @param consistentReads
|
||||||
|
* @param listShardsBackoffTimeMillis
|
||||||
|
* @param maxListShardsRetryAttempts
|
||||||
|
* @param maxCacheMissesBeforeReload
|
||||||
|
* @param listShardsCacheAllowedAgeInSeconds
|
||||||
|
* @param cacheMissWarningModulus
|
||||||
|
* @param initialLeaseTableReadCapacity
|
||||||
|
* @param initialLeaseTableWriteCapacity
|
||||||
|
*/
|
||||||
|
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
|
||||||
|
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
|
||||||
|
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
|
||||||
|
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
|
||||||
|
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
|
||||||
|
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
|
||||||
|
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
|
||||||
|
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
|
||||||
|
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
|
||||||
|
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) {
|
||||||
|
this.kinesisClient = kinesisClient;
|
||||||
|
this.streamName = streamName;
|
||||||
|
this.dynamoDBClient = dynamoDBClient;
|
||||||
|
this.tableName = tableName;
|
||||||
|
this.workerIdentifier = workerIdentifier;
|
||||||
|
this.executorService = executorService;
|
||||||
|
this.initialPositionInStream = initialPositionInStream;
|
||||||
|
this.failoverTimeMillis = failoverTimeMillis;
|
||||||
|
this.epsilonMillis = epsilonMillis;
|
||||||
|
this.maxLeasesForWorker = maxLeasesForWorker;
|
||||||
|
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
|
||||||
|
this.maxLeaseRenewalThreads = maxLeaseRenewalThreads;
|
||||||
|
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||||
|
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
|
||||||
|
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
|
||||||
|
this.consistentReads = consistentReads;
|
||||||
|
this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis;
|
||||||
|
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
|
||||||
|
this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
|
||||||
|
this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
|
||||||
|
this.cacheMissWarningModulus = cacheMissWarningModulus;
|
||||||
|
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
|
||||||
|
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) {
|
public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) {
|
||||||
return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(),
|
return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(),
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Licensed under the Amazon Software License (the "License").
|
||||||
|
* You may not use this file except in compliance with the License.
|
||||||
|
* A copy of the License is located at
|
||||||
|
*
|
||||||
|
* http://aws.amazon.com/asl/
|
||||||
|
*
|
||||||
|
* or in the "license" file accompanying this file. This file is distributed
|
||||||
|
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||||
|
* express or implied. See the License for the specific language governing
|
||||||
|
* permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package software.amazon.kinesis.leases.dynamodb;
|
||||||
|
|
||||||
|
import lombok.AccessLevel;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is just a holder for initial lease table IOPs units. This class will be removed in a future release.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@NoArgsConstructor(access = AccessLevel.PRIVATE)
|
||||||
|
public class TableConstants {
|
||||||
|
public static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
|
||||||
|
public static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue