'Version 1.2.1 of the Amazon Kinesis Client Library'
This commit is contained in:
parent
9b1549e810
commit
0fc90ff787
11 changed files with 292 additions and 164 deletions
|
|
@ -2,27 +2,28 @@ Manifest-Version: 1.0
|
|||
Bundle-ManifestVersion: 2
|
||||
Bundle-Name: Amazon Kinesis Client Library for Java
|
||||
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
||||
Bundle-Version: 1.2.0
|
||||
Bundle-Version: 1.2.1
|
||||
Bundle-Vendor: Amazon Technologies, Inc
|
||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
|
||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0",
|
||||
org.apache.commons.logging;bundle-version="1.1.1";visibility:=reexport,
|
||||
com.fasterxml.jackson.core.jackson-databind;bundle-version="2.1.1",
|
||||
com.fasterxml.jackson.core.jackson-core;bundle-version="2.1.1",
|
||||
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.1.1",
|
||||
org.apache.httpcomponents.httpcore;bundle-version="4.2.0",
|
||||
org.apache.httpcomponents.httpclient;bundle-version="4.2.0"
|
||||
com.amazonaws.sdk;bundle-version="1.7.13",
|
||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
|
||||
org.apache.commons.logging;bundle-version="1.1.3";visibility:=reexport,
|
||||
com.fasterxml.jackson.core.jackson-databind;bundle-version="2.3.2",
|
||||
com.fasterxml.jackson.core.jackson-core;bundle-version="2.3.2",
|
||||
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.3.0",
|
||||
org.apache.httpcomponents.httpcore;bundle-version="4.3.2",
|
||||
org.apache.httpcomponents.httpclient;bundle-version="4.3.4"
|
||||
com.amazonaws.sdk;bundle-version="1.9.16",
|
||||
Export-Package: com.amazonaws.services.kinesis,
|
||||
com.amazonaws.services.kinesis.clientlibrary,
|
||||
com.amazonaws.services.kinesis.clientlibrary.config,
|
||||
com.amazonaws.services.kinesis.clientlibrary.exceptions,
|
||||
com.amazonaws.services.kinesis.clientlibrary.exceptions.internal,
|
||||
com.amazonaws.services.kinesis.clientlibrary.interfaces,
|
||||
com.amazonaws.services.kinesis.clientlibrary.types,
|
||||
com.amazonaws.services.kinesis.clientlibrary.proxies,
|
||||
com.amazonaws.services.kinesis.clientlibrary.lib,
|
||||
com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint,
|
||||
com.amazonaws.services.kinesis.clientlibrary.lib.worker,
|
||||
com.amazonaws.services.kinesis.clientlibrary.proxies,
|
||||
com.amazonaws.services.kinesis.clientlibrary.types,
|
||||
com.amazonaws.services.kinesis.leases,
|
||||
com.amazonaws.services.kinesis.leases.exceptions,
|
||||
com.amazonaws.services.kinesis.leases.impl,
|
||||
|
|
@ -30,4 +31,6 @@ Export-Package: com.amazonaws.services.kinesis,
|
|||
com.amazonaws.services.kinesis.leases.util,
|
||||
com.amazonaws.services.kinesis.metrics,
|
||||
com.amazonaws.services.kinesis.metrics.impl,
|
||||
com.amazonaws.services.kinesis.metrics.interfaces
|
||||
com.amazonaws.services.kinesis.metrics.interfaces,
|
||||
com.amazonaws.services.kinesis.multilang,
|
||||
com.amazonaws.services.kinesis.multilang.messages,
|
||||
|
|
|
|||
|
|
@ -26,6 +26,9 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
|
|||
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
|
||||
|
||||
## Release Notes
|
||||
### Release 1.2.1 (January 26, 2015)
|
||||
* **MultiLangDaemon** Changes to the MultiLangDaemon to make it easier to provide a custom worker.
|
||||
|
||||
### Release 1.2 (October 21, 2014)
|
||||
* **Multi-Language Support** Amazon KCL now supports implementing record processors in any language by communicating with the daemon over [STDIN and STDOUT][multi-lang-protocol]. Python developers can directly use the [Amazon Kinesis Client Library for Python][kclpy] to write their data processing applications.
|
||||
|
||||
|
|
|
|||
6
pom.xml
6
pom.xml
|
|
@ -6,7 +6,7 @@
|
|||
<artifactId>amazon-kinesis-client</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Amazon Kinesis Client Library for Java</name>
|
||||
<version>1.2.0</version>
|
||||
<version>1.2.1</version>
|
||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
|
||||
<url>https://aws.amazon.com/kinesis</url>
|
||||
|
||||
|
|
@ -23,7 +23,7 @@
|
|||
</licenses>
|
||||
|
||||
<properties>
|
||||
<aws-java-sdk.version>1.7.13</aws-java-sdk.version>
|
||||
<aws-java-sdk.version>1.9.16</aws-java-sdk.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
|
@ -51,6 +51,7 @@
|
|||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.2</version>
|
||||
<configuration>
|
||||
<source>1.7</source>
|
||||
<target>1.7</target>
|
||||
|
|
@ -64,6 +65,7 @@
|
|||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-gpg-plugin</artifactId>
|
||||
<version>1.5</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>sign-artifacts</id>
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration {
|
|||
/**
|
||||
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
||||
*/
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.2.0";
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.2.1";
|
||||
|
||||
/**
|
||||
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -73,6 +73,10 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
@Override
|
||||
public synchronized void checkpoint()
|
||||
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Checkpointing " + shardInfo.getShardId() + ", " + " token " + shardInfo.getConcurrencyToken()
|
||||
+ " at largest permitted value " + this.largestPermittedCheckpointValue);
|
||||
}
|
||||
advancePosition(this.largestPermittedCheckpointValue);
|
||||
}
|
||||
|
||||
|
|
@ -86,6 +90,10 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
|
||||
// throws exception if sequence number shouldn't be checkpointed for this shard
|
||||
sequenceNumberValidator.validateSequenceNumber(sequenceNumber);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Validated checkpoint sequence number " + sequenceNumber + " for " + shardInfo.getShardId()
|
||||
+ ", token " + shardInfo.getConcurrencyToken());
|
||||
}
|
||||
/*
|
||||
* If there isn't a last checkpoint value, we only care about checking the upper bound.
|
||||
* If there is a last checkpoint value, we want to check both the lower and upper bound.
|
||||
|
|
@ -93,6 +101,10 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
if ((checkpointValueComparator.compare(lastCheckpointValue, sequenceNumber) <= 0)
|
||||
&& checkpointValueComparator.compare(sequenceNumber, largestPermittedCheckpointValue) <= 0) {
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Checkpointing " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
|
||||
+ " at specific sequence number " + sequenceNumber);
|
||||
}
|
||||
this.advancePosition(sequenceNumber);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Could not checkpoint at sequence number " + sequenceNumber
|
||||
|
|
@ -162,15 +174,14 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
// Don't checkpoint a value we already successfully checkpointed
|
||||
if (sequenceNumber != null && !sequenceNumber.equals(lastCheckpointValue)) {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
|
||||
+ " checkpoint to " + checkpointValue);
|
||||
}
|
||||
checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointValue, shardInfo.getConcurrencyToken());
|
||||
lastCheckpointValue = checkpointValue;
|
||||
} catch (ThrottlingException e) {
|
||||
throw e;
|
||||
} catch (ShutdownException e) {
|
||||
throw e;
|
||||
} catch (InvalidStateException e) {
|
||||
throw e;
|
||||
} catch (KinesisClientLibDependencyException e) {
|
||||
} catch (ThrottlingException | ShutdownException | InvalidStateException
|
||||
| KinesisClientLibDependencyException e) {
|
||||
throw e;
|
||||
} catch (KinesisClientLibException e) {
|
||||
LOG.warn("Caught exception setting checkpoint.", e);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -74,7 +74,7 @@ public class Worker implements Runnable {
|
|||
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
|
||||
private final ShardSyncTaskManager controlServer;
|
||||
|
||||
private boolean shutdown;
|
||||
private volatile boolean shutdown;
|
||||
|
||||
// Holds consumers for shards the worker is currently tracking. Key is shard
|
||||
// info, value is ShardConsumer.
|
||||
|
|
|
|||
|
|
@ -227,18 +227,22 @@ public class LeaseCoordinator<T extends Lease> {
|
|||
* Stops background threads.
|
||||
*/
|
||||
public void stop() {
|
||||
threadpool.shutdown();
|
||||
try {
|
||||
if (threadpool.awaitTermination(STOP_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
|
||||
LOG.info(String.format("Worker %s has successfully stopped lease-tracking threads", leaseTaker.getWorkerIdentifier()));
|
||||
} else {
|
||||
threadpool.shutdownNow();
|
||||
LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop",
|
||||
if (threadpool != null) {
|
||||
threadpool.shutdown();
|
||||
try {
|
||||
if (threadpool.awaitTermination(STOP_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)) {
|
||||
LOG.info(String.format("Worker %s has successfully stopped lease-tracking threads", leaseTaker.getWorkerIdentifier()));
|
||||
} else {
|
||||
threadpool.shutdownNow();
|
||||
LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop",
|
||||
leaseTaker.getWorkerIdentifier(),
|
||||
STOP_WAIT_TIME_MILLIS));
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Encountered InterruptedException when awaiting threadpool termination");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Encountered InterruptedException when awaiting threadpool termination");
|
||||
} else {
|
||||
LOG.debug("Threadpool was null, no need to shutdown/terminate threadpool.");
|
||||
}
|
||||
|
||||
leaseRenewer.clearCurrentlyHeldLeases();
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -119,7 +119,7 @@ public class LeaseSerializer implements ILeaseSerializer<Lease> {
|
|||
if (lease.getLeaseOwner() == null) {
|
||||
eav = new ExpectedAttributeValue(false);
|
||||
} else {
|
||||
new ExpectedAttributeValue(DynamoUtils.createAttributeValue(lease.getLeaseOwner()));
|
||||
eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(lease.getLeaseOwner()));
|
||||
}
|
||||
|
||||
result.put(LEASE_OWNER_KEY, eav);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -157,7 +157,7 @@ public class CWPublisherRunnable<KeyType> implements Runnable {
|
|||
/**
|
||||
* Overrideable for testing purposes.
|
||||
*/
|
||||
long getTime() {
|
||||
protected long getTime() {
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
|
|
@ -15,42 +15,38 @@
|
|||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
|
||||
|
||||
/**
|
||||
* Main app that launches the worker that runs the multi-language record processor.
|
||||
*
|
||||
*
|
||||
* Requires a properties file containing configuration for this daemon and the KCL. A properties file should at minimum
|
||||
* define these properties:
|
||||
*
|
||||
*
|
||||
* <pre>
|
||||
* # The script that abides by the multi-language protocol. This script will
|
||||
* # be executed by the MultiLangDaemon, which will communicate with this script
|
||||
* # over STDIN and STDOUT according to the multi-language protocol.
|
||||
* executableName = sampleapp.py
|
||||
*
|
||||
*
|
||||
* # The name of an Amazon Kinesis stream to process.
|
||||
* streamName = words
|
||||
*
|
||||
*
|
||||
* # Used by the KCL as the name of this application. Will be used as the name
|
||||
* # of a Amazon DynamoDB table which will store the lease and checkpoint
|
||||
* # information for workers with this application name.
|
||||
* applicationName = PythonKCLSample
|
||||
*
|
||||
*
|
||||
* # Users can change the credentials provider the KCL will use to retrieve credentials.
|
||||
* # The DefaultAWSCredentialsProviderChain checks several other providers, which is
|
||||
* # described here:
|
||||
|
|
@ -62,91 +58,47 @@ public class MultiLangDaemon implements Callable<Integer> {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(MultiLangDaemon.class);
|
||||
|
||||
private static final String USER_AGENT = "amazon-kinesis-multi-lang-daemon";
|
||||
private static final String VERSION = "1.0.0";
|
||||
|
||||
private static final String PROP_EXECUTABLE_NAME = "executableName";
|
||||
private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage";
|
||||
private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads";
|
||||
|
||||
private KinesisClientLibConfiguration configuration;
|
||||
|
||||
private MultiLangRecordProcessorFactory recordProcessorFactory;
|
||||
|
||||
private ExecutorService workerThreadPool;
|
||||
|
||||
private String processingLanguage;
|
||||
private Worker worker;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param configuration The KCL config to use.
|
||||
* @param recordProcessorFactory A record processor factory to create record processors that abide by the multi-lang
|
||||
* protocol.
|
||||
* @param workerThreadPool The executor service to run the daemon in.
|
||||
*/
|
||||
MultiLangDaemon(String processingLanguage,
|
||||
KinesisClientLibConfiguration configuration,
|
||||
public MultiLangDaemon(KinesisClientLibConfiguration configuration,
|
||||
MultiLangRecordProcessorFactory recordProcessorFactory,
|
||||
ExecutorService workerThreadPool) {
|
||||
this.processingLanguage = processingLanguage;
|
||||
this.configuration = configuration;
|
||||
this.recordProcessorFactory = recordProcessorFactory;
|
||||
this.workerThreadPool = workerThreadPool;
|
||||
this(new Worker(recordProcessorFactory, configuration, workerThreadPool));
|
||||
}
|
||||
|
||||
static void printUsage(PrintStream stream, String message) {
|
||||
/**
|
||||
*
|
||||
* @param worker A worker to use instead of the default worker.
|
||||
*/
|
||||
public MultiLangDaemon(Worker worker) {
|
||||
this.worker = worker;
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility for describing how to run this app.
|
||||
*
|
||||
* @param stream Where to output the usage info.
|
||||
* @param messageToPrepend An optional error message to describe why the usage is being printed.
|
||||
*/
|
||||
public static void printUsage(PrintStream stream, String messageToPrepend) {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
if (message != null) {
|
||||
builder.append(message);
|
||||
if (messageToPrepend != null) {
|
||||
builder.append(messageToPrepend);
|
||||
}
|
||||
builder.append(String.format("java %s <properties file>", MultiLangDaemon.class.getCanonicalName()));
|
||||
stream.println(builder.toString());
|
||||
}
|
||||
|
||||
static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
|
||||
Properties properties = new Properties();
|
||||
try (InputStream propertiesStream = classLoader.getResourceAsStream(propertiesFileName)) {
|
||||
properties.load(propertiesStream);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
|
||||
static boolean validateProperties(Properties properties) {
|
||||
return properties != null && properties.getProperty(PROP_EXECUTABLE_NAME) != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will cause the MultiLangDaemon to read its configuration and build a worker with a
|
||||
* MultiLangRecordProcessorFactory for the executable specified in the provided properties.
|
||||
*/
|
||||
void prepare() {
|
||||
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
|
||||
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
|
||||
|
||||
LOG.info("Using workerId: " + configuration.getWorkerIdentifier());
|
||||
LOG.info("Using credentials with access key id: "
|
||||
+ configuration.getKinesisCredentialsProvider().getCredentials().getAWSAccessKeyId());
|
||||
|
||||
StringBuilder userAgent = new StringBuilder(KinesisClientLibConfiguration.KINESIS_CLIENT_LIB_USER_AGENT);
|
||||
userAgent.append(" ");
|
||||
userAgent.append(USER_AGENT);
|
||||
userAgent.append("/");
|
||||
userAgent.append(VERSION);
|
||||
|
||||
if (processingLanguage != null) {
|
||||
userAgent.append(" ");
|
||||
userAgent.append(processingLanguage);
|
||||
}
|
||||
|
||||
if (recordProcessorFactory.getCommandArray().length > 0) {
|
||||
userAgent.append(" ");
|
||||
userAgent.append(recordProcessorFactory.getCommandArray()[0]);
|
||||
}
|
||||
|
||||
LOG.debug(String.format("User Agent string is: %s", userAgent.toString()));
|
||||
configuration.withUserAgent(userAgent.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
prepare();
|
||||
Worker worker = new Worker(recordProcessorFactory, configuration, workerThreadPool);
|
||||
int exitCode = 0;
|
||||
try {
|
||||
worker.run();
|
||||
|
|
@ -157,22 +109,6 @@ public class MultiLangDaemon implements Callable<Integer> {
|
|||
return exitCode;
|
||||
}
|
||||
|
||||
private static int getMaxActiveThreads(Properties properties) {
|
||||
return Integer.parseInt(properties.getProperty(PROP_MAX_ACTIVE_THREADS, "0"));
|
||||
}
|
||||
|
||||
private static ExecutorService getExecutorService(Properties properties) {
|
||||
int maxActiveThreads = getMaxActiveThreads(properties);
|
||||
LOG.debug(String.format("Value for %s property is %d", PROP_MAX_ACTIVE_THREADS, maxActiveThreads));
|
||||
if (maxActiveThreads <= 0) {
|
||||
LOG.info("Using a cached thread pool.");
|
||||
return Executors.newCachedThreadPool();
|
||||
} else {
|
||||
LOG.info(String.format("Using a fixed thread pool with %d max active threads.", maxActiveThreads));
|
||||
return Executors.newFixedThreadPool(maxActiveThreads);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param args Accepts a single argument, that argument is a properties file which provides KCL configuration as
|
||||
* well as the name of an executable.
|
||||
|
|
@ -183,44 +119,30 @@ public class MultiLangDaemon implements Callable<Integer> {
|
|||
printUsage(System.err, "You must provide a properties file");
|
||||
System.exit(1);
|
||||
}
|
||||
Properties properties = null;
|
||||
MultiLangDaemonConfig config = null;
|
||||
try {
|
||||
properties = loadProperties(Thread.currentThread().getContextClassLoader(), args[0]);
|
||||
config = new MultiLangDaemonConfig(args[0]);
|
||||
} catch (IOException e) {
|
||||
printUsage(System.err, "You must provide a properties file");
|
||||
System.exit(1);
|
||||
} catch (IllegalArgumentException e) {
|
||||
printUsage(System.err, e.getMessage());
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
if (validateProperties(properties)) {
|
||||
ExecutorService executorService = config.getExecutorService();
|
||||
|
||||
// Configuration
|
||||
KinesisClientLibConfiguration kinesisClientLibConfiguration =
|
||||
new KinesisClientLibConfigurator().getConfiguration(properties);
|
||||
String executableName = properties.getProperty(PROP_EXECUTABLE_NAME);
|
||||
// Daemon
|
||||
MultiLangDaemon daemon = new MultiLangDaemon(
|
||||
config.getKinesisClientLibConfiguration(),
|
||||
config.getRecordProcessorFactory(),
|
||||
executorService);
|
||||
|
||||
ExecutorService executorService = getExecutorService(properties);
|
||||
|
||||
// Factory
|
||||
MultiLangRecordProcessorFactory recordProcessorFactory =
|
||||
new MultiLangRecordProcessorFactory(executableName, executorService);
|
||||
|
||||
// Daemon
|
||||
MultiLangDaemon daemon =
|
||||
new MultiLangDaemon(properties.getProperty(PROP_PROCESSING_LANGUAGE),
|
||||
kinesisClientLibConfiguration, recordProcessorFactory, executorService);
|
||||
|
||||
LOG.info("Running " + kinesisClientLibConfiguration.getApplicationName() + " to process stream "
|
||||
+ kinesisClientLibConfiguration.getStreamName() + " with executable " + executableName);
|
||||
|
||||
Future<Integer> future = executorService.submit(daemon);
|
||||
try {
|
||||
System.exit(future.get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("Encountered an error while running daemon", e);
|
||||
}
|
||||
} else {
|
||||
printUsage(System.err, "Must provide an executable name in the properties file, "
|
||||
+ "e.g. executableName = sampleapp.py");
|
||||
Future<Integer> future = executorService.submit(daemon);
|
||||
try {
|
||||
System.exit(future.get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("Encountered an error while running daemon", e);
|
||||
}
|
||||
System.exit(1);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,183 @@
|
|||
/*
|
||||
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
|
||||
/**
|
||||
* This class captures the configuration needed to run the MultiLangDaemon.
|
||||
*/
|
||||
public class MultiLangDaemonConfig {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MultiLangDaemonConfig.class);
|
||||
|
||||
private static final String USER_AGENT = "amazon-kinesis-multi-lang-daemon";
|
||||
private static final String VERSION = "1.0.1";
|
||||
|
||||
private static final String PROP_EXECUTABLE_NAME = "executableName";
|
||||
private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage";
|
||||
private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads";
|
||||
|
||||
private KinesisClientLibConfiguration kinesisClientLibConfig;
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
private MultiLangRecordProcessorFactory recordProcessorFactory;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param propertiesFile The location of the properties file.
|
||||
* @throws IOException Thrown when the properties file can't be accessed.
|
||||
* @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected.
|
||||
*/
|
||||
public MultiLangDaemonConfig(String propertiesFile) throws IOException, IllegalArgumentException {
|
||||
this(propertiesFile, Thread.currentThread().getContextClassLoader());
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param propertiesFile The location of the properties file.
|
||||
* @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a
|
||||
* unit test.
|
||||
* @throws IOException Thrown when the properties file can't be accessed.
|
||||
* @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected.
|
||||
*/
|
||||
public MultiLangDaemonConfig(String propertiesFile, ClassLoader classLoader) throws IOException,
|
||||
IllegalArgumentException {
|
||||
this(propertiesFile, classLoader, new KinesisClientLibConfigurator());
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param propertiesFile The location of the properties file.
|
||||
* @param classLoader A classloader, useful if trying to programmatically configure with the daemon, such as in a
|
||||
* unit test.
|
||||
* @param configurator A configurator to use.
|
||||
* @throws IOException Thrown when the properties file can't be accessed.
|
||||
* @throws IllegalArgumentException Thrown when the contents of the properties file are not as expected.
|
||||
*/
|
||||
public MultiLangDaemonConfig(String propertiesFile,
|
||||
ClassLoader classLoader,
|
||||
KinesisClientLibConfigurator configurator) throws IOException, IllegalArgumentException {
|
||||
Properties properties = loadProperties(classLoader, propertiesFile);
|
||||
if (!validateProperties(properties)) {
|
||||
throw new IllegalArgumentException("Must provide an executable name in the properties file, "
|
||||
+ "e.g. executableName = sampleapp.py");
|
||||
}
|
||||
|
||||
String executableName = properties.getProperty(PROP_EXECUTABLE_NAME);
|
||||
String processingLanguage = properties.getProperty(PROP_PROCESSING_LANGUAGE);
|
||||
|
||||
kinesisClientLibConfig = configurator.getConfiguration(properties);
|
||||
executorService = buildExecutorService(properties);
|
||||
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService);
|
||||
|
||||
LOG.info("Running " + kinesisClientLibConfig.getApplicationName() + " to process stream "
|
||||
+ kinesisClientLibConfig.getStreamName() + " with executable " + executableName);
|
||||
prepare(processingLanguage);
|
||||
}
|
||||
|
||||
private void prepare(String processingLanguage) {
|
||||
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
|
||||
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
|
||||
|
||||
LOG.info("Using workerId: " + kinesisClientLibConfig.getWorkerIdentifier());
|
||||
LOG.info("Using credentials with access key id: "
|
||||
+ kinesisClientLibConfig.getKinesisCredentialsProvider().getCredentials().getAWSAccessKeyId());
|
||||
|
||||
StringBuilder userAgent = new StringBuilder(KinesisClientLibConfiguration.KINESIS_CLIENT_LIB_USER_AGENT);
|
||||
userAgent.append(" ");
|
||||
userAgent.append(USER_AGENT);
|
||||
userAgent.append("/");
|
||||
userAgent.append(VERSION);
|
||||
|
||||
if (processingLanguage != null) {
|
||||
userAgent.append(" ");
|
||||
userAgent.append(processingLanguage);
|
||||
}
|
||||
|
||||
if (recordProcessorFactory.getCommandArray().length > 0) {
|
||||
userAgent.append(" ");
|
||||
userAgent.append(recordProcessorFactory.getCommandArray()[0]);
|
||||
}
|
||||
|
||||
LOG.info(String.format("MultiLangDaemon is adding the following fields to the User Agent: %s",
|
||||
userAgent.toString()));
|
||||
kinesisClientLibConfig.withUserAgent(userAgent.toString());
|
||||
}
|
||||
|
||||
private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
|
||||
Properties properties = new Properties();
|
||||
try (InputStream propertiesStream = classLoader.getResourceAsStream(propertiesFileName)) {
|
||||
properties.load(propertiesStream);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean validateProperties(Properties properties) {
|
||||
return properties != null && properties.getProperty(PROP_EXECUTABLE_NAME) != null;
|
||||
}
|
||||
|
||||
private static int getMaxActiveThreads(Properties properties) {
|
||||
return Integer.parseInt(properties.getProperty(PROP_MAX_ACTIVE_THREADS, "0"));
|
||||
}
|
||||
|
||||
private static ExecutorService buildExecutorService(Properties properties) {
|
||||
int maxActiveThreads = getMaxActiveThreads(properties);
|
||||
LOG.debug(String.format("Value for %s property is %d", PROP_MAX_ACTIVE_THREADS, maxActiveThreads));
|
||||
if (maxActiveThreads <= 0) {
|
||||
LOG.info("Using a cached thread pool.");
|
||||
return Executors.newCachedThreadPool();
|
||||
} else {
|
||||
LOG.info(String.format("Using a fixed thread pool with %d max active threads.", maxActiveThreads));
|
||||
return Executors.newFixedThreadPool(maxActiveThreads);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return A KinesisClientLibConfiguration object based on the properties file provided.
|
||||
*/
|
||||
public KinesisClientLibConfiguration getKinesisClientLibConfiguration() {
|
||||
return kinesisClientLibConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return An executor service based on the properties file provided.
|
||||
*/
|
||||
public ExecutorService getExecutorService() {
|
||||
return executorService;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return A MultiLangRecordProcessorFactory based on the properties file provided.
|
||||
*/
|
||||
public MultiLangRecordProcessorFactory getRecordProcessorFactory() {
|
||||
return recordProcessorFactory;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue