This commit is contained in:
William Johnson 2018-01-05 22:09:07 +00:00 committed by GitHub
commit 7cbcf3e160

View file

@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -1071,6 +1072,7 @@ public class Worker implements Runnable {
private AmazonDynamoDB dynamoDBClient;
private AmazonCloudWatch cloudWatchClient;
private IMetricsFactory metricsFactory;
private ILeaseManager<KinesisClientLease> leaseManager;
private ExecutorService execService;
private ShardPrioritization shardPrioritization;
@ -1167,6 +1169,18 @@ public class Worker implements Runnable {
return this;
}
/**
* Set the lease manager.
*
* @param leaseManager
* Lease manager used to manage shard leases
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder leaseManager(ILeaseManager<KinesisClientLease> leaseManager) {
this.leaseManager = leaseManager;
return this;
}
/**
* Set the executor service for processing records.
*
@ -1254,6 +1268,9 @@ public class Worker implements Runnable {
if (metricsFactory == null) {
metricsFactory = getMetricsFactory(cloudWatchClient, config);
}
if (leaseManager == null) {
leaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient);
}
if (shardPrioritization == null) {
shardPrioritization = new ParentsFirstShardPrioritization(1);
}
@ -1274,8 +1291,7 @@ public class Worker implements Runnable {
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),
null,
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(),
dynamoDBClient),
new KinesisClientLibLeaseCoordinator(leaseManager,
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),