From 86cc068454be21e074e989cfaddec7886ff98066 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 4 May 2018 10:06:13 -0700 Subject: [PATCH] Fix for Worker to handle immutable clients (#305) --- pom.xml | 2 +- .../clientlibrary/lib/worker/Worker.java | 129 +++++++++++------ .../KinesisClientLibConfigurationTest.java | 8 +- .../clientlibrary/lib/worker/WorkerTest.java | 132 +++++++++++++++++- 4 files changed, 220 insertions(+), 51 deletions(-) diff --git a/pom.xml b/pom.xml index 2e4488be..2002cb43 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.9.1 + 1.9.2-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 4a03b449..e9e1b5ed 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -31,18 +31,27 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.regions.Region; +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.regions.RegionUtils; +import com.amazonaws.regions.Regions; import com.amazonaws.services.cloudwatch.AmazonCloudWatch; import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; +import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; @@ -60,6 +69,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.NonNull; import lombok.Setter; import lombok.experimental.Accessors; @@ -399,28 +409,16 @@ public class Worker implements Runnable { // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { - Region region = RegionUtils.getRegion(config.getRegionName()); - kinesisClient.setRegion(region); - LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName()); - dynamoDBClient.setRegion(region); - LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); + setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName())); + setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName())); } // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. if (config.getDynamoDBEndpoint() != null) { - dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint()); - LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint()); + setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint()); } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) { - kinesisClient.setEndpoint(config.getKinesisEndpoint()); - if (config.getRegionName() != null) { - LOG.warn("Received configuration for both region name as " + config.getRegionName() - + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint() - + ". Amazon Kinesis endpoint will overwrite region name."); - LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint()); - } else { - LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); - } + setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint()); } } @@ -1080,9 +1078,7 @@ public class Worker implements Runnable { metricsFactory = new NullMetricsFactory(); } else { if (config.getRegionName() != null) { - Region region = RegionUtils.getRegion(config.getRegionName()); - cloudWatchClient.setRegion(region); - LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); + setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(config.getRegionName())); } metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(), config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(), @@ -1101,6 +1097,15 @@ public class Worker implements Runnable { return new WorkerThreadPoolExecutor(threadFactory); } + private static void setField(final S source, final String field, final Consumer t, T value) { + try { + t.accept(value); + } catch (UnsupportedOperationException e) { + LOG.debug("Exception thrown while trying to set " + field + ", indicating that " + + source.getClass().getSimpleName() + "is immutable.", e); + } + } + /** * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not. * Visible and non-final only for testing. @@ -1154,6 +1159,21 @@ public class Worker implements Runnable { @Setter @Accessors(fluent = true) private WorkerStateChangeListener workerStateChangeListener; + @VisibleForTesting + AmazonKinesis getKinesisClient() { + return kinesisClient; + } + + @VisibleForTesting + AmazonDynamoDB getDynamoDBClient() { + return dynamoDBClient; + } + + @VisibleForTesting + AmazonCloudWatch getCloudWatchClient() { + return cloudWatchClient; + } + /** * Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor * IRecordProcessor}. @@ -1201,44 +1221,39 @@ public class Worker implements Runnable { execService = getExecutorService(); } if (kinesisClient == null) { - kinesisClient = new AmazonKinesisClient(config.getKinesisCredentialsProvider(), - config.getKinesisClientConfiguration()); + kinesisClient = createClient(AmazonKinesisClientBuilder.standard(), + config.getKinesisCredentialsProvider(), + config.getKinesisClientConfiguration(), + config.getKinesisEndpoint(), + config.getRegionName()); } if (dynamoDBClient == null) { - dynamoDBClient = new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), - config.getDynamoDBClientConfiguration()); + dynamoDBClient = createClient(AmazonDynamoDBClientBuilder.standard(), + config.getDynamoDBCredentialsProvider(), + config.getDynamoDBClientConfiguration(), + config.getDynamoDBEndpoint(), + config.getRegionName()); } if (cloudWatchClient == null) { - cloudWatchClient = new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(), - config.getCloudWatchClientConfiguration()); + cloudWatchClient = createClient(AmazonCloudWatchClientBuilder.standard(), + config.getCloudWatchCredentialsProvider(), + config.getCloudWatchClientConfiguration(), + null, + config.getRegionName()); } // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { - Region region = RegionUtils.getRegion(config.getRegionName()); - cloudWatchClient.setRegion(region); - LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); - kinesisClient.setRegion(region); - LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName()); - dynamoDBClient.setRegion(region); - LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); + setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(config.getRegionName())); + setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName())); + setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName())); } // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. if (config.getDynamoDBEndpoint() != null) { - dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint()); - LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint()); + setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint()); } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) { - kinesisClient.setEndpoint(config.getKinesisEndpoint()); - if (config.getRegionName() != null) { - LOG.warn("Received configuration for both region name as " + config.getRegionName() - + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint() - + ". Amazon Kinesis endpoint will overwrite region name."); - LOG.debug("The region of Amazon Kinesis client has been overwritten to " - + config.getKinesisEndpoint()); - } else { - LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); - } + setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint()); } if (metricsFactory == null) { metricsFactory = getMetricsFactory(cloudWatchClient, config); @@ -1292,5 +1307,29 @@ public class Worker implements Runnable { workerStateChangeListener); } + > R createClient(final T builder, + final AWSCredentialsProvider credentialsProvider, + final ClientConfiguration clientConfiguration, + final String endpointUrl, + final String region) { + if (credentialsProvider != null) { + builder.withCredentials(credentialsProvider); + } + if (clientConfiguration != null) { + builder.withClientConfiguration(clientConfiguration); + } + if (StringUtils.isNotEmpty(endpointUrl)) { + LOG.warn("Received configuration for endpoint as " + endpointUrl + ", and region as " + + region + "."); + builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointUrl, region)); + } else if (StringUtils.isNotEmpty(region)) { + LOG.warn("Received configuration for region as " + region + "."); + builder.withRegion(region); + } else { + LOG.warn("No configuration received for endpoint and region, will default region to us-east-1"); + builder.withRegion(Regions.US_EAST_1); + } + return builder.build(); + } } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index cccbcb30..d30c9e57 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -268,8 +268,8 @@ public class KinesisClientLibConfigurationTest { IRecordProcessorFactory processorFactory = Mockito.mock(IRecordProcessorFactory.class); new Worker(processorFactory, kclConfig); - Mockito.verify(kclConfig, Mockito.times(9)).getRegionName(); - Mockito.verify(kclConfig, Mockito.times(4)).getKinesisEndpoint(); + Mockito.verify(kclConfig, Mockito.times(5)).getRegionName(); + Mockito.verify(kclConfig, Mockito.times(2)).getKinesisEndpoint(); kclConfig = Mockito.spy( new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0") @@ -277,8 +277,8 @@ public class KinesisClientLibConfigurationTest { new Worker(processorFactory, kclConfig); - Mockito.verify(kclConfig, Mockito.times(3)).getRegionName(); - Mockito.verify(kclConfig, Mockito.times(3)).getKinesisEndpoint(); + Mockito.verify(kclConfig, Mockito.times(2)).getRegionName(); + Mockito.verify(kclConfig, Mockito.times(2)).getKinesisEndpoint(); } @Test diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 21aaa8ac..ddc39aed 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -19,6 +19,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -75,14 +77,24 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.RegionUtils; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; @@ -92,8 +104,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMe import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; +import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; @@ -1630,6 +1642,124 @@ public class WorkerTest { Assert.assertSame(leaseManager, worker.getLeaseCoordinator().getLeaseManager()); } + @Test + public void testBuilderSetRegionAndEndpointToClient() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final String endpoint = "TestEndpoint"; + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) + .withRegionName(Regions.US_WEST_2.getName()) + .withKinesisEndpoint(endpoint) + .withDynamoDBEndpoint(endpoint); + + AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.standard().withRegion(Regions.US_WEST_2).build()); + AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_WEST_2).build()); + AmazonCloudWatch cloudWatchClient = spy(AmazonCloudWatchClientBuilder.standard().withRegion(Regions.US_WEST_2).build()); + + new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config) + .kinesisClient(kinesisClient) + .dynamoDBClient(dynamoDBClient) + .cloudWatchClient(cloudWatchClient) + .build(); + + verify(kinesisClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); + verify(dynamoDBClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); + verify(cloudWatchClient, times(2)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); + + verify(kinesisClient, times(1)).setEndpoint(eq(endpoint)); + verify(dynamoDBClient, times(1)).setEndpoint(eq(endpoint)); + verify(cloudWatchClient, never()).setEndpoint(anyString()); + } + + @Test + public void testBuilderSetRegionToClient() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + String region = Regions.US_WEST_2.getName(); + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) + .withRegionName(region); + + Worker.Builder builder = new Worker.Builder(); + + AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.standard().withRegion(Regions.US_WEST_2).build()); + AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_WEST_2).build()); + AmazonCloudWatch cloudWatchClient = spy(AmazonCloudWatchClientBuilder.standard().withRegion(Regions.US_WEST_2).build()); + + builder.recordProcessorFactory(recordProcessorFactory).config(config) + .kinesisClient(kinesisClient) + .dynamoDBClient(dynamoDBClient) + .cloudWatchClient(cloudWatchClient) + .build(); + + verify(kinesisClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); + verify(dynamoDBClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); + verify(cloudWatchClient, times(2)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); + + verify(kinesisClient, never()).setEndpoint(any()); + verify(dynamoDBClient, never()).setEndpoint(any()); + verify(cloudWatchClient, never()).setEndpoint(any()); + } + + @Test + public void testBuilderGenerateClients() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null); + Worker.Builder builder = spy(new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config)); + ArgumentCaptor builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class); + + assertNull(builder.getKinesisClient()); + assertNull(builder.getDynamoDBClient()); + assertNull(builder.getCloudWatchClient()); + + builder.build(); + + assertTrue(builder.getKinesisClient() instanceof AmazonKinesis); + assertTrue(builder.getDynamoDBClient() instanceof AmazonDynamoDB); + assertTrue(builder.getCloudWatchClient() instanceof AmazonCloudWatch); + + verify(builder, times(3)).createClient( + builderCaptor.capture(), eq(null), any(ClientConfiguration.class), eq(null), eq(null)); + + builderCaptor.getAllValues().forEach(clientBuilder -> { + assertTrue(clientBuilder.getRegion().equals(Regions.US_EAST_1.getName())); + }); + } + + @Test + public void testBuilderGenerateClientsWithRegion() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + String region = Regions.US_WEST_2.getName(); + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) + .withRegionName(region); + ArgumentCaptor builderCaptor = ArgumentCaptor.forClass(AwsClientBuilder.class); + + Worker.Builder builder = spy(new Worker.Builder()); + + builder.recordProcessorFactory(recordProcessorFactory).config(config).build(); + + verify(builder, times(3)).createClient( + builderCaptor.capture(), eq(null), any(ClientConfiguration.class), eq(null), eq(region)); + builderCaptor.getAllValues().forEach(clientBuilder -> { + assertTrue(clientBuilder.getRegion().equals(region)); + }); + } + + @Test + public void testBuilderGenerateClientsWithEndpoint() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + String region = Regions.US_WEST_2.getName(); + String endpointUrl = "TestEndpoint"; + KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) + .withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl); + + Worker.Builder builder = spy(new Worker.Builder()); + + builder.recordProcessorFactory(recordProcessorFactory).config(config).build(); + + verify(builder, times(2)).createClient( + any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(endpointUrl), eq(region)); + verify(builder, times(1)).createClient( + any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region)); + } + private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,