MultiLangDaemon: Adding the possibility to customize the CloudWatch Metrics Factory by the config file
This commit is contained in:
parent
0fc90ff787
commit
d576d5932d
2 changed files with 54 additions and 4 deletions
|
|
@ -17,6 +17,9 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
import com.amazonaws.ClientConfiguration;
|
import com.amazonaws.ClientConfiguration;
|
||||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
import com.amazonaws.regions.RegionUtils;
|
import com.amazonaws.regions.RegionUtils;
|
||||||
|
import com.amazonaws.services.kinesis.metrics.impl.LogMetricsFactory;
|
||||||
|
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
||||||
|
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configuration for the Amazon Kinesis Client Library.
|
* Configuration for the Amazon Kinesis Client Library.
|
||||||
|
|
@ -117,6 +120,7 @@ public class KinesisClientLibConfiguration {
|
||||||
private int metricsMaxQueueSize;
|
private int metricsMaxQueueSize;
|
||||||
private boolean validateSequenceNumberBeforeCheckpointing;
|
private boolean validateSequenceNumberBeforeCheckpointing;
|
||||||
private String regionName;
|
private String regionName;
|
||||||
|
private String customCloudWatchMetricsFactoryName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
|
|
@ -455,6 +459,24 @@ public class KinesisClientLibConfiguration {
|
||||||
return regionName;
|
return regionName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Custom CloudWatch Metrics Factory. In case of the default MetricsFactory CWMetricsFactory, null is returned.
|
||||||
|
*/
|
||||||
|
public IMetricsFactory getCustomCloudWatchMetricsFactory() {
|
||||||
|
IMetricsFactory customCloudWatchMetricsFactory = null;
|
||||||
|
if (customCloudWatchMetricsFactoryName != null && customCloudWatchMetricsFactoryName != "CWMetricsFactory"){
|
||||||
|
switch (customCloudWatchMetricsFactoryName) {
|
||||||
|
case "NullMetricsFactory":
|
||||||
|
customCloudWatchMetricsFactory = new NullMetricsFactory();
|
||||||
|
break;
|
||||||
|
case "LogMetricsFactory":
|
||||||
|
customCloudWatchMetricsFactory = new LogMetricsFactory();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return customCloudWatchMetricsFactory;
|
||||||
|
}
|
||||||
|
|
||||||
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
|
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
|
||||||
/**
|
/**
|
||||||
* @param kinesisEndpoint Kinesis endpoint
|
* @param kinesisEndpoint Kinesis endpoint
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
|
||||||
|
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main app that launches the worker that runs the multi-language record processor.
|
* Main app that launches the worker that runs the multi-language record processor.
|
||||||
|
|
@ -74,6 +75,21 @@ public class MultiLangDaemon implements Callable<Integer> {
|
||||||
this(new Worker(recordProcessorFactory, configuration, workerThreadPool));
|
this(new Worker(recordProcessorFactory, configuration, workerThreadPool));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor with custom CloudWatch Metrics Factory.
|
||||||
|
*
|
||||||
|
* @param configuration The KCL config to use.
|
||||||
|
* @param recordProcessorFactory A record processor factory to create record processors that abide by the multi-lang
|
||||||
|
* protocol.
|
||||||
|
* @param cloudWatchMetricsFactory A customized Cloudwatch Metric Factory implementation.
|
||||||
|
* @param workerThreadPool The executor service to run the daemon in.
|
||||||
|
*/
|
||||||
|
public MultiLangDaemon(KinesisClientLibConfiguration configuration,
|
||||||
|
MultiLangRecordProcessorFactory recordProcessorFactory, IMetricsFactory cloudWatchMetricsFactory,
|
||||||
|
ExecutorService workerThreadPool) {
|
||||||
|
this(new Worker(recordProcessorFactory, configuration, cloudWatchMetricsFactory, workerThreadPool));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param worker A worker to use instead of the default worker.
|
* @param worker A worker to use instead of the default worker.
|
||||||
|
|
@ -133,10 +149,22 @@ public class MultiLangDaemon implements Callable<Integer> {
|
||||||
ExecutorService executorService = config.getExecutorService();
|
ExecutorService executorService = config.getExecutorService();
|
||||||
|
|
||||||
// Daemon
|
// Daemon
|
||||||
MultiLangDaemon daemon = new MultiLangDaemon(
|
KinesisClientLibConfiguration kinesisClientLibConfiguration = config.getKinesisClientLibConfiguration();
|
||||||
|
IMetricsFactory customCWMetricsFactory = kinesisClientLibConfiguration.getCustomCloudWatchMetricsFactory();
|
||||||
|
|
||||||
|
MultiLangDaemon daemon = null;
|
||||||
|
|
||||||
|
if (customCWMetricsFactory == null){
|
||||||
|
daemon = new MultiLangDaemon(
|
||||||
config.getKinesisClientLibConfiguration(),
|
config.getKinesisClientLibConfiguration(),
|
||||||
config.getRecordProcessorFactory(),
|
config.getRecordProcessorFactory(),
|
||||||
executorService);
|
executorService);
|
||||||
|
} else {
|
||||||
|
daemon = new MultiLangDaemon(
|
||||||
|
config.getKinesisClientLibConfiguration(),
|
||||||
|
config.getRecordProcessorFactory(), customCWMetricsFactory,
|
||||||
|
executorService);
|
||||||
|
}
|
||||||
|
|
||||||
Future<Integer> future = executorService.submit(daemon);
|
Future<Integer> future = executorService.submit(daemon);
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue