Added IKinesisProxy injector in Worker.Builder to allow injecting custom proxy implementations

This commit is contained in:
parijas 2018-01-04 17:39:28 -08:00
parent 71b3e9df13
commit 35e32d42a2

View file

@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -1073,6 +1074,7 @@ public class Worker implements Runnable {
private IMetricsFactory metricsFactory;
private ExecutorService execService;
private ShardPrioritization shardPrioritization;
private IKinesisProxy kinesisProxy;
/**
* Default constructor.
@ -1192,6 +1194,19 @@ public class Worker implements Runnable {
return this;
}
/**
* Set KinesisProxy for the worker.
*
* @param kinesisProxy
* KinesisProxy uses the AmazonKinesis client to get data from Kinesis or DynamoDBStreams
*
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder kinesisProxy(IKinesisProxy kinesisProxy) {
this.kinesisProxy = kinesisProxy;
return this;
}
/**
* Build the Worker instance.
*
@ -1257,13 +1272,15 @@ public class Worker implements Runnable {
if (shardPrioritization == null) {
shardPrioritization = new ParentsFirstShardPrioritization(1);
}
if (kinesisProxy == null) {
kinesisProxy = new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName());
}
return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
kinesisClient).getProxy(config.getStreamName()),
new StreamConfig(kinesisProxy,
config.getMaxRecords(),
config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),