diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index f5227bbf..66da9218 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -35,6 +35,8 @@ import java.util.concurrent.TimeoutException; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import lombok.Setter; import lombok.experimental.Accessors; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -540,6 +542,13 @@ public class Worker implements Runnable { return applicationName; } + /** + * @return the leaseCoordinator + */ + KinesisClientLibLeaseCoordinator getLeaseCoordinator(){ + return leaseCoordinator; + } + /** * Start consuming data from the stream, and pass it to the application record processors. */ @@ -1122,6 +1131,7 @@ public class Worker implements Runnable { private AmazonCloudWatch cloudWatchClient; @Setter @Accessors(fluent = true) private IMetricsFactory metricsFactory; + private ILeaseManager leaseManager; @Setter @Accessors(fluent = true) private ExecutorService execService; @Setter @Accessors(fluent = true) @@ -1218,6 +1228,9 @@ public class Worker implements Runnable { if (metricsFactory == null) { metricsFactory = getMetricsFactory(cloudWatchClient, config); } + if (leaseManager == null) { + leaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); + } if (shardPrioritization == null) { shardPrioritization = new ParentsFirstShardPrioritization(1); } @@ -1239,8 +1252,7 @@ public class Worker implements Runnable { config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, - new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(), - dynamoDBClient), + new KinesisClientLibLeaseCoordinator(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 6cc7ef08..037a54b2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -1500,6 +1500,33 @@ public class WorkerTest { Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy); } + @Test + public void testBuilderWithDefaultLeaseManager() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .build(); + + Assert.assertNotNull(worker.getLeaseCoordinator().getLeaseManager()); + } + + @SuppressWarnings("unchecked") + @Test + public void testBuilderWhenLeaseManagerIsSet() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + // Create an instance of ILeaseManager for injection and validation + ILeaseManager leaseManager = (ILeaseManager) mock(ILeaseManager.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .leaseManager(leaseManager) + .build(); + + Assert.assertSame(leaseManager, worker.getLeaseCoordinator().getLeaseManager()); + } + private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,