diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 6f37b4d0..f61ce288 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -17,6 +17,9 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.RegionUtils; +import com.amazonaws.services.kinesis.metrics.impl.LogMetricsFactory; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; /** * Configuration for the Amazon Kinesis Client Library. @@ -117,6 +120,7 @@ public class KinesisClientLibConfiguration { private int metricsMaxQueueSize; private boolean validateSequenceNumberBeforeCheckpointing; private String regionName; + private String customCloudWatchMetricsFactoryName; /** * Constructor. @@ -285,6 +289,16 @@ public class KinesisClientLibConfiguration { throw new IllegalArgumentException("The specified region name is not valid"); } } + + private void checkIsCloudWatchMetricsFactoryNameValid(String cloudWatchMetricsFactoryName) { + if ("CWMetricsFactory".equals(cloudWatchMetricsFactoryName) || + "NullMetricsFactory".equals(cloudWatchMetricsFactoryName) || + "LogMetricsFactory".equals(cloudWatchMetricsFactoryName) ){ + return; + } + + throw new IllegalArgumentException("The specified Cloud Watch Metrics Factory Name is not valid"); + } /** * @return Name of the application @@ -454,6 +468,31 @@ public class KinesisClientLibConfiguration { public String getRegionName() { return regionName; } + + /** + * @return Custom CloudWatch Metrics Factory Name. + */ + public String getCustomCloudWatchMetricsFactoryName() { + return customCloudWatchMetricsFactoryName; + } + + /** + * @return Custom CloudWatch Metrics Factory. In case of the default MetricsFactory CWMetricsFactory, null is returned. + */ + public IMetricsFactory getCustomCloudWatchMetricsFactory() { + IMetricsFactory customCloudWatchMetricsFactory = null; + if (customCloudWatchMetricsFactoryName != null && customCloudWatchMetricsFactoryName != "CWMetricsFactory"){ + switch (customCloudWatchMetricsFactoryName) { + case "NullMetricsFactory": + customCloudWatchMetricsFactory = new NullMetricsFactory(); + break; + case "LogMetricsFactory": + customCloudWatchMetricsFactory = new LogMetricsFactory(); + break; + } + } + return customCloudWatchMetricsFactory; + } // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES /** @@ -655,4 +694,17 @@ public class KinesisClientLibConfiguration { this.regionName = regionName; return this; } + + /** + * + * @param customCloudWatchMetricsFactoryName The region name for the service + * @return KinesisClientLibConfiguration + */ + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 2 LINES + public KinesisClientLibConfiguration withCustomCloudWatchMetricsFactoryName(String customCloudWatchMetricsFactoryName) { + checkIsCloudWatchMetricsFactoryNameValid(customCloudWatchMetricsFactoryName); + this.customCloudWatchMetricsFactoryName = customCloudWatchMetricsFactoryName; + return this; + } + } diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index 8b74cabc..f3202437 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; /** * Main app that launches the worker that runs the multi-language record processor. @@ -73,6 +74,21 @@ public class MultiLangDaemon implements Callable { ExecutorService workerThreadPool) { this(new Worker(recordProcessorFactory, configuration, workerThreadPool)); } + + /** + * Constructor with custom CloudWatch Metrics Factory. + * + * @param configuration The KCL config to use. + * @param recordProcessorFactory A record processor factory to create record processors that abide by the multi-lang + * protocol. + * @param cloudWatchMetricsFactory A customized Cloudwatch Metric Factory implementation. + * @param workerThreadPool The executor service to run the daemon in. + */ + public MultiLangDaemon(KinesisClientLibConfiguration configuration, + MultiLangRecordProcessorFactory recordProcessorFactory, IMetricsFactory cloudWatchMetricsFactory, + ExecutorService workerThreadPool) { + this(new Worker(recordProcessorFactory, configuration, cloudWatchMetricsFactory, workerThreadPool)); + } /** * @@ -133,10 +149,22 @@ public class MultiLangDaemon implements Callable { ExecutorService executorService = config.getExecutorService(); // Daemon - MultiLangDaemon daemon = new MultiLangDaemon( - config.getKinesisClientLibConfiguration(), - config.getRecordProcessorFactory(), - executorService); + KinesisClientLibConfiguration kinesisClientLibConfiguration = config.getKinesisClientLibConfiguration(); + IMetricsFactory customCWMetricsFactory = kinesisClientLibConfiguration.getCustomCloudWatchMetricsFactory(); + + MultiLangDaemon daemon = null; + + if (customCWMetricsFactory == null){ + daemon = new MultiLangDaemon( + config.getKinesisClientLibConfiguration(), + config.getRecordProcessorFactory(), + executorService); + } else { + daemon = new MultiLangDaemon( + config.getKinesisClientLibConfiguration(), + config.getRecordProcessorFactory(), customCWMetricsFactory, + executorService); + } Future future = executorService.submit(daemon); try {