2014-10-21 18:28:58 +00:00
|
|
|
/*
|
2015-01-26 22:18:35 +00:00
|
|
|
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
2014-10-21 18:28:58 +00:00
|
|
|
*
|
|
|
|
|
* Licensed under the Amazon Software License (the "License").
|
|
|
|
|
* You may not use this file except in compliance with the License.
|
|
|
|
|
* A copy of the License is located at
|
|
|
|
|
*
|
|
|
|
|
* http://aws.amazon.com/asl/
|
|
|
|
|
*
|
|
|
|
|
* or in the "license" file accompanying this file. This file is distributed
|
|
|
|
|
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
|
|
|
|
* express or implied. See the License for the specific language governing
|
|
|
|
|
* permissions and limitations under the License.
|
|
|
|
|
*/
|
|
|
|
|
package com.amazonaws.services.kinesis.multilang;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.io.PrintStream;
|
|
|
|
|
import java.util.concurrent.Callable;
|
|
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
|
import java.util.concurrent.Future;
|
|
|
|
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
|
2016-11-07 19:38:04 +00:00
|
|
|
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
|
2014-10-21 18:28:58 +00:00
|
|
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
|
|
|
|
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Main app that launches the worker that runs the multi-language record processor.
|
2015-01-26 22:18:35 +00:00
|
|
|
*
|
2014-10-21 18:28:58 +00:00
|
|
|
* Requires a properties file containing configuration for this daemon and the KCL. A properties file should at minimum
|
|
|
|
|
* define these properties:
|
2015-01-26 22:18:35 +00:00
|
|
|
*
|
2014-10-21 18:28:58 +00:00
|
|
|
* <pre>
|
|
|
|
|
* # The script that abides by the multi-language protocol. This script will
|
|
|
|
|
* # be executed by the MultiLangDaemon, which will communicate with this script
|
|
|
|
|
* # over STDIN and STDOUT according to the multi-language protocol.
|
|
|
|
|
* executableName = sampleapp.py
|
2015-01-26 22:18:35 +00:00
|
|
|
*
|
2014-10-21 18:28:58 +00:00
|
|
|
* # The name of an Amazon Kinesis stream to process.
|
|
|
|
|
* streamName = words
|
2015-01-26 22:18:35 +00:00
|
|
|
*
|
2014-10-21 18:28:58 +00:00
|
|
|
* # Used by the KCL as the name of this application. Will be used as the name
|
|
|
|
|
* # of a Amazon DynamoDB table which will store the lease and checkpoint
|
|
|
|
|
* # information for workers with this application name.
|
|
|
|
|
* applicationName = PythonKCLSample
|
2015-01-26 22:18:35 +00:00
|
|
|
*
|
2014-10-21 18:28:58 +00:00
|
|
|
* # Users can change the credentials provider the KCL will use to retrieve credentials.
|
|
|
|
|
* # The DefaultAWSCredentialsProviderChain checks several other providers, which is
|
|
|
|
|
* # described here:
|
|
|
|
|
* # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
|
|
|
|
|
* AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
|
|
|
|
|
* </pre>
|
|
|
|
|
*/
|
|
|
|
|
public class MultiLangDaemon implements Callable<Integer> {
|
|
|
|
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(MultiLangDaemon.class);
|
|
|
|
|
|
2015-01-26 22:18:35 +00:00
|
|
|
private Worker worker;
|
2014-10-21 18:28:58 +00:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Constructor.
|
2015-01-26 22:18:35 +00:00
|
|
|
*
|
|
|
|
|
* @param configuration The KCL config to use.
|
|
|
|
|
* @param recordProcessorFactory A record processor factory to create record processors that abide by the multi-lang
|
|
|
|
|
* protocol.
|
|
|
|
|
* @param workerThreadPool The executor service to run the daemon in.
|
2014-10-21 18:28:58 +00:00
|
|
|
*/
|
2015-01-26 22:18:35 +00:00
|
|
|
public MultiLangDaemon(KinesisClientLibConfiguration configuration,
|
2014-10-21 18:28:58 +00:00
|
|
|
MultiLangRecordProcessorFactory recordProcessorFactory,
|
|
|
|
|
ExecutorService workerThreadPool) {
|
2016-11-07 19:38:04 +00:00
|
|
|
this(buildWorker(recordProcessorFactory, configuration, workerThreadPool));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static Worker buildWorker(IRecordProcessorFactory recordProcessorFactory,
|
|
|
|
|
KinesisClientLibConfiguration configuration, ExecutorService workerThreadPool) {
|
|
|
|
|
return new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(configuration)
|
|
|
|
|
.execService(workerThreadPool).build();
|
2014-10-21 18:28:58 +00:00
|
|
|
}
|
|
|
|
|
|
2015-01-26 22:18:35 +00:00
|
|
|
/**
|
|
|
|
|
*
|
|
|
|
|
* @param worker A worker to use instead of the default worker.
|
|
|
|
|
*/
|
|
|
|
|
public MultiLangDaemon(Worker worker) {
|
|
|
|
|
this.worker = worker;
|
2014-10-21 18:28:58 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2015-01-26 22:18:35 +00:00
|
|
|
* Utility for describing how to run this app.
|
|
|
|
|
*
|
|
|
|
|
* @param stream Where to output the usage info.
|
|
|
|
|
* @param messageToPrepend An optional error message to describe why the usage is being printed.
|
2014-10-21 18:28:58 +00:00
|
|
|
*/
|
2015-01-26 22:18:35 +00:00
|
|
|
public static void printUsage(PrintStream stream, String messageToPrepend) {
|
|
|
|
|
StringBuilder builder = new StringBuilder();
|
|
|
|
|
if (messageToPrepend != null) {
|
|
|
|
|
builder.append(messageToPrepend);
|
2014-10-21 18:28:58 +00:00
|
|
|
}
|
2015-01-26 22:18:35 +00:00
|
|
|
builder.append(String.format("java %s <properties file>", MultiLangDaemon.class.getCanonicalName()));
|
|
|
|
|
stream.println(builder.toString());
|
2014-10-21 18:28:58 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Integer call() throws Exception {
|
|
|
|
|
int exitCode = 0;
|
|
|
|
|
try {
|
|
|
|
|
worker.run();
|
|
|
|
|
} catch (Throwable t) {
|
|
|
|
|
LOG.error("Caught throwable while processing data.", t);
|
|
|
|
|
exitCode = 1;
|
|
|
|
|
}
|
|
|
|
|
return exitCode;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param args Accepts a single argument, that argument is a properties file which provides KCL configuration as
|
|
|
|
|
* well as the name of an executable.
|
|
|
|
|
*/
|
|
|
|
|
public static void main(String[] args) {
|
|
|
|
|
|
|
|
|
|
if (args.length == 0) {
|
|
|
|
|
printUsage(System.err, "You must provide a properties file");
|
|
|
|
|
System.exit(1);
|
|
|
|
|
}
|
2015-01-26 22:18:35 +00:00
|
|
|
MultiLangDaemonConfig config = null;
|
2014-10-21 18:28:58 +00:00
|
|
|
try {
|
2015-01-26 22:18:35 +00:00
|
|
|
config = new MultiLangDaemonConfig(args[0]);
|
2014-10-21 18:28:58 +00:00
|
|
|
} catch (IOException e) {
|
|
|
|
|
printUsage(System.err, "You must provide a properties file");
|
|
|
|
|
System.exit(1);
|
2015-01-26 22:18:35 +00:00
|
|
|
} catch (IllegalArgumentException e) {
|
|
|
|
|
printUsage(System.err, e.getMessage());
|
|
|
|
|
System.exit(1);
|
2014-10-21 18:28:58 +00:00
|
|
|
}
|
|
|
|
|
|
2015-01-26 22:18:35 +00:00
|
|
|
ExecutorService executorService = config.getExecutorService();
|
2014-10-21 18:28:58 +00:00
|
|
|
|
2015-01-26 22:18:35 +00:00
|
|
|
// Daemon
|
|
|
|
|
MultiLangDaemon daemon = new MultiLangDaemon(
|
|
|
|
|
config.getKinesisClientLibConfiguration(),
|
|
|
|
|
config.getRecordProcessorFactory(),
|
|
|
|
|
executorService);
|
2014-10-21 18:28:58 +00:00
|
|
|
|
2015-01-26 22:18:35 +00:00
|
|
|
Future<Integer> future = executorService.submit(daemon);
|
|
|
|
|
try {
|
|
|
|
|
System.exit(future.get());
|
|
|
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
|
|
|
LOG.error("Encountered an error while running daemon", e);
|
2014-10-21 18:28:58 +00:00
|
|
|
}
|
|
|
|
|
System.exit(1);
|
|
|
|
|
}
|
|
|
|
|
}
|