diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index c5d7ac2e..e9e1b5ed 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -408,16 +409,16 @@ public class Worker implements Runnable { // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { - setRegionForClient(kinesisClient, config.getRegionName()); - setRegionForClient(dynamoDBClient, config.getRegionName()); + setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName())); + setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName())); } // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. if (config.getDynamoDBEndpoint() != null) { - setEndpointForClient(dynamoDBClient, config.getDynamoDBEndpoint()); + setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint()); } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) { - setEndpointForClient(kinesisClient, config.getKinesisEndpoint()); + setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint()); } } @@ -1077,7 +1078,7 @@ public class Worker implements Runnable { metricsFactory = new NullMetricsFactory(); } else { if (config.getRegionName() != null) { - setRegionForClient(cloudWatchClient, config.getRegionName()); + setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(config.getRegionName())); } metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(), config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(), @@ -1096,43 +1097,12 @@ public class Worker implements Runnable { return new WorkerThreadPoolExecutor(threadFactory); } - private static void setRegionForClient(@NonNull final Object client, @NonNull final String region) { + private static void setField(final S source, final String field, final Consumer t, T value) { try { - if (client instanceof AmazonKinesis) { - LOG.debug("Configuration passed for region as " + region + " for AmazonKinesis client."); - ((AmazonKinesis) client).setRegion(RegionUtils.getRegion(region)); - } else if (client instanceof AmazonDynamoDB) { - LOG.debug("Configuration passed for region as " + region + " for AmazonDynamoDB client."); - ((AmazonDynamoDB) client).setRegion(RegionUtils.getRegion(region)); - } else if (client instanceof AmazonCloudWatch) { - LOG.debug("Configuration passed for region as " + region + " for AmazonCloudWatch client."); - ((AmazonCloudWatch) client).setRegion(RegionUtils.getRegion(region)); - } else { - LOG.warn("Client passed is not an instance of AmazonKinesis, AmazonDynamoDB or AmazonCloudWatch. " - + "Could not set the region to " + region + "."); - } + t.accept(value); } catch (UnsupportedOperationException e) { - LOG.debug("Exception thrown, indicating that the client is immutable.", e); - } - } - - private static void setEndpointForClient(@NonNull final Object client, @NonNull final String endpointUrl) { - try { - if (client instanceof AmazonKinesis) { - LOG.debug("Configuration passed for endpoint as " + endpointUrl + " for AmazonKinesis client."); - ((AmazonKinesis) client).setEndpoint(endpointUrl); - } else if (client instanceof AmazonDynamoDB) { - LOG.debug("Configuration passed for endpoint as " + endpointUrl + " for AmazonDynamoDB client."); - ((AmazonDynamoDB) client).setEndpoint(endpointUrl); - } else if (client instanceof AmazonCloudWatch) { - LOG.debug("Configuration passed for endpoint as " + endpointUrl + " for AmazonCloudWatch client."); - ((AmazonCloudWatch) client).setEndpoint(endpointUrl); - } else { - LOG.warn("Client passed is not an instance of AmazonKinesis, AmazonDynamoDB or AmazonCloudWatch. " - + "Could not set the endpoint to " + endpointUrl + "."); - } - } catch (UnsupportedOperationException e) { - LOG.debug("Exception thrown, indicating that the client is immutable.", e); + LOG.debug("Exception thrown while trying to set " + field + ", indicating that " + + source.getClass().getSimpleName() + "is immutable.", e); } } @@ -1251,21 +1221,21 @@ public class Worker implements Runnable { execService = getExecutorService(); } if (kinesisClient == null) { - kinesisClient = (AmazonKinesis) createClient(AmazonKinesisClientBuilder.standard(), + kinesisClient = createClient(AmazonKinesisClientBuilder.standard(), config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration(), config.getKinesisEndpoint(), config.getRegionName()); } if (dynamoDBClient == null) { - dynamoDBClient = (AmazonDynamoDB) createClient(AmazonDynamoDBClientBuilder.standard(), + dynamoDBClient = createClient(AmazonDynamoDBClientBuilder.standard(), config.getDynamoDBCredentialsProvider(), config.getDynamoDBClientConfiguration(), config.getDynamoDBEndpoint(), config.getRegionName()); } if (cloudWatchClient == null) { - cloudWatchClient = (AmazonCloudWatch) createClient(AmazonCloudWatchClientBuilder.standard(), + cloudWatchClient = createClient(AmazonCloudWatchClientBuilder.standard(), config.getCloudWatchCredentialsProvider(), config.getCloudWatchClientConfiguration(), null, @@ -1273,17 +1243,17 @@ public class Worker implements Runnable { } // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { - setRegionForClient(cloudWatchClient, config.getRegionName()); - setRegionForClient(kinesisClient, config.getRegionName()); - setRegionForClient(dynamoDBClient, config.getRegionName()); + setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(config.getRegionName())); + setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName())); + setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName())); } // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. if (config.getDynamoDBEndpoint() != null) { - setEndpointForClient(dynamoDBClient, config.getDynamoDBEndpoint()); + setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint()); } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) { - setEndpointForClient(kinesisClient, config.getKinesisEndpoint()); + setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint()); } if (metricsFactory == null) { metricsFactory = getMetricsFactory(cloudWatchClient, config); @@ -1337,11 +1307,11 @@ public class Worker implements Runnable { workerStateChangeListener); } - AmazonWebServiceClient createClient(@NonNull final AwsClientBuilder builder, - final AWSCredentialsProvider credentialsProvider, - final ClientConfiguration clientConfiguration, - final String endpointUrl, - final String region) { + > R createClient(final T builder, + final AWSCredentialsProvider credentialsProvider, + final ClientConfiguration clientConfiguration, + final String endpointUrl, + final String region) { if (credentialsProvider != null) { builder.withCredentials(credentialsProvider); } @@ -1359,8 +1329,7 @@ public class Worker implements Runnable { LOG.warn("No configuration received for endpoint and region, will default region to us-east-1"); builder.withRegion(Regions.US_EAST_1); } - return (AmazonWebServiceClient) builder.build(); + return builder.build(); } - } }