From 2bf45eafdabf5d238649d306970aaf1f75a5f730 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Thu, 21 Jul 2016 06:58:42 -0700 Subject: [PATCH] Merge Changes for PR#61 Merge, and slight adjustment, of https://github.com/awslabs/amazon-kinesis-client/pull/61. This allows configuration to explicitly set the name of the lease table. The table name defaults to the application name which is the current behavior. --- .../worker/KinesisClientLibConfiguration.java | 18 ++++++++++++++++++ .../clientlibrary/lib/worker/Worker.java | 4 ++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index c35f0cd5..133677c3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -155,6 +155,7 @@ public class KinesisClientLibConfiguration { private String applicationName; + private String tableName; private String streamName; private String kinesisEndpoint; private InitialPositionInStream initialPositionInStream; @@ -300,6 +301,7 @@ public class KinesisClientLibConfiguration { checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); checkIsRegionNameValid(regionName); this.applicationName = applicationName; + this.tableName = applicationName; this.streamName = streamName; this.kinesisEndpoint = kinesisEndpoint; this.initialPositionInStream = initialPositionInStream; @@ -366,6 +368,13 @@ public class KinesisClientLibConfiguration { return applicationName; } + /** + * @return Name of the table to use in DynamoDB + */ + public String getTableName() { + return tableName; + } + /** * @return Time within which a worker should renew a lease (else it is assumed dead) */ @@ -572,6 +581,15 @@ public class KinesisClientLibConfiguration { } // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES + /** + * @param tableName name of the lease table in DynamoDB + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withTableName(String tableName) { + this.tableName = tableName; + return this; + } + /** * @param kinesisEndpoint Kinesis endpoint * @return KinesisClientLibConfiguration diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index dbbf934d..6aec2346 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -219,7 +219,7 @@ public class Worker implements Runnable { config.shouldCleanupLeasesUponShardCompletion(), null, new KinesisClientLibLeaseCoordinator( - new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient), + new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), @@ -952,7 +952,7 @@ public class Worker implements Runnable { config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, - new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getApplicationName(), + new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), config.getWorkerIdentifier(), config.getFailoverTimeMillis(),