diff --git a/pom.xml b/pom.xml
index 2e4488be..2002cb43 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
amazon-kinesis-client
jar
Amazon Kinesis Client Library for Java
- 1.9.1
+ 1.9.2-SNAPSHOT
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
index 4a03b449..e9e1b5ed 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
@@ -31,18 +31,27 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import com.amazonaws.regions.Region;
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
@@ -60,6 +69,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.NonNull;
import lombok.Setter;
import lombok.experimental.Accessors;
@@ -399,28 +409,16 @@ public class Worker implements Runnable {
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
- Region region = RegionUtils.getRegion(config.getRegionName());
- kinesisClient.setRegion(region);
- LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName());
- dynamoDBClient.setRegion(region);
- LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
+ setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
+ setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
}
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
if (config.getDynamoDBEndpoint() != null) {
- dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint());
- LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint());
+ setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint());
}
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
if (config.getKinesisEndpoint() != null) {
- kinesisClient.setEndpoint(config.getKinesisEndpoint());
- if (config.getRegionName() != null) {
- LOG.warn("Received configuration for both region name as " + config.getRegionName()
- + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint()
- + ". Amazon Kinesis endpoint will overwrite region name.");
- LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint());
- } else {
- LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
- }
+ setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint());
}
}
@@ -1080,9 +1078,7 @@ public class Worker implements Runnable {
metricsFactory = new NullMetricsFactory();
} else {
if (config.getRegionName() != null) {
- Region region = RegionUtils.getRegion(config.getRegionName());
- cloudWatchClient.setRegion(region);
- LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
+ setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
}
metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(),
config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(),
@@ -1101,6 +1097,15 @@ public class Worker implements Runnable {
return new WorkerThreadPoolExecutor(threadFactory);
}
+ private static void setField(final S source, final String field, final Consumer t, T value) {
+ try {
+ t.accept(value);
+ } catch (UnsupportedOperationException e) {
+ LOG.debug("Exception thrown while trying to set " + field + ", indicating that "
+ + source.getClass().getSimpleName() + "is immutable.", e);
+ }
+ }
+
/**
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not.
* Visible and non-final only for testing.
@@ -1154,6 +1159,21 @@ public class Worker implements Runnable {
@Setter @Accessors(fluent = true)
private WorkerStateChangeListener workerStateChangeListener;
+ @VisibleForTesting
+ AmazonKinesis getKinesisClient() {
+ return kinesisClient;
+ }
+
+ @VisibleForTesting
+ AmazonDynamoDB getDynamoDBClient() {
+ return dynamoDBClient;
+ }
+
+ @VisibleForTesting
+ AmazonCloudWatch getCloudWatchClient() {
+ return cloudWatchClient;
+ }
+
/**
* Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
* IRecordProcessor}.
@@ -1201,44 +1221,39 @@ public class Worker implements Runnable {
execService = getExecutorService();
}
if (kinesisClient == null) {
- kinesisClient = new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
- config.getKinesisClientConfiguration());
+ kinesisClient = createClient(AmazonKinesisClientBuilder.standard(),
+ config.getKinesisCredentialsProvider(),
+ config.getKinesisClientConfiguration(),
+ config.getKinesisEndpoint(),
+ config.getRegionName());
}
if (dynamoDBClient == null) {
- dynamoDBClient = new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
- config.getDynamoDBClientConfiguration());
+ dynamoDBClient = createClient(AmazonDynamoDBClientBuilder.standard(),
+ config.getDynamoDBCredentialsProvider(),
+ config.getDynamoDBClientConfiguration(),
+ config.getDynamoDBEndpoint(),
+ config.getRegionName());
}
if (cloudWatchClient == null) {
- cloudWatchClient = new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(),
- config.getCloudWatchClientConfiguration());
+ cloudWatchClient = createClient(AmazonCloudWatchClientBuilder.standard(),
+ config.getCloudWatchCredentialsProvider(),
+ config.getCloudWatchClientConfiguration(),
+ null,
+ config.getRegionName());
}
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
- Region region = RegionUtils.getRegion(config.getRegionName());
- cloudWatchClient.setRegion(region);
- LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
- kinesisClient.setRegion(region);
- LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName());
- dynamoDBClient.setRegion(region);
- LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
+ setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
+ setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
+ setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
}
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
if (config.getDynamoDBEndpoint() != null) {
- dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint());
- LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint());
+ setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint());
}
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
if (config.getKinesisEndpoint() != null) {
- kinesisClient.setEndpoint(config.getKinesisEndpoint());
- if (config.getRegionName() != null) {
- LOG.warn("Received configuration for both region name as " + config.getRegionName()
- + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint()
- + ". Amazon Kinesis endpoint will overwrite region name.");
- LOG.debug("The region of Amazon Kinesis client has been overwritten to "
- + config.getKinesisEndpoint());
- } else {
- LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
- }
+ setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint());
}
if (metricsFactory == null) {
metricsFactory = getMetricsFactory(cloudWatchClient, config);
@@ -1292,5 +1307,29 @@ public class Worker implements Runnable {
workerStateChangeListener);
}
+ > R createClient(final T builder,
+ final AWSCredentialsProvider credentialsProvider,
+ final ClientConfiguration clientConfiguration,
+ final String endpointUrl,
+ final String region) {
+ if (credentialsProvider != null) {
+ builder.withCredentials(credentialsProvider);
+ }
+ if (clientConfiguration != null) {
+ builder.withClientConfiguration(clientConfiguration);
+ }
+ if (StringUtils.isNotEmpty(endpointUrl)) {
+ LOG.warn("Received configuration for endpoint as " + endpointUrl + ", and region as "
+ + region + ".");
+ builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointUrl, region));
+ } else if (StringUtils.isNotEmpty(region)) {
+ LOG.warn("Received configuration for region as " + region + ".");
+ builder.withRegion(region);
+ } else {
+ LOG.warn("No configuration received for endpoint and region, will default region to us-east-1");
+ builder.withRegion(Regions.US_EAST_1);
+ }
+ return builder.build();
+ }
}
}
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java
index cccbcb30..d30c9e57 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java
@@ -268,8 +268,8 @@ public class KinesisClientLibConfigurationTest {
IRecordProcessorFactory processorFactory = Mockito.mock(IRecordProcessorFactory.class);
new Worker(processorFactory, kclConfig);
- Mockito.verify(kclConfig, Mockito.times(9)).getRegionName();
- Mockito.verify(kclConfig, Mockito.times(4)).getKinesisEndpoint();
+ Mockito.verify(kclConfig, Mockito.times(5)).getRegionName();
+ Mockito.verify(kclConfig, Mockito.times(2)).getKinesisEndpoint();
kclConfig = Mockito.spy(
new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0")
@@ -277,8 +277,8 @@ public class KinesisClientLibConfigurationTest {
new Worker(processorFactory, kclConfig);
- Mockito.verify(kclConfig, Mockito.times(3)).getRegionName();
- Mockito.verify(kclConfig, Mockito.times(3)).getKinesisEndpoint();
+ Mockito.verify(kclConfig, Mockito.times(2)).getRegionName();
+ Mockito.verify(kclConfig, Mockito.times(2)).getKinesisEndpoint();
}
@Test
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
index 21aaa8ac..ddc39aed 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java
@@ -19,6 +19,8 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
@@ -75,14 +77,24 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
@@ -92,8 +104,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMe
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
-import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
+import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
@@ -1630,6 +1642,124 @@ public class WorkerTest {
Assert.assertSame(leaseManager, worker.getLeaseCoordinator().getLeaseManager());
}
+ @Test
+ public void testBuilderSetRegionAndEndpointToClient() {
+ IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
+ final String endpoint = "TestEndpoint";
+ KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
+ .withRegionName(Regions.US_WEST_2.getName())
+ .withKinesisEndpoint(endpoint)
+ .withDynamoDBEndpoint(endpoint);
+
+ AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
+ AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
+ AmazonCloudWatch cloudWatchClient = spy(AmazonCloudWatchClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
+
+ new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config)
+ .kinesisClient(kinesisClient)
+ .dynamoDBClient(dynamoDBClient)
+ .cloudWatchClient(cloudWatchClient)
+ .build();
+
+ verify(kinesisClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
+ verify(dynamoDBClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
+ verify(cloudWatchClient, times(2)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
+
+ verify(kinesisClient, times(1)).setEndpoint(eq(endpoint));
+ verify(dynamoDBClient, times(1)).setEndpoint(eq(endpoint));
+ verify(cloudWatchClient, never()).setEndpoint(anyString());
+ }
+
+ @Test
+ public void testBuilderSetRegionToClient() {
+ IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
+ String region = Regions.US_WEST_2.getName();
+ KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
+ .withRegionName(region);
+
+ Worker.Builder builder = new Worker.Builder();
+
+ AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
+ AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
+ AmazonCloudWatch cloudWatchClient = spy(AmazonCloudWatchClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
+
+ builder.recordProcessorFactory(recordProcessorFactory).config(config)
+ .kinesisClient(kinesisClient)
+ .dynamoDBClient(dynamoDBClient)
+ .cloudWatchClient(cloudWatchClient)
+ .build();
+
+ verify(kinesisClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
+ verify(dynamoDBClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
+ verify(cloudWatchClient, times(2)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
+
+ verify(kinesisClient, never()).setEndpoint(any());
+ verify(dynamoDBClient, never()).setEndpoint(any());
+ verify(cloudWatchClient, never()).setEndpoint(any());
+ }
+
+ @Test
+ public void testBuilderGenerateClients() {
+ IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
+ KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null);
+ Worker.Builder builder = spy(new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config));
+ ArgumentCaptor builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class);
+
+ assertNull(builder.getKinesisClient());
+ assertNull(builder.getDynamoDBClient());
+ assertNull(builder.getCloudWatchClient());
+
+ builder.build();
+
+ assertTrue(builder.getKinesisClient() instanceof AmazonKinesis);
+ assertTrue(builder.getDynamoDBClient() instanceof AmazonDynamoDB);
+ assertTrue(builder.getCloudWatchClient() instanceof AmazonCloudWatch);
+
+ verify(builder, times(3)).createClient(
+ builderCaptor.capture(), eq(null), any(ClientConfiguration.class), eq(null), eq(null));
+
+ builderCaptor.getAllValues().forEach(clientBuilder -> {
+ assertTrue(clientBuilder.getRegion().equals(Regions.US_EAST_1.getName()));
+ });
+ }
+
+ @Test
+ public void testBuilderGenerateClientsWithRegion() {
+ IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
+ String region = Regions.US_WEST_2.getName();
+ KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
+ .withRegionName(region);
+ ArgumentCaptor builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class);
+
+ Worker.Builder builder = spy(new Worker.Builder());
+
+ builder.recordProcessorFactory(recordProcessorFactory).config(config).build();
+
+ verify(builder, times(3)).createClient(
+ builderCaptor.capture(), eq(null), any(ClientConfiguration.class), eq(null), eq(region));
+ builderCaptor.getAllValues().forEach(clientBuilder -> {
+ assertTrue(clientBuilder.getRegion().equals(region));
+ });
+ }
+
+ @Test
+ public void testBuilderGenerateClientsWithEndpoint() {
+ IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
+ String region = Regions.US_WEST_2.getName();
+ String endpointUrl = "TestEndpoint";
+ KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
+ .withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl);
+
+ Worker.Builder builder = spy(new Worker.Builder());
+
+ builder.recordProcessorFactory(recordProcessorFactory).config(config).build();
+
+ verify(builder, times(2)).createClient(
+ any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(endpointUrl), eq(region));
+ verify(builder, times(1)).createClient(
+ any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region));
+ }
+
private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, StreamConfig streamConfig,