Making changes to the worker to handle immutable clients better.
This commit is contained in:
parent
2cc5ed4081
commit
948b746624
1 changed files with 24 additions and 55 deletions
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
|
@ -408,16 +409,16 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||||
if (config.getRegionName() != null) {
|
if (config.getRegionName() != null) {
|
||||||
setRegionForClient(kinesisClient, config.getRegionName());
|
setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
|
||||||
setRegionForClient(dynamoDBClient, config.getRegionName());
|
setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
|
||||||
}
|
}
|
||||||
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
|
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
|
||||||
if (config.getDynamoDBEndpoint() != null) {
|
if (config.getDynamoDBEndpoint() != null) {
|
||||||
setEndpointForClient(dynamoDBClient, config.getDynamoDBEndpoint());
|
setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint());
|
||||||
}
|
}
|
||||||
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
|
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
|
||||||
if (config.getKinesisEndpoint() != null) {
|
if (config.getKinesisEndpoint() != null) {
|
||||||
setEndpointForClient(kinesisClient, config.getKinesisEndpoint());
|
setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1077,7 +1078,7 @@ public class Worker implements Runnable {
|
||||||
metricsFactory = new NullMetricsFactory();
|
metricsFactory = new NullMetricsFactory();
|
||||||
} else {
|
} else {
|
||||||
if (config.getRegionName() != null) {
|
if (config.getRegionName() != null) {
|
||||||
setRegionForClient(cloudWatchClient, config.getRegionName());
|
setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
|
||||||
}
|
}
|
||||||
metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(),
|
metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(),
|
||||||
config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(),
|
config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(),
|
||||||
|
|
@ -1096,43 +1097,12 @@ public class Worker implements Runnable {
|
||||||
return new WorkerThreadPoolExecutor(threadFactory);
|
return new WorkerThreadPoolExecutor(threadFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setRegionForClient(@NonNull final Object client, @NonNull final String region) {
|
private static <S, T> void setField(final S source, final String field, final Consumer<T> t, T value) {
|
||||||
try {
|
try {
|
||||||
if (client instanceof AmazonKinesis) {
|
t.accept(value);
|
||||||
LOG.debug("Configuration passed for region as " + region + " for AmazonKinesis client.");
|
|
||||||
((AmazonKinesis) client).setRegion(RegionUtils.getRegion(region));
|
|
||||||
} else if (client instanceof AmazonDynamoDB) {
|
|
||||||
LOG.debug("Configuration passed for region as " + region + " for AmazonDynamoDB client.");
|
|
||||||
((AmazonDynamoDB) client).setRegion(RegionUtils.getRegion(region));
|
|
||||||
} else if (client instanceof AmazonCloudWatch) {
|
|
||||||
LOG.debug("Configuration passed for region as " + region + " for AmazonCloudWatch client.");
|
|
||||||
((AmazonCloudWatch) client).setRegion(RegionUtils.getRegion(region));
|
|
||||||
} else {
|
|
||||||
LOG.warn("Client passed is not an instance of AmazonKinesis, AmazonDynamoDB or AmazonCloudWatch. "
|
|
||||||
+ "Could not set the region to " + region + ".");
|
|
||||||
}
|
|
||||||
} catch (UnsupportedOperationException e) {
|
} catch (UnsupportedOperationException e) {
|
||||||
LOG.debug("Exception thrown, indicating that the client is immutable.", e);
|
LOG.debug("Exception thrown while trying to set " + field + ", indicating that "
|
||||||
}
|
+ source.getClass().getSimpleName() + "is immutable.", e);
|
||||||
}
|
|
||||||
|
|
||||||
private static void setEndpointForClient(@NonNull final Object client, @NonNull final String endpointUrl) {
|
|
||||||
try {
|
|
||||||
if (client instanceof AmazonKinesis) {
|
|
||||||
LOG.debug("Configuration passed for endpoint as " + endpointUrl + " for AmazonKinesis client.");
|
|
||||||
((AmazonKinesis) client).setEndpoint(endpointUrl);
|
|
||||||
} else if (client instanceof AmazonDynamoDB) {
|
|
||||||
LOG.debug("Configuration passed for endpoint as " + endpointUrl + " for AmazonDynamoDB client.");
|
|
||||||
((AmazonDynamoDB) client).setEndpoint(endpointUrl);
|
|
||||||
} else if (client instanceof AmazonCloudWatch) {
|
|
||||||
LOG.debug("Configuration passed for endpoint as " + endpointUrl + " for AmazonCloudWatch client.");
|
|
||||||
((AmazonCloudWatch) client).setEndpoint(endpointUrl);
|
|
||||||
} else {
|
|
||||||
LOG.warn("Client passed is not an instance of AmazonKinesis, AmazonDynamoDB or AmazonCloudWatch. "
|
|
||||||
+ "Could not set the endpoint to " + endpointUrl + ".");
|
|
||||||
}
|
|
||||||
} catch (UnsupportedOperationException e) {
|
|
||||||
LOG.debug("Exception thrown, indicating that the client is immutable.", e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1251,21 +1221,21 @@ public class Worker implements Runnable {
|
||||||
execService = getExecutorService();
|
execService = getExecutorService();
|
||||||
}
|
}
|
||||||
if (kinesisClient == null) {
|
if (kinesisClient == null) {
|
||||||
kinesisClient = (AmazonKinesis) createClient(AmazonKinesisClientBuilder.standard(),
|
kinesisClient = createClient(AmazonKinesisClientBuilder.standard(),
|
||||||
config.getKinesisCredentialsProvider(),
|
config.getKinesisCredentialsProvider(),
|
||||||
config.getKinesisClientConfiguration(),
|
config.getKinesisClientConfiguration(),
|
||||||
config.getKinesisEndpoint(),
|
config.getKinesisEndpoint(),
|
||||||
config.getRegionName());
|
config.getRegionName());
|
||||||
}
|
}
|
||||||
if (dynamoDBClient == null) {
|
if (dynamoDBClient == null) {
|
||||||
dynamoDBClient = (AmazonDynamoDB) createClient(AmazonDynamoDBClientBuilder.standard(),
|
dynamoDBClient = createClient(AmazonDynamoDBClientBuilder.standard(),
|
||||||
config.getDynamoDBCredentialsProvider(),
|
config.getDynamoDBCredentialsProvider(),
|
||||||
config.getDynamoDBClientConfiguration(),
|
config.getDynamoDBClientConfiguration(),
|
||||||
config.getDynamoDBEndpoint(),
|
config.getDynamoDBEndpoint(),
|
||||||
config.getRegionName());
|
config.getRegionName());
|
||||||
}
|
}
|
||||||
if (cloudWatchClient == null) {
|
if (cloudWatchClient == null) {
|
||||||
cloudWatchClient = (AmazonCloudWatch) createClient(AmazonCloudWatchClientBuilder.standard(),
|
cloudWatchClient = createClient(AmazonCloudWatchClientBuilder.standard(),
|
||||||
config.getCloudWatchCredentialsProvider(),
|
config.getCloudWatchCredentialsProvider(),
|
||||||
config.getCloudWatchClientConfiguration(),
|
config.getCloudWatchClientConfiguration(),
|
||||||
null,
|
null,
|
||||||
|
|
@ -1273,17 +1243,17 @@ public class Worker implements Runnable {
|
||||||
}
|
}
|
||||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||||
if (config.getRegionName() != null) {
|
if (config.getRegionName() != null) {
|
||||||
setRegionForClient(cloudWatchClient, config.getRegionName());
|
setField(cloudWatchClient, "region", cloudWatchClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
|
||||||
setRegionForClient(kinesisClient, config.getRegionName());
|
setField(kinesisClient, "region", kinesisClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
|
||||||
setRegionForClient(dynamoDBClient, config.getRegionName());
|
setField(dynamoDBClient, "region", dynamoDBClient::setRegion, RegionUtils.getRegion(config.getRegionName()));
|
||||||
}
|
}
|
||||||
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
|
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
|
||||||
if (config.getDynamoDBEndpoint() != null) {
|
if (config.getDynamoDBEndpoint() != null) {
|
||||||
setEndpointForClient(dynamoDBClient, config.getDynamoDBEndpoint());
|
setField(dynamoDBClient, "endpoint", dynamoDBClient::setEndpoint, config.getDynamoDBEndpoint());
|
||||||
}
|
}
|
||||||
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
|
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
|
||||||
if (config.getKinesisEndpoint() != null) {
|
if (config.getKinesisEndpoint() != null) {
|
||||||
setEndpointForClient(kinesisClient, config.getKinesisEndpoint());
|
setField(kinesisClient, "endpoint", kinesisClient::setEndpoint, config.getKinesisEndpoint());
|
||||||
}
|
}
|
||||||
if (metricsFactory == null) {
|
if (metricsFactory == null) {
|
||||||
metricsFactory = getMetricsFactory(cloudWatchClient, config);
|
metricsFactory = getMetricsFactory(cloudWatchClient, config);
|
||||||
|
|
@ -1337,7 +1307,7 @@ public class Worker implements Runnable {
|
||||||
workerStateChangeListener);
|
workerStateChangeListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
AmazonWebServiceClient createClient(@NonNull final AwsClientBuilder builder,
|
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,
|
||||||
final AWSCredentialsProvider credentialsProvider,
|
final AWSCredentialsProvider credentialsProvider,
|
||||||
final ClientConfiguration clientConfiguration,
|
final ClientConfiguration clientConfiguration,
|
||||||
final String endpointUrl,
|
final String endpointUrl,
|
||||||
|
|
@ -1359,8 +1329,7 @@ public class Worker implements Runnable {
|
||||||
LOG.warn("No configuration received for endpoint and region, will default region to us-east-1");
|
LOG.warn("No configuration received for endpoint and region, will default region to us-east-1");
|
||||||
builder.withRegion(Regions.US_EAST_1);
|
builder.withRegion(Regions.US_EAST_1);
|
||||||
}
|
}
|
||||||
return (AmazonWebServiceClient) builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue