From d576d5932df138db5be0a2f667ec083cf36db1a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Pinto?= Date: Fri, 24 Apr 2015 15:16:29 -0300 Subject: [PATCH] MultiLangDaemon: Adding the possibility to customize the CloudWatch Metrics Factory by the config file --- .../worker/KinesisClientLibConfiguration.java | 22 ++++++++++++ .../kinesis/multilang/MultiLangDaemon.java | 36 ++++++++++++++++--- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 74a3f2cd..14b898ff 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -17,6 +17,9 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.RegionUtils; +import com.amazonaws.services.kinesis.metrics.impl.LogMetricsFactory; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; /** * Configuration for the Amazon Kinesis Client Library. @@ -117,6 +120,7 @@ public class KinesisClientLibConfiguration { private int metricsMaxQueueSize; private boolean validateSequenceNumberBeforeCheckpointing; private String regionName; + private String customCloudWatchMetricsFactoryName; /** * Constructor. @@ -454,6 +458,24 @@ public class KinesisClientLibConfiguration { public String getRegionName() { return regionName; } + + /** + * @return Custom CloudWatch Metrics Factory. In case of the default MetricsFactory CWMetricsFactory, null is returned. + */ + public IMetricsFactory getCustomCloudWatchMetricsFactory() { + IMetricsFactory customCloudWatchMetricsFactory = null; + if (customCloudWatchMetricsFactoryName != null && customCloudWatchMetricsFactoryName != "CWMetricsFactory"){ + switch (customCloudWatchMetricsFactoryName) { + case "NullMetricsFactory": + customCloudWatchMetricsFactory = new NullMetricsFactory(); + break; + case "LogMetricsFactory": + customCloudWatchMetricsFactory = new LogMetricsFactory(); + break; + } + } + return customCloudWatchMetricsFactory; + } // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES /** diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java index 8b74cabc..f3202437 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; /** * Main app that launches the worker that runs the multi-language record processor. @@ -73,6 +74,21 @@ public class MultiLangDaemon implements Callable { ExecutorService workerThreadPool) { this(new Worker(recordProcessorFactory, configuration, workerThreadPool)); } + + /** + * Constructor with custom CloudWatch Metrics Factory. + * + * @param configuration The KCL config to use. + * @param recordProcessorFactory A record processor factory to create record processors that abide by the multi-lang + * protocol. + * @param cloudWatchMetricsFactory A customized Cloudwatch Metric Factory implementation. + * @param workerThreadPool The executor service to run the daemon in. + */ + public MultiLangDaemon(KinesisClientLibConfiguration configuration, + MultiLangRecordProcessorFactory recordProcessorFactory, IMetricsFactory cloudWatchMetricsFactory, + ExecutorService workerThreadPool) { + this(new Worker(recordProcessorFactory, configuration, cloudWatchMetricsFactory, workerThreadPool)); + } /** * @@ -133,10 +149,22 @@ public class MultiLangDaemon implements Callable { ExecutorService executorService = config.getExecutorService(); // Daemon - MultiLangDaemon daemon = new MultiLangDaemon( - config.getKinesisClientLibConfiguration(), - config.getRecordProcessorFactory(), - executorService); + KinesisClientLibConfiguration kinesisClientLibConfiguration = config.getKinesisClientLibConfiguration(); + IMetricsFactory customCWMetricsFactory = kinesisClientLibConfiguration.getCustomCloudWatchMetricsFactory(); + + MultiLangDaemon daemon = null; + + if (customCWMetricsFactory == null){ + daemon = new MultiLangDaemon( + config.getKinesisClientLibConfiguration(), + config.getRecordProcessorFactory(), + executorService); + } else { + daemon = new MultiLangDaemon( + config.getKinesisClientLibConfiguration(), + config.getRecordProcessorFactory(), customCWMetricsFactory, + executorService); + } Future future = executorService.submit(daemon); try {