From a99d078bceb4e92be5f2ba43448af094c2583d49 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 2 Mar 2018 11:33:38 -0800 Subject: [PATCH] Revert "Adding capability to support Immutable clients by the Worker and Worker.Builder" This reverts commit 053bc226d0b9c9c7f0a1d73464f41d77073ef8d4. --- .../clientlibrary/lib/worker/Worker.java | 134 ++++++------------ .../KinesisClientLibConfigurationTest.java | 8 +- .../clientlibrary/lib/worker/WorkerTest.java | 122 +--------------- 3 files changed, 50 insertions(+), 214 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 4c964594..4a03b449 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -32,24 +32,17 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.AmazonWebServiceClient; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Region; import com.amazonaws.regions.RegionUtils; import com.amazonaws.services.cloudwatch.AmazonCloudWatch; import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; -import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; @@ -67,7 +60,6 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import lombok.NonNull; import lombok.Setter; import lombok.experimental.Accessors; @@ -407,16 +399,28 @@ public class Worker implements Runnable { // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { - setRegionForClient((AmazonWebServiceClient) kinesisClient, config.getRegionName()); - setRegionForClient((AmazonWebServiceClient) dynamoDBClient, config.getRegionName()); + Region region = RegionUtils.getRegion(config.getRegionName()); + kinesisClient.setRegion(region); + LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName()); + dynamoDBClient.setRegion(region); + LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); } // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. if (config.getDynamoDBEndpoint() != null) { - setEndpointForClient((AmazonWebServiceClient) dynamoDBClient, config.getDynamoDBEndpoint()); + dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint()); + LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint()); } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) { - setEndpointForClient((AmazonWebServiceClient) kinesisClient, config.getKinesisEndpoint()); + kinesisClient.setEndpoint(config.getKinesisEndpoint()); + if (config.getRegionName() != null) { + LOG.warn("Received configuration for both region name as " + config.getRegionName() + + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint() + + ". Amazon Kinesis endpoint will overwrite region name."); + LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint()); + } else { + LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); + } } } @@ -1076,7 +1080,9 @@ public class Worker implements Runnable { metricsFactory = new NullMetricsFactory(); } else { if (config.getRegionName() != null) { - setRegionForClient((AmazonWebServiceClient) cloudWatchClient, config.getRegionName()); + Region region = RegionUtils.getRegion(config.getRegionName()); + cloudWatchClient.setRegion(region); + LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); } metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(), config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(), @@ -1095,24 +1101,6 @@ public class Worker implements Runnable { return new WorkerThreadPoolExecutor(threadFactory); } - private static void setRegionForClient(@NonNull final AmazonWebServiceClient client, @NonNull final String region) { - try { - client.setRegion(RegionUtils.getRegion(region)); - LOG.debug("Region set for client to " + region); - } catch (UnsupportedOperationException e) { - LOG.debug("Trying to set region for immutable client", e); - } - } - - private static void setEndpointForClient(@NonNull final AmazonWebServiceClient client, @NonNull final String endpointUrl) { - try { - client.setEndpoint(endpointUrl); - LOG.debug("Endpoint set for client to " + endpointUrl); - } catch (UnsupportedOperationException e) { - LOG.debug("Trying to set endpoint for immutable client", e); - } - } - /** * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not. * Visible and non-final only for testing. @@ -1166,21 +1154,6 @@ public class Worker implements Runnable { @Setter @Accessors(fluent = true) private WorkerStateChangeListener workerStateChangeListener; - @VisibleForTesting - AmazonKinesis getKinesisClient() { - return kinesisClient; - } - - @VisibleForTesting - AmazonDynamoDB getDynamoDBClient() { - return dynamoDBClient; - } - - @VisibleForTesting - AmazonCloudWatch getCloudWatchClient() { - return cloudWatchClient; - } - /** * Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor * IRecordProcessor}. @@ -1228,39 +1201,44 @@ public class Worker implements Runnable { execService = getExecutorService(); } if (kinesisClient == null) { - kinesisClient = (AmazonKinesis) createClient(AmazonKinesisClientBuilder.standard(), - config.getKinesisCredentialsProvider(), - config.getKinesisClientConfiguration(), - config.getKinesisEndpoint(), - config.getRegionName()); + kinesisClient = new AmazonKinesisClient(config.getKinesisCredentialsProvider(), + config.getKinesisClientConfiguration()); } if (dynamoDBClient == null) { - dynamoDBClient = (AmazonDynamoDB) createClient(AmazonDynamoDBClientBuilder.standard(), - config.getDynamoDBCredentialsProvider(), - config.getDynamoDBClientConfiguration(), - config.getDynamoDBEndpoint(), - config.getRegionName()); + dynamoDBClient = new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), + config.getDynamoDBClientConfiguration()); } if (cloudWatchClient == null) { - cloudWatchClient = (AmazonCloudWatch) createClient(AmazonCloudWatchClientBuilder.standard(), - config.getCloudWatchCredentialsProvider(), - config.getCloudWatchClientConfiguration(), - null, - config.getRegionName()); + cloudWatchClient = new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(), + config.getCloudWatchClientConfiguration()); } // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { - setRegionForClient((AmazonWebServiceClient) cloudWatchClient, config.getRegionName()); - setRegionForClient((AmazonWebServiceClient) kinesisClient, config.getRegionName()); - setRegionForClient((AmazonWebServiceClient) dynamoDBClient, config.getRegionName()); + Region region = RegionUtils.getRegion(config.getRegionName()); + cloudWatchClient.setRegion(region); + LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); + kinesisClient.setRegion(region); + LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName()); + dynamoDBClient.setRegion(region); + LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); } // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. if (config.getDynamoDBEndpoint() != null) { - setEndpointForClient((AmazonWebServiceClient) dynamoDBClient, config.getDynamoDBEndpoint()); + dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint()); + LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint()); } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) { - setEndpointForClient((AmazonWebServiceClient) kinesisClient, config.getKinesisEndpoint()); + kinesisClient.setEndpoint(config.getKinesisEndpoint()); + if (config.getRegionName() != null) { + LOG.warn("Received configuration for both region name as " + config.getRegionName() + + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint() + + ". Amazon Kinesis endpoint will overwrite region name."); + LOG.debug("The region of Amazon Kinesis client has been overwritten to " + + config.getKinesisEndpoint()); + } else { + LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); + } } if (metricsFactory == null) { metricsFactory = getMetricsFactory(cloudWatchClient, config); @@ -1314,27 +1292,5 @@ public class Worker implements Runnable { workerStateChangeListener); } - AmazonWebServiceClient createClient(@NonNull final AwsClientBuilder builder, - final AWSCredentialsProvider credentialsProvider, - final ClientConfiguration clientConfiguration, - final String endpointUrl, - final String region) { - if (credentialsProvider != null) { - builder.withCredentials(credentialsProvider); - } - if (clientConfiguration != null) { - builder.withClientConfiguration(clientConfiguration); - } - if (StringUtils.isNotEmpty(endpointUrl)) { - LOG.warn("Received configuration for both region name as " + region + ", and endpoint as " - + endpointUrl + ". Client endpoint will overwrite region name."); - builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointUrl, region)); - } else if (StringUtils.isNotEmpty(region)) { - LOG.debug("The region for the client has been set to " + region); - builder.withRegion(region); - } - return (AmazonWebServiceClient) builder.build(); - } - } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index d30c9e57..cccbcb30 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -268,8 +268,8 @@ public class KinesisClientLibConfigurationTest { IRecordProcessorFactory processorFactory = Mockito.mock(IRecordProcessorFactory.class); new Worker(processorFactory, kclConfig); - Mockito.verify(kclConfig, Mockito.times(5)).getRegionName(); - Mockito.verify(kclConfig, Mockito.times(2)).getKinesisEndpoint(); + Mockito.verify(kclConfig, Mockito.times(9)).getRegionName(); + Mockito.verify(kclConfig, Mockito.times(4)).getKinesisEndpoint(); kclConfig = Mockito.spy( new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0") @@ -277,8 +277,8 @@ public class KinesisClientLibConfigurationTest { new Worker(processorFactory, kclConfig); - Mockito.verify(kclConfig, Mockito.times(2)).getRegionName(); - Mockito.verify(kclConfig, Mockito.times(2)).getKinesisEndpoint(); + Mockito.verify(kclConfig, Mockito.times(3)).getRegionName(); + Mockito.verify(kclConfig, Mockito.times(3)).getKinesisEndpoint(); } @Test diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 0205b03a..21aaa8ac 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -19,9 +19,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -67,8 +64,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.client.builder.AwsClientBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Condition; @@ -86,14 +81,8 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; -import com.amazonaws.regions.RegionUtils; -import com.amazonaws.services.cloudwatch.AmazonCloudWatch; -import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; @@ -103,8 +92,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMe import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; @@ -1641,115 +1630,6 @@ public class WorkerTest { Assert.assertSame(leaseManager, worker.getLeaseCoordinator().getLeaseManager()); } - @Test - public void testBuilderSetRegionAndEndpointToClient() { - IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - final String endpoint = "TestEndpoint"; - KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) - .withRegionName("us-west-2") - .withKinesisEndpoint(endpoint) - .withDynamoDBEndpoint(endpoint); - - AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.defaultClient()); - AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.defaultClient()); - AmazonCloudWatch cloudWatchClient = spy(AmazonCloudWatchClientBuilder.defaultClient()); - - new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config) - .kinesisClient(kinesisClient) - .dynamoDBClient(dynamoDBClient) - .cloudWatchClient(cloudWatchClient) - .build(); - - verify(kinesisClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); - verify(dynamoDBClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); - verify(cloudWatchClient, times(2)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); - - verify(kinesisClient, times(1)).setEndpoint(eq(endpoint)); - verify(dynamoDBClient, times(1)).setEndpoint(eq(endpoint)); - verify(cloudWatchClient, never()).setEndpoint(anyString()); - } - - @Test - public void testBuilderSetRegionToClient() { - IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - String region = "us-west-2"; - KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) - .withRegionName(region); - - Worker.Builder builder = new Worker.Builder(); - - AmazonKinesis kinesisClient = spy(AmazonKinesisClientBuilder.defaultClient()); - AmazonDynamoDB dynamoDBClient = spy(AmazonDynamoDBClientBuilder.defaultClient()); - AmazonCloudWatch cloudWatchClient = spy(AmazonCloudWatchClientBuilder.defaultClient()); - - builder.recordProcessorFactory(recordProcessorFactory).config(config) - .kinesisClient(kinesisClient) - .dynamoDBClient(dynamoDBClient) - .cloudWatchClient(cloudWatchClient) - .build(); - - verify(kinesisClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); - verify(dynamoDBClient, times(1)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); - verify(cloudWatchClient, times(2)).setRegion(eq(RegionUtils.getRegion(config.getRegionName()))); - - verify(kinesisClient, never()).setEndpoint(any()); - verify(dynamoDBClient, never()).setEndpoint(any()); - verify(cloudWatchClient, never()).setEndpoint(any()); - } - - @Test - public void testBuilderGenerateClients() { - IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null); - Worker.Builder builder = spy(new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(config)); - - assertNull(builder.getKinesisClient()); - assertNull(builder.getDynamoDBClient()); - assertNull(builder.getCloudWatchClient()); - - builder.build(); - - assertTrue(builder.getKinesisClient() instanceof AmazonKinesis); - assertTrue(builder.getDynamoDBClient() instanceof AmazonDynamoDB); - assertTrue(builder.getCloudWatchClient() instanceof AmazonCloudWatch); - - verify(builder, times(3)).createClient( - any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(null)); - } - - @Test - public void testBuilderGenerateClientsWithRegion() { - IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - String region = "us-west-2"; - KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) - .withRegionName(region); - - Worker.Builder builder = spy(new Worker.Builder()); - - builder.recordProcessorFactory(recordProcessorFactory).config(config).build(); - - verify(builder, times(3)).createClient( - any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region)); - } - - @Test - public void testBuilderGenerateClientsWithEndpoint() { - IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - String region = "us-west-2"; - String endpointUrl = "TestEndpoint"; - KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("TestApp", null, null, null) - .withRegionName(region).withKinesisEndpoint(endpointUrl).withDynamoDBEndpoint(endpointUrl); - - Worker.Builder builder = spy(new Worker.Builder()); - - builder.recordProcessorFactory(recordProcessorFactory).config(config).build(); - - verify(builder, times(2)).createClient( - any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(endpointUrl), eq(region)); - verify(builder, times(1)).createClient( - any(AwsClientBuilder.class), eq(null), any(ClientConfiguration.class), eq(null), eq(region)); - } - private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,