diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 1ce25579..8f4140a1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import lombok.Getter; import com.amazonaws.regions.Region; import com.amazonaws.regions.RegionUtils; @@ -1063,6 +1064,7 @@ public class Worker implements Runnable { /** * Builder to construct a Worker instance. */ + @Getter public static class Builder { private IRecordProcessorFactory recordProcessorFactory; @@ -1267,7 +1269,7 @@ public class Worker implements Runnable { } } if (metricsFactory == null) { - metricsFactory = getMetricsFactory(cloudWatchClient, config); + metricsFactory = Worker.getMetricsFactory(cloudWatchClient, config); } if (shardPrioritization == null) { shardPrioritization = new ParentsFirstShardPrioritization(1); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index ce406dce..ae3b883a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -19,6 +19,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -41,6 +43,7 @@ import java.io.File; import java.lang.Thread.State; import java.lang.reflect.Field; import java.math.BigInteger; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -108,6 +111,13 @@ import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.InvalidArgumentException; +import com.amazonaws.services.kinesis.model.PutRecordResult; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -1474,6 +1484,71 @@ public class WorkerTest { } + @Test + public void testBuilderWithDefaultKinesisProxy() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + + Worker.Builder builder = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config); + builder.build(); + assertNotNull(builder.getKinesisProxy()); + assertTrue(builder.getKinesisProxy() instanceof KinesisProxy); + } + + @Test + public void testBuilderWhenKinesisProxyIsSet() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + IKinesisProxy kinesisProxy = new DummyKinesisProxy(); + Worker.Builder builder = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .kinesisProxy(kinesisProxy) + .config(config); + builder.build(); + assertNotNull(builder.getKinesisProxy()); + assertTrue(builder.getKinesisProxy() instanceof DummyKinesisProxy); + } + + // KinesisProxyImplementation to test KinesisProxy injection. + private class DummyKinesisProxy implements IKinesisProxy { + @Override public GetRecordsResult get(String shardIterator, int maxRecords) + throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException { + return null; + } + + @Override public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException { + return null; + } + + @Override public Set getAllShardIds() throws ResourceNotFoundException { + return null; + } + + @Override public List getShardList() throws ResourceNotFoundException { + return null; + } + + @Override public String getIterator(String shardId, String iteratorEnum, String sequenceNumber) + throws ResourceNotFoundException, InvalidArgumentException { + return null; + } + + @Override public String getIterator(String shardId, String iteratorEnum) + throws ResourceNotFoundException, InvalidArgumentException { + return null; + } + + @Override public String getIterator(String shardId, Date timestamp) + throws ResourceNotFoundException, InvalidArgumentException { + return null; + } + + @Override public PutRecordResult put(String sequenceNumberForOrdering, String explicitHashKey, + String partitionKey, ByteBuffer data) throws ResourceNotFoundException, InvalidArgumentException { + return null; + } + } + private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,