diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 1ce25579..429236bd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -14,6 +14,33 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.RegionUtils; +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; +import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; +import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.Getter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -32,33 +59,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.amazonaws.regions.Region; -import com.amazonaws.regions.RegionUtils; -import com.amazonaws.services.cloudwatch.AmazonCloudWatch; -import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; -import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; -import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; -import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory; -import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; -import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - /** * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees * different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from @@ -1063,6 +1063,7 @@ public class Worker implements Runnable { /** * Builder to construct a Worker instance. */ + @Getter public static class Builder { private IRecordProcessorFactory recordProcessorFactory; @@ -1198,7 +1199,7 @@ public class Worker implements Runnable { * Set KinesisProxy for the worker. * * @param kinesisProxy - * KinesisProxy uses the AmazonKinesis client to get data from Kinesis or DynamoDBStreams + * Sets an implementation of IKinesisProxy. * * @return A reference to this updated object so that method calls can be chained together. */ @@ -1267,7 +1268,7 @@ public class Worker implements Runnable { } } if (metricsFactory == null) { - metricsFactory = getMetricsFactory(cloudWatchClient, config); + metricsFactory = Worker.getMetricsFactory(cloudWatchClient, config); } if (shardPrioritization == null) { shardPrioritization = new ParentsFirstShardPrioritization(1); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index ce406dce..692e680d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -19,6 +19,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -41,6 +43,7 @@ import java.io.File; import java.lang.Thread.State; import java.lang.reflect.Field; import java.math.BigInteger; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -63,6 +66,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.InvalidArgumentException; +import com.amazonaws.services.kinesis.model.PutRecordResult; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Condition; @@ -1474,6 +1484,70 @@ public class WorkerTest { } + @Test + public void testBuilderWithDefaultKinesisProxy() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + + Worker.Builder builder = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config); + builder.build(); + assertNotNull(builder.getKinesisProxy()); + assertTrue(builder.getKinesisProxy() instanceof KinesisProxy); + } + + @Test + public void testBuilderWhenKinesisProxyIsSet() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + IKinesisProxy kinesisProxy = new MockKinesisProxy(); + Worker.Builder builder = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .kinesisProxy(kinesisProxy) + .config(config); + builder.build(); + assertNotNull(builder.getKinesisProxy()); + assertTrue(builder.getKinesisProxy() instanceof MockKinesisProxy); + } + + private class MockKinesisProxy implements IKinesisProxy { + @Override public GetRecordsResult get(String shardIterator, int maxRecords) + throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException { + return null; + } + + @Override public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException { + return null; + } + + @Override public Set getAllShardIds() throws ResourceNotFoundException { + return null; + } + + @Override public List getShardList() throws ResourceNotFoundException { + return null; + } + + @Override public String getIterator(String shardId, String iteratorEnum, String sequenceNumber) + throws ResourceNotFoundException, InvalidArgumentException { + return null; + } + + @Override public String getIterator(String shardId, String iteratorEnum) + throws ResourceNotFoundException, InvalidArgumentException { + return null; + } + + @Override public String getIterator(String shardId, Date timestamp) + throws ResourceNotFoundException, InvalidArgumentException { + return null; + } + + @Override public PutRecordResult put(String sequenceNumberForOrdering, String explicitHashKey, + String partitionKey, ByteBuffer data) throws ResourceNotFoundException, InvalidArgumentException { + return null; + } + } + private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,