From 690d214f48f3becc43ad0d219d091694ede4f94f Mon Sep 17 00:00:00 2001 From: William Johnson Date: Wed, 18 Oct 2017 18:06:51 -0700 Subject: [PATCH] Allow specifying a custom LeaseManager in Worker.Builder --- .../clientlibrary/lib/worker/Worker.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 3cfb9f2f..3c764d17 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -1054,6 +1055,7 @@ public class Worker implements Runnable { private AmazonDynamoDB dynamoDBClient; private AmazonCloudWatch cloudWatchClient; private IMetricsFactory metricsFactory; + private ILeaseManager leaseManager; private ExecutorService execService; private ShardPrioritization shardPrioritization; @@ -1150,6 +1152,18 @@ public class Worker implements Runnable { return this; } + /** + * Set the lease manager. + * + * @param leaseManager + * Lease manager used to manage shard leases + * @return A reference to this updated object so that method calls can be chained together. + */ + public Builder leaseManager(ILeaseManager leaseManager) { + this.leaseManager = leaseManager; + return this; + } + /** * Set the executor service for processing records. * @@ -1237,6 +1251,9 @@ public class Worker implements Runnable { if (metricsFactory == null) { metricsFactory = getMetricsFactory(cloudWatchClient, config); } + if (leaseManager == null) { + leaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); + } if (shardPrioritization == null) { shardPrioritization = new ParentsFirstShardPrioritization(1); } @@ -1256,8 +1273,7 @@ public class Worker implements Runnable { config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, - new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(), - dynamoDBClient), + new KinesisClientLibLeaseCoordinator(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(),