diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index c0f413be..1ce25579 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -1073,6 +1074,7 @@ public class Worker implements Runnable { private IMetricsFactory metricsFactory; private ExecutorService execService; private ShardPrioritization shardPrioritization; + private IKinesisProxy kinesisProxy; /** * Default constructor. @@ -1192,6 +1194,19 @@ public class Worker implements Runnable { return this; } + /** + * Set KinesisProxy for the worker. + * + * @param kinesisProxy + * KinesisProxy uses the AmazonKinesis client to get data from Kinesis or DynamoDBStreams + * + * @return A reference to this updated object so that method calls can be chained together. + */ + public Builder kinesisProxy(IKinesisProxy kinesisProxy) { + this.kinesisProxy = kinesisProxy; + return this; + } + /** * Build the Worker instance. * @@ -1257,13 +1272,15 @@ public class Worker implements Runnable { if (shardPrioritization == null) { shardPrioritization = new ParentsFirstShardPrioritization(1); } - + if (kinesisProxy == null) { + kinesisProxy = new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) + .getProxy(config.getStreamName()); + } return new Worker(config.getApplicationName(), recordProcessorFactory, config, - new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), - kinesisClient).getProxy(config.getStreamName()), + new StreamConfig(kinesisProxy, config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(),