From f1cbf15075fd410dc7fc203af3a1cd6f2b562b4d Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 10 Aug 2018 14:57:58 -0700 Subject: [PATCH] Introducing changes to avoid breaking changes * Introducing chained constructors in DynamoDBLeaseManagementFactory * Introducing TableConstants to maintain Default IOPS in one place --- .../dynamodb/DynamoDBLeaseCoordinator.java | 7 +- .../DynamoDBLeaseManagementFactory.java | 108 ++++++++++++++++++ .../leases/dynamodb/TableConstants.java | 29 +++++ 3 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableConstants.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index be484a73..12ca3a01 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -59,8 +59,6 @@ import software.amazon.kinesis.metrics.MetricsUtil; public class DynamoDBLeaseCoordinator implements LeaseCoordinator { // Time to wait for in-flight Runnables to finish when calling .stop(); private static final long STOP_WAIT_TIME_MILLIS = 2000L; - private static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; - private static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; private static final ThreadFactory LEASE_COORDINATOR_THREAD_FACTORY = new ThreadFactoryBuilder() .setNameFormat("LeaseCoordinator-%04d").setDaemon(true).build(); private static final ThreadFactory LEASE_RENEWAL_THREAD_FACTORY = new ThreadFactoryBuilder() @@ -113,8 +111,9 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final int maxLeaseRenewerThreadCount, final MetricsFactory metricsFactory) { this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, - maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, - DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); + maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, + TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, + TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 41bf2d27..3c20c591 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -67,6 +67,114 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final long initialLeaseTableReadCapacity; private final long initialLeaseTableWriteCapacity; + /** + * Constructor. + * + *

NOTE: This constructor is deprecated and will be removed in a future release.

+ * + * @param kinesisClient + * @param streamName + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param initialPositionInStream + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + */ + @Deprecated + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, + final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus) { + this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, + initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, + TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY); + } + + /** + * Constructor. + * + * @param kinesisClient + * @param streamName + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param initialPositionInStream + * @param failoverTimeMillis + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + */ + public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, + final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, + final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, + final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) { + this.kinesisClient = kinesisClient; + this.streamName = streamName; + this.dynamoDBClient = dynamoDBClient; + this.tableName = tableName; + this.workerIdentifier = workerIdentifier; + this.executorService = executorService; + this.initialPositionInStream = initialPositionInStream; + this.failoverTimeMillis = failoverTimeMillis; + this.epsilonMillis = epsilonMillis; + this.maxLeasesForWorker = maxLeasesForWorker; + this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; + this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; + this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; + this.shardSyncIntervalMillis = shardSyncIntervalMillis; + this.consistentReads = consistentReads; + this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis; + this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; + this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload; + this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds; + this.cacheMissWarningModulus = cacheMissWarningModulus; + this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; + this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; + } + @Override public LeaseCoordinator createLeaseCoordinator(@NonNull final MetricsFactory metricsFactory) { return new DynamoDBLeaseCoordinator(this.createLeaseRefresher(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableConstants.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableConstants.java new file mode 100644 index 00000000..3848c2f0 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/TableConstants.java @@ -0,0 +1,29 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.kinesis.leases.dynamodb; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +/** + * This class is just a holder for initial lease table IOPs units. This class will be removed in a future release. + */ +@Deprecated +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class TableConstants { + public static final long DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; + public static final long DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; +}