Fix for Worker to handle immutable clients (#305)

This commit is contained in:
Sahil Palvia 2018-05-04 10:06:13 -07:00 committed by Justin Pfifer
parent 5d183e0197
commit 86cc068454
4 changed files with 220 additions and 51 deletions

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name> <name>Amazon Kinesis Client Library for Java</name>
<version>1.9.1</version> <version>1.9.2-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>

View file

@ -31,18 +31,27 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.regions.Region; import com.amazonaws.AmazonWebServiceClient;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.RegionUtils;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch; import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
@ -60,6 +69,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.NonNull;
import lombok.Setter; import lombok.Setter;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
@ -399,28 +409,16 @@ public class Worker implements Runnable {
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) { if (config.getRegionName() != null) {
Region region = RegionUtils.getRegion(config.getRegionName()); setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
kinesisClient.setRegion(region); setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName());
dynamoDBClient.setRegion(region);
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
} }
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
if (config.getDynamoDBEndpoint() != null) { if (config.getDynamoDBEndpoint() != null) {
dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint()); setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint());
LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint());
} }
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
if (config.getKinesisEndpoint() != null) { if (config.getKinesisEndpoint() != null) {
kinesisClient.setEndpoint(config.getKinesisEndpoint()); setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint());
if (config.getRegionName() != null) {
LOG.warn("Received configuration for both region name as " + config.getRegionName()
+ ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint()
+ ". Amazon Kinesis endpoint will overwrite region name.");
LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint());
} else {
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
}
} }
} }
@ -1080,9 +1078,7 @@ public class Worker implements Runnable {
metricsFactory = new NullMetricsFactory(); metricsFactory = new NullMetricsFactory();
} else { } else {
if (config.getRegionName() != null) { if (config.getRegionName() != null) {
Region region = RegionUtils.getRegion(config.getRegionName()); setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
cloudWatchClient.setRegion(region);
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
} }
metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(), metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(),
config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(), config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(),
@ -1101,6 +1097,15 @@ public class Worker implements Runnable {
return new WorkerThreadPoolExecutor(threadFactory); return new WorkerThreadPoolExecutor(threadFactory);
} }
private static <S, T> void setField(final S source, final String field, final Consumer<T> t, T value) {
try {
t.accept(value);
} catch (UnsupportedOperationException e) {
LOG.debug("Exception thrown while trying to set " + field + ", indicating that "
+ source.getClass().getSimpleName() + "is immutable.", e);
}
}
/** /**
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not. * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not.
* Visible and non-final only for testing. * Visible and non-final only for testing.
@ -1154,6 +1159,21 @@ public class Worker implements Runnable {
@Setter @Accessors(fluent = true) @Setter @Accessors(fluent = true)
private WorkerStateChangeListener workerStateChangeListener; private WorkerStateChangeListener workerStateChangeListener;
@VisibleForTesting
AmazonKinesis getKinesisClient() {
return kinesisClient;
}
@VisibleForTesting
AmazonDynamoDB getDynamoDBClient() {
return dynamoDBClient;
}
@VisibleForTesting
AmazonCloudWatch getCloudWatchClient() {
return cloudWatchClient;
}
/** /**
* Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor * Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
* IRecordProcessor}. * IRecordProcessor}.
@ -1201,44 +1221,39 @@ public class Worker implements Runnable {
execService = getExecutorService(); execService = getExecutorService();
} }
if (kinesisClient == null) { if (kinesisClient == null) {
kinesisClient = new AmazonKinesisClient(config.getKinesisCredentialsProvider(), kinesisClient = createClient(AmazonKinesisClientBuilder.standard(),
config.getKinesisClientConfiguration()); config.getKinesisCredentialsProvider(),
config.getKinesisClientConfiguration(),
config.getKinesisEndpoint(),
config.getRegionName());
} }
if (dynamoDBClient == null) { if (dynamoDBClient == null) {
dynamoDBClient = new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), dynamoDBClient = createClient(AmazonDynamoDBClientBuilder.standard(),
config.getDynamoDBClientConfiguration()); config.getDynamoDBCredentialsProvider(),
config.getDynamoDBClientConfiguration(),
config.getDynamoDBEndpoint(),
config.getRegionName());
} }
if (cloudWatchClient == null) { if (cloudWatchClient == null) {
cloudWatchClient = new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(), cloudWatchClient = createClient(AmazonCloudWatchClientBuilder.standard(),
config.getCloudWatchClientConfiguration()); config.getCloudWatchCredentialsProvider(),
config.getCloudWatchClientConfiguration(),
null,
config.getRegionName());
} }
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) { if (config.getRegionName() != null) {
Region region = RegionUtils.getRegion(config.getRegionName()); setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
cloudWatchClient.setRegion(region); setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
kinesisClient.setRegion(region);
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName());
dynamoDBClient.setRegion(region);
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
} }
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
if (config.getDynamoDBEndpoint() != null) { if (config.getDynamoDBEndpoint() != null) {
dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint()); setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint());
LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint());
} }
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
if (config.getKinesisEndpoint() != null) { if (config.getKinesisEndpoint() != null) {
kinesisClient.setEndpoint(config.getKinesisEndpoint()); setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint());
if (config.getRegionName() != null) {
LOG.warn("Received configuration for both region name as " + config.getRegionName()
+ ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint()
+ ". Amazon Kinesis endpoint will overwrite region name.");
LOG.debug("The region of Amazon Kinesis client has been overwritten to "
+ config.getKinesisEndpoint());
} else {
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
}
} }
if (metricsFactory == null) { if (metricsFactory == null) {
metricsFactory = getMetricsFactory(cloudWatchClient, config); metricsFactory = getMetricsFactory(cloudWatchClient, config);
@ -1292,5 +1307,29 @@ public class Worker implements Runnable {
workerStateChangeListener); workerStateChangeListener);
} }
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,
final AWSCredentialsProvider credentialsProvider,
final ClientConfiguration clientConfiguration,
final String endpointUrl,
final String region) {
if (credentialsProvider != null) {
builder.withCredentials(credentialsProvider);
}
if (clientConfiguration != null) {
builder.withClientConfiguration(clientConfiguration);
}
if (StringUtils.isNotEmpty(endpointUrl)) {
LOG.warn("Received configuration for endpoint as " + endpointUrl + ", and region as "
+ region + ".");
builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointUrl, region));
} else if (StringUtils.isNotEmpty(region)) {
LOG.warn("Received configuration for region as " + region + ".");
builder.withRegion(region);
} else {
LOG.warn("No configuration received for endpoint and region, will default region to us-east-1");
builder.withRegion(Regions.US_EAST_1);
}
return builder.build();
}
} }
} }

View file

@ -268,8 +268,8 @@ public class KinesisClientLibConfigurationTest {
IRecordProcessorFactory processorFactory = Mockito.mock(IRecordProcessorFactory.class); IRecordProcessorFactory processorFactory = Mockito.mock(IRecordProcessorFactory.class);
new Worker(processorFactory, kclConfig); new Worker(processorFactory, kclConfig);
Mockito.verify(kclConfig, Mockito.times(9)).getRegionName(); Mockito.verify(kclConfig, Mockito.times(5)).getRegionName();
Mockito.verify(kclConfig, Mockito.times(4)).getKinesisEndpoint(); Mockito.verify(kclConfig, Mockito.times(2)).getKinesisEndpoint();
kclConfig = Mockito.spy( kclConfig = Mockito.spy(
new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0") new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0")
@ -277,8 +277,8 @@ public class KinesisClientLibConfigurationTest {
new Worker(processorFactory, kclConfig); new Worker(processorFactory, kclConfig);
Mockito.verify(kclConfig, Mockito.times(3)).getRegionName(); Mockito.verify(kclConfig, Mockito.times(2)).getRegionName();
Mockito.verify(kclConfig, Mockito.times(3)).getKinesisEndpoint(); Mockito.verify(kclConfig, Mockito.times(2)).getKinesisEndpoint();
} }
@Test @Test

View file

@ -19,6 +19,8 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
@ -75,14 +77,24 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers; import org.mockito.Matchers;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
@ -92,8 +104,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMe
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
@ -1630,6 +1642,124 @@ public class WorkerTest {
Assert.assertSame(leaseManager, worker.getLeaseCoordinator().getLeaseManager()); Assert.assertSame(leaseManager, worker.getLeaseCoordinator().getLeaseManager());
} }
@Test
public void testBuilderSetRegionAndEndpointToClient() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final String endpoint = "TestEndpoint";
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
.withRegionName(Regions.US_WEST_2.getName())
.withKinesisEndpoint(endpoint)
.withDynamoDBEndpoint(endpoint);
AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
AmazonCloudWatch cloudWatchClient = spy(AmazonCloudWatchClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config)
.kinesisClient(kinesisClient)
.dynamoDBClient(dynamoDBClient)
.cloudWatchClient(cloudWatchClient)
.build();
verify(kinesisClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
verify(dynamoDBClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
verify(cloudWatchClient, times(2)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
verify(kinesisClient, times(1)).setEndpoint(eq(endpoint));
verify(dynamoDBClient, times(1)).setEndpoint(eq(endpoint));
verify(cloudWatchClient, never()).setEndpoint(anyString());
}
@Test
public void testBuilderSetRegionToClient() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
String region = Regions.US_WEST_2.getName();
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
.withRegionName(region);
Worker.Builder builder = new Worker.Builder();
AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
AmazonCloudWatch cloudWatchClient = spy(AmazonCloudWatchClientBuilder.standard().withRegion(Regions.US_WEST_2).build());
builder.recordProcessorFactory(recordProcessorFactory).config(config)
.kinesisClient(kinesisClient)
.dynamoDBClient(dynamoDBClient)
.cloudWatchClient(cloudWatchClient)
.build();
verify(kinesisClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
verify(dynamoDBClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
verify(cloudWatchClient, times(2)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
verify(kinesisClient, never()).setEndpoint(any());
verify(dynamoDBClient, never()).setEndpoint(any());
verify(cloudWatchClient, never()).setEndpoint(any());
}
@Test
public void testBuilderGenerateClients() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null);
Worker.Builder builder = spy(new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config));
ArgumentCaptor<AwsClientBuilder> builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class);
assertNull(builder.getKinesisClient());
assertNull(builder.getDynamoDBClient());
assertNull(builder.getCloudWatchClient());
builder.build();
assertTrue(builder.getKinesisClient() instanceof AmazonKinesis);
assertTrue(builder.getDynamoDBClient() instanceof AmazonDynamoDB);
assertTrue(builder.getCloudWatchClient() instanceof AmazonCloudWatch);
verify(builder, times(3)).createClient(
builderCaptor.capture(), eq(null), any(ClientConfiguration.class), eq(null), eq(null));
builderCaptor.getAllValues().forEach(clientBuilder -> {
assertTrue(clientBuilder.getRegion().equals(Regions.US_EAST_1.getName()));
});
}
@Test
public void testBuilderGenerateClientsWithRegion() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
String region = Regions.US_WEST_2.getName();
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
.withRegionName(region);
ArgumentCaptor<AwsClientBuilder> builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class);
Worker.Builder builder = spy(new Worker.Builder());
builder.recordProcessorFactory(recordProcessorFactory).config(config).build();
verify(builder, times(3)).createClient(
builderCaptor.capture(), eq(null), any(ClientConfiguration.class), eq(null), eq(region));
builderCaptor.getAllValues().forEach(clientBuilder -> {
assertTrue(clientBuilder.getRegion().equals(region));
});
}
@Test
public void testBuilderGenerateClientsWithEndpoint() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
String region = Regions.US_WEST_2.getName();
String endpointUrl = "TestEndpoint";
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
.withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl);
Worker.Builder builder = spy(new Worker.Builder());
builder.recordProcessorFactory(recordProcessorFactory).config(config).build();
verify(builder, times(2)).createClient(
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(endpointUrl), eq(region));
verify(builder, times(1)).createClient(
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region));
}
private abstract class InjectableWorker extends Worker { private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, StreamConfig streamConfig, KinesisClientLibConfiguration config, StreamConfig streamConfig,