This commit is contained in:
André Pinto 2015-06-02 23:43:43 +00:00
commit 8e1bd9a0e4
2 changed files with 84 additions and 4 deletions

View file

@ -17,6 +17,9 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesis.metrics.impl.LogMetricsFactory;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
/**
* Configuration for the Amazon Kinesis Client Library.
@ -117,6 +120,7 @@ public class KinesisClientLibConfiguration {
private int metricsMaxQueueSize;
private boolean validateSequenceNumberBeforeCheckpointing;
private String regionName;
private String customCloudWatchMetricsFactoryName;
/**
* Constructor.
@ -286,6 +290,16 @@ public class KinesisClientLibConfiguration {
}
}
private void checkIsCloudWatchMetricsFactoryNameValid(String cloudWatchMetricsFactoryName) {
if ("CWMetricsFactory".equals(cloudWatchMetricsFactoryName) ||
"NullMetricsFactory".equals(cloudWatchMetricsFactoryName) ||
"LogMetricsFactory".equals(cloudWatchMetricsFactoryName) ){
return;
}
throw new IllegalArgumentException("The specified Cloud Watch Metrics Factory Name is not valid");
}
/**
* @return Name of the application
*/
@ -455,6 +469,31 @@ public class KinesisClientLibConfiguration {
return regionName;
}
/**
* @return Custom CloudWatch Metrics Factory Name.
*/
public String getCustomCloudWatchMetricsFactoryName() {
return customCloudWatchMetricsFactoryName;
}
/**
* @return Custom CloudWatch Metrics Factory. In case of the default MetricsFactory CWMetricsFactory, null is returned.
*/
public IMetricsFactory getCustomCloudWatchMetricsFactory() {
IMetricsFactory customCloudWatchMetricsFactory = null;
if (customCloudWatchMetricsFactoryName != null && customCloudWatchMetricsFactoryName != "CWMetricsFactory"){
switch (customCloudWatchMetricsFactoryName) {
case "NullMetricsFactory":
customCloudWatchMetricsFactory = new NullMetricsFactory();
break;
case "LogMetricsFactory":
customCloudWatchMetricsFactory = new LogMetricsFactory();
break;
}
}
return customCloudWatchMetricsFactory;
}
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
/**
* @param kinesisEndpoint Kinesis endpoint
@ -655,4 +694,17 @@ public class KinesisClientLibConfiguration {
this.regionName = regionName;
return this;
}
/**
*
* @param customCloudWatchMetricsFactoryName The region name for the service
* @return KinesisClientLibConfiguration
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 2 LINES
public KinesisClientLibConfiguration withCustomCloudWatchMetricsFactoryName(String customCloudWatchMetricsFactoryName) {
checkIsCloudWatchMetricsFactoryNameValid(customCloudWatchMetricsFactoryName);
this.customCloudWatchMetricsFactoryName = customCloudWatchMetricsFactoryName;
return this;
}
}

View file

@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
/**
* Main app that launches the worker that runs the multi-language record processor.
@ -74,6 +75,21 @@ public class MultiLangDaemon implements Callable<Integer> {
this(new Worker(recordProcessorFactory, configuration, workerThreadPool));
}
/**
* Constructor with custom CloudWatch Metrics Factory.
*
* @param configuration The KCL config to use.
* @param recordProcessorFactory A record processor factory to create record processors that abide by the multi-lang
* protocol.
* @param cloudWatchMetricsFactory A customized Cloudwatch Metric Factory implementation.
* @param workerThreadPool The executor service to run the daemon in.
*/
public MultiLangDaemon(KinesisClientLibConfiguration configuration,
MultiLangRecordProcessorFactory recordProcessorFactory, IMetricsFactory cloudWatchMetricsFactory,
ExecutorService workerThreadPool) {
this(new Worker(recordProcessorFactory, configuration, cloudWatchMetricsFactory, workerThreadPool));
}
/**
*
* @param worker A worker to use instead of the default worker.
@ -133,10 +149,22 @@ public class MultiLangDaemon implements Callable<Integer> {
ExecutorService executorService = config.getExecutorService();
// Daemon
MultiLangDaemon daemon = new MultiLangDaemon(
KinesisClientLibConfiguration kinesisClientLibConfiguration = config.getKinesisClientLibConfiguration();
IMetricsFactory customCWMetricsFactory = kinesisClientLibConfiguration.getCustomCloudWatchMetricsFactory();
MultiLangDaemon daemon = null;
if (customCWMetricsFactory == null){
daemon = new MultiLangDaemon(
config.getKinesisClientLibConfiguration(),
config.getRecordProcessorFactory(),
executorService);
} else {
daemon = new MultiLangDaemon(
config.getKinesisClientLibConfiguration(),
config.getRecordProcessorFactory(), customCWMetricsFactory,
executorService);
}
Future<Integer> future = executorService.submit(daemon);
try {