Revert "Adding capability to support Immutable clients by the Worker and Worker.Builder"
This reverts commit 053bc226d0.
This commit is contained in:
parent
12f0b59a64
commit
a99d078bce
3 changed files with 50 additions and 214 deletions
|
|
@ -32,24 +32,17 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import com.amazonaws.AmazonWebServiceClient;
|
import com.amazonaws.regions.Region;
|
||||||
import com.amazonaws.ClientConfiguration;
|
|
||||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
|
||||||
import com.amazonaws.client.builder.AwsClientBuilder;
|
|
||||||
import com.amazonaws.regions.RegionUtils;
|
import com.amazonaws.regions.RegionUtils;
|
||||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
|
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
|
||||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
|
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
|
||||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
|
|
||||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
|
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
|
||||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
|
|
||||||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
import com.amazonaws.services.kinesis.AmazonKinesis;
|
||||||
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||||
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
||||||
|
|
@ -67,7 +60,6 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
import lombok.NonNull;
|
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
import lombok.experimental.Accessors;
|
import lombok.experimental.Accessors;
|
||||||
|
|
||||||
|
|
@ -407,16 +399,28 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||||
if (config.getRegionName() != null) {
|
if (config.getRegionName() != null) {
|
||||||
setRegionForClient((AmazonWebServiceClient) kinesisClient, config.getRegionName());
|
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||||
setRegionForClient((AmazonWebServiceClient) dynamoDBClient, config.getRegionName());
|
kinesisClient.setRegion(region);
|
||||||
|
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName());
|
||||||
|
dynamoDBClient.setRegion(region);
|
||||||
|
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
|
||||||
}
|
}
|
||||||
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
|
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
|
||||||
if (config.getDynamoDBEndpoint() != null) {
|
if (config.getDynamoDBEndpoint() != null) {
|
||||||
setEndpointForClient((AmazonWebServiceClient) dynamoDBClient, config.getDynamoDBEndpoint());
|
dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint());
|
||||||
|
LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint());
|
||||||
}
|
}
|
||||||
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
|
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
|
||||||
if (config.getKinesisEndpoint() != null) {
|
if (config.getKinesisEndpoint() != null) {
|
||||||
setEndpointForClient((AmazonWebServiceClient) kinesisClient, config.getKinesisEndpoint());
|
kinesisClient.setEndpoint(config.getKinesisEndpoint());
|
||||||
|
if (config.getRegionName() != null) {
|
||||||
|
LOG.warn("Received configuration for both region name as " + config.getRegionName()
|
||||||
|
+ ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint()
|
||||||
|
+ ". Amazon Kinesis endpoint will overwrite region name.");
|
||||||
|
LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint());
|
||||||
|
} else {
|
||||||
|
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1076,7 +1080,9 @@ public class Worker implements Runnable {
|
||||||
metricsFactory = new NullMetricsFactory();
|
metricsFactory = new NullMetricsFactory();
|
||||||
} else {
|
} else {
|
||||||
if (config.getRegionName() != null) {
|
if (config.getRegionName() != null) {
|
||||||
setRegionForClient((AmazonWebServiceClient) cloudWatchClient, config.getRegionName());
|
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||||
|
cloudWatchClient.setRegion(region);
|
||||||
|
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
|
||||||
}
|
}
|
||||||
metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(),
|
metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(),
|
||||||
config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(),
|
config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(),
|
||||||
|
|
@ -1095,24 +1101,6 @@ public class Worker implements Runnable {
|
||||||
return new WorkerThreadPoolExecutor(threadFactory);
|
return new WorkerThreadPoolExecutor(threadFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setRegionForClient(@NonNull final AmazonWebServiceClient client, @NonNull final String region) {
|
|
||||||
try {
|
|
||||||
client.setRegion(RegionUtils.getRegion(region));
|
|
||||||
LOG.debug("Region set for client to " + region);
|
|
||||||
} catch (UnsupportedOperationException e) {
|
|
||||||
LOG.debug("Trying to set region for immutable client", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void setEndpointForClient(@NonNull final AmazonWebServiceClient client, @NonNull final String endpointUrl) {
|
|
||||||
try {
|
|
||||||
client.setEndpoint(endpointUrl);
|
|
||||||
LOG.debug("Endpoint set for client to " + endpointUrl);
|
|
||||||
} catch (UnsupportedOperationException e) {
|
|
||||||
LOG.debug("Trying to set endpoint for immutable client", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not.
|
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not.
|
||||||
* Visible and non-final only for testing.
|
* Visible and non-final only for testing.
|
||||||
|
|
@ -1166,21 +1154,6 @@ public class Worker implements Runnable {
|
||||||
@Setter @Accessors(fluent = true)
|
@Setter @Accessors(fluent = true)
|
||||||
private WorkerStateChangeListener workerStateChangeListener;
|
private WorkerStateChangeListener workerStateChangeListener;
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
AmazonKinesis getKinesisClient() {
|
|
||||||
return kinesisClient;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
AmazonDynamoDB getDynamoDBClient() {
|
|
||||||
return dynamoDBClient;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
AmazonCloudWatch getCloudWatchClient() {
|
|
||||||
return cloudWatchClient;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
|
* Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
|
||||||
* IRecordProcessor}.
|
* IRecordProcessor}.
|
||||||
|
|
@ -1228,39 +1201,44 @@ public class Worker implements Runnable {
|
||||||
execService = getExecutorService();
|
execService = getExecutorService();
|
||||||
}
|
}
|
||||||
if (kinesisClient == null) {
|
if (kinesisClient == null) {
|
||||||
kinesisClient = (AmazonKinesis) createClient(AmazonKinesisClientBuilder.standard(),
|
kinesisClient = new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
|
||||||
config.getKinesisCredentialsProvider(),
|
config.getKinesisClientConfiguration());
|
||||||
config.getKinesisClientConfiguration(),
|
|
||||||
config.getKinesisEndpoint(),
|
|
||||||
config.getRegionName());
|
|
||||||
}
|
}
|
||||||
if (dynamoDBClient == null) {
|
if (dynamoDBClient == null) {
|
||||||
dynamoDBClient = (AmazonDynamoDB) createClient(AmazonDynamoDBClientBuilder.standard(),
|
dynamoDBClient = new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
|
||||||
config.getDynamoDBCredentialsProvider(),
|
config.getDynamoDBClientConfiguration());
|
||||||
config.getDynamoDBClientConfiguration(),
|
|
||||||
config.getDynamoDBEndpoint(),
|
|
||||||
config.getRegionName());
|
|
||||||
}
|
}
|
||||||
if (cloudWatchClient == null) {
|
if (cloudWatchClient == null) {
|
||||||
cloudWatchClient = (AmazonCloudWatch) createClient(AmazonCloudWatchClientBuilder.standard(),
|
cloudWatchClient = new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(),
|
||||||
config.getCloudWatchCredentialsProvider(),
|
config.getCloudWatchClientConfiguration());
|
||||||
config.getCloudWatchClientConfiguration(),
|
|
||||||
null,
|
|
||||||
config.getRegionName());
|
|
||||||
}
|
}
|
||||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||||
if (config.getRegionName() != null) {
|
if (config.getRegionName() != null) {
|
||||||
setRegionForClient((AmazonWebServiceClient) cloudWatchClient, config.getRegionName());
|
Region region = RegionUtils.getRegion(config.getRegionName());
|
||||||
setRegionForClient((AmazonWebServiceClient) kinesisClient, config.getRegionName());
|
cloudWatchClient.setRegion(region);
|
||||||
setRegionForClient((AmazonWebServiceClient) dynamoDBClient, config.getRegionName());
|
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
|
||||||
|
kinesisClient.setRegion(region);
|
||||||
|
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName());
|
||||||
|
dynamoDBClient.setRegion(region);
|
||||||
|
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
|
||||||
}
|
}
|
||||||
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
|
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
|
||||||
if (config.getDynamoDBEndpoint() != null) {
|
if (config.getDynamoDBEndpoint() != null) {
|
||||||
setEndpointForClient((AmazonWebServiceClient) dynamoDBClient, config.getDynamoDBEndpoint());
|
dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint());
|
||||||
|
LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint());
|
||||||
}
|
}
|
||||||
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
|
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
|
||||||
if (config.getKinesisEndpoint() != null) {
|
if (config.getKinesisEndpoint() != null) {
|
||||||
setEndpointForClient((AmazonWebServiceClient) kinesisClient, config.getKinesisEndpoint());
|
kinesisClient.setEndpoint(config.getKinesisEndpoint());
|
||||||
|
if (config.getRegionName() != null) {
|
||||||
|
LOG.warn("Received configuration for both region name as " + config.getRegionName()
|
||||||
|
+ ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint()
|
||||||
|
+ ". Amazon Kinesis endpoint will overwrite region name.");
|
||||||
|
LOG.debug("The region of Amazon Kinesis client has been overwritten to "
|
||||||
|
+ config.getKinesisEndpoint());
|
||||||
|
} else {
|
||||||
|
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (metricsFactory == null) {
|
if (metricsFactory == null) {
|
||||||
metricsFactory = getMetricsFactory(cloudWatchClient, config);
|
metricsFactory = getMetricsFactory(cloudWatchClient, config);
|
||||||
|
|
@ -1314,27 +1292,5 @@ public class Worker implements Runnable {
|
||||||
workerStateChangeListener);
|
workerStateChangeListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
AmazonWebServiceClient createClient(@NonNull final AwsClientBuilder builder,
|
|
||||||
final AWSCredentialsProvider credentialsProvider,
|
|
||||||
final ClientConfiguration clientConfiguration,
|
|
||||||
final String endpointUrl,
|
|
||||||
final String region) {
|
|
||||||
if (credentialsProvider != null) {
|
|
||||||
builder.withCredentials(credentialsProvider);
|
|
||||||
}
|
|
||||||
if (clientConfiguration != null) {
|
|
||||||
builder.withClientConfiguration(clientConfiguration);
|
|
||||||
}
|
|
||||||
if (StringUtils.isNotEmpty(endpointUrl)) {
|
|
||||||
LOG.warn("Received configuration for both region name as " + region + ", and endpoint as "
|
|
||||||
+ endpointUrl + ". Client endpoint will overwrite region name.");
|
|
||||||
builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointUrl, region));
|
|
||||||
} else if (StringUtils.isNotEmpty(region)) {
|
|
||||||
LOG.debug("The region for the client has been set to " + region);
|
|
||||||
builder.withRegion(region);
|
|
||||||
}
|
|
||||||
return (AmazonWebServiceClient) builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -268,8 +268,8 @@ public class KinesisClientLibConfigurationTest {
|
||||||
IRecordProcessorFactory processorFactory = Mockito.mock(IRecordProcessorFactory.class);
|
IRecordProcessorFactory processorFactory = Mockito.mock(IRecordProcessorFactory.class);
|
||||||
new Worker(processorFactory, kclConfig);
|
new Worker(processorFactory, kclConfig);
|
||||||
|
|
||||||
Mockito.verify(kclConfig, Mockito.times(5)).getRegionName();
|
Mockito.verify(kclConfig, Mockito.times(9)).getRegionName();
|
||||||
Mockito.verify(kclConfig, Mockito.times(2)).getKinesisEndpoint();
|
Mockito.verify(kclConfig, Mockito.times(4)).getKinesisEndpoint();
|
||||||
|
|
||||||
kclConfig = Mockito.spy(
|
kclConfig = Mockito.spy(
|
||||||
new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0")
|
new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0")
|
||||||
|
|
@ -277,8 +277,8 @@ public class KinesisClientLibConfigurationTest {
|
||||||
|
|
||||||
new Worker(processorFactory, kclConfig);
|
new Worker(processorFactory, kclConfig);
|
||||||
|
|
||||||
Mockito.verify(kclConfig, Mockito.times(2)).getRegionName();
|
Mockito.verify(kclConfig, Mockito.times(3)).getRegionName();
|
||||||
Mockito.verify(kclConfig, Mockito.times(2)).getKinesisEndpoint();
|
Mockito.verify(kclConfig, Mockito.times(3)).getKinesisEndpoint();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -19,9 +19,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.hamcrest.CoreMatchers.isA;
|
import static org.hamcrest.CoreMatchers.isA;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
|
|
@ -67,8 +64,6 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import com.amazonaws.ClientConfiguration;
|
|
||||||
import com.amazonaws.client.builder.AwsClientBuilder;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.hamcrest.Condition;
|
import org.hamcrest.Condition;
|
||||||
|
|
@ -86,14 +81,8 @@ import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.runners.MockitoJUnitRunner;
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import com.amazonaws.regions.RegionUtils;
|
|
||||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
|
|
||||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
|
|
||||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
|
|
||||||
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
|
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
|
||||||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
|
||||||
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
|
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||||
|
|
@ -103,8 +92,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMe
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState;
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
|
||||||
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
|
||||||
|
|
@ -1641,115 +1630,6 @@ public class WorkerTest {
|
||||||
Assert.assertSame(leaseManager, worker.getLeaseCoordinator().getLeaseManager());
|
Assert.assertSame(leaseManager, worker.getLeaseCoordinator().getLeaseManager());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBuilderSetRegionAndEndpointToClient() {
|
|
||||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
|
||||||
final String endpoint = "TestEndpoint";
|
|
||||||
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
|
|
||||||
.withRegionName("us-west-2")
|
|
||||||
.withKinesisEndpoint(endpoint)
|
|
||||||
.withDynamoDBEndpoint(endpoint);
|
|
||||||
|
|
||||||
AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.defaultClient());
|
|
||||||
AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.defaultClient());
|
|
||||||
AmazonCloudWatch cloudWatchClient = spy(AmazonCloudWatchClientBuilder.defaultClient());
|
|
||||||
|
|
||||||
new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config)
|
|
||||||
.kinesisClient(kinesisClient)
|
|
||||||
.dynamoDBClient(dynamoDBClient)
|
|
||||||
.cloudWatchClient(cloudWatchClient)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
verify(kinesisClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
|
|
||||||
verify(dynamoDBClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
|
|
||||||
verify(cloudWatchClient, times(2)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
|
|
||||||
|
|
||||||
verify(kinesisClient, times(1)).setEndpoint(eq(endpoint));
|
|
||||||
verify(dynamoDBClient, times(1)).setEndpoint(eq(endpoint));
|
|
||||||
verify(cloudWatchClient, never()).setEndpoint(anyString());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBuilderSetRegionToClient() {
|
|
||||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
|
||||||
String region = "us-west-2";
|
|
||||||
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
|
|
||||||
.withRegionName(region);
|
|
||||||
|
|
||||||
Worker.Builder builder = new Worker.Builder();
|
|
||||||
|
|
||||||
AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.defaultClient());
|
|
||||||
AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.defaultClient());
|
|
||||||
AmazonCloudWatch cloudWatchClient = spy(AmazonCloudWatchClientBuilder.defaultClient());
|
|
||||||
|
|
||||||
builder.recordProcessorFactory(recordProcessorFactory).config(config)
|
|
||||||
.kinesisClient(kinesisClient)
|
|
||||||
.dynamoDBClient(dynamoDBClient)
|
|
||||||
.cloudWatchClient(cloudWatchClient)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
verify(kinesisClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
|
|
||||||
verify(dynamoDBClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
|
|
||||||
verify(cloudWatchClient, times(2)).setRegion(eq(RegionUtils.getRegion(config.getRegionName())));
|
|
||||||
|
|
||||||
verify(kinesisClient, never()).setEndpoint(any());
|
|
||||||
verify(dynamoDBClient, never()).setEndpoint(any());
|
|
||||||
verify(cloudWatchClient, never()).setEndpoint(any());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBuilderGenerateClients() {
|
|
||||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
|
||||||
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null);
|
|
||||||
Worker.Builder builder = spy(new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config));
|
|
||||||
|
|
||||||
assertNull(builder.getKinesisClient());
|
|
||||||
assertNull(builder.getDynamoDBClient());
|
|
||||||
assertNull(builder.getCloudWatchClient());
|
|
||||||
|
|
||||||
builder.build();
|
|
||||||
|
|
||||||
assertTrue(builder.getKinesisClient() instanceof AmazonKinesis);
|
|
||||||
assertTrue(builder.getDynamoDBClient() instanceof AmazonDynamoDB);
|
|
||||||
assertTrue(builder.getCloudWatchClient() instanceof AmazonCloudWatch);
|
|
||||||
|
|
||||||
verify(builder, times(3)).createClient(
|
|
||||||
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(null));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBuilderGenerateClientsWithRegion() {
|
|
||||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
|
||||||
String region = "us-west-2";
|
|
||||||
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
|
|
||||||
.withRegionName(region);
|
|
||||||
|
|
||||||
Worker.Builder builder = spy(new Worker.Builder());
|
|
||||||
|
|
||||||
builder.recordProcessorFactory(recordProcessorFactory).config(config).build();
|
|
||||||
|
|
||||||
verify(builder, times(3)).createClient(
|
|
||||||
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBuilderGenerateClientsWithEndpoint() {
|
|
||||||
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
|
|
||||||
String region = "us-west-2";
|
|
||||||
String endpointUrl = "TestEndpoint";
|
|
||||||
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null)
|
|
||||||
.withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl);
|
|
||||||
|
|
||||||
Worker.Builder builder = spy(new Worker.Builder());
|
|
||||||
|
|
||||||
builder.recordProcessorFactory(recordProcessorFactory).config(config).build();
|
|
||||||
|
|
||||||
verify(builder, times(2)).createClient(
|
|
||||||
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(endpointUrl), eq(region));
|
|
||||||
verify(builder, times(1)).createClient(
|
|
||||||
any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region));
|
|
||||||
}
|
|
||||||
|
|
||||||
private abstract class InjectableWorker extends Worker {
|
private abstract class InjectableWorker extends Worker {
|
||||||
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
|
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
|
||||||
KinesisClientLibConfiguration config, StreamConfig streamConfig,
|
KinesisClientLibConfiguration config, StreamConfig streamConfig,
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue