diff --git a/CHANGELOG.md b/CHANGELOG.md
index b22ef573..b31f80a2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,14 @@
# Changelog
+### Release 2.1.0 (January 14, 2019)
+[Milestone #27](https://github.com/awslabs/amazon-kinesis-client/milestone/27)
+* Introducing MultiLangDaemon support for Enhanced Fan-Out.
+* MultiLangDaemon now supports the following command line options.
+ * `--properties-file`: Properties file that the KCL should use to set up the Scheduler.
+ * `--log-configuration`: logback.xml that the KCL should use for logging.
+* Updated AWS SDK dependency to 2.2.0.
+* MultiLangDaemon now uses logback for logging.
+
### Release 2.0.5 (November 12, 2018)
[Milestone #26](https://github.com/awslabs/amazon-kinesis-client/milestone/26?closed=1)
* Fixed a deadlock condition that could occur when using the polling model.
diff --git a/README.md b/README.md
index a1523497..efd7f5a0 100644
--- a/README.md
+++ b/README.md
@@ -61,14 +61,14 @@ The recommended way to use the KCL for Java is to consume it from Maven.
## Release Notes
-### Latest Release (2.0.5 - November 12, 2018)
-[Milestone #26](https://github.com/awslabs/amazon-kinesis-client/milestone/26?closed=1)
-* Fixed a deadlock condition that could occur when using the polling model.
- It was possible to hit a deadlock in the retrieval of records When using the `PollingConfig` and a slow running record processor.
- * [PR #462](https://github.com/awslabs/amazon-kinesis-client/pull/462)
- * [Issue #448](https://github.com/awslabs/amazon-kinesis-client/issues/448)
-* Adjusted `RetrievalConfig`, and `FanOutConfig` to use accessors instead of direct member access.
- * [PR #453](https://github.com/awslabs/amazon-kinesis-client/pull/453)
+### Latest Release (2.1.0 - January 14, 2019)
+[Milestone #27](https://github.com/awslabs/amazon-kinesis-client/milestone/27)
+* Introducing MultiLangDaemon support for Enhanced Fan-Out.
+* MultiLangDaemon now supports the following command line options.
+ * `--properties-file`: Properties file that the KCL should use to set up the Scheduler.
+ * `--log-configuration`: logback.xml that the KCL should use for logging.
+* Updated AWS SDK dependency to 2.2.0.
+* MultiLangDaemon now uses logback for logging.
### For remaining release notes check **[CHANGELOG.md][changelog-md]**.
@@ -84,6 +84,6 @@ The recommended way to use the KCL for Java is to consume it from Maven.
[kinesis-guide-kpl]: http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html
[kinesis-guide-consumer-deaggregation]: http://docs.aws.amazon.com//kinesis/latest/dev/kinesis-kpl-consumer-deaggregation.html
[kclpy]: https://github.com/awslabs/amazon-kinesis-client-python
-[multi-lang-protocol]: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java
+[multi-lang-protocol]: https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java
[changelog-md]: https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md
[migration-guide]: https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html
diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml
index 8897a8b6..6a7573d6 100644
--- a/amazon-kinesis-client-multilang/pom.xml
+++ b/amazon-kinesis-client-multilang/pom.xml
@@ -19,18 +19,47 @@
amazon-kinesis-client-pomsoftware.amazon.kinesis
- 2.0.5
+ 2.1.04.0.0amazon-kinesis-client-multilang
+
+ 1.11.477
+
+
software.amazon.kinesisamazon-kinesis-client${project.version}
+
+ software.amazon.awssdk
+ sts
+ ${awssdk.version}
+
+
+
+ com.amazonaws
+ aws-java-sdk-core
+ ${aws-java-sdk.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-cbor
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+ org.projectlombok
@@ -38,11 +67,32 @@
1.16.20provided
-
ch.qos.logbacklogback-classic
- 1.1.7
+ 1.2.3
+
+
+ com.beust
+ jcommander
+ 1.72
+
+
+ commons-io
+ commons-io
+ 2.6
+
+
+ org.apache.commons
+ commons-collections4
+ 4.2
+
+
+
+
+ commons-beanutils
+ commons-beanutils
+ 1.9.3
@@ -52,21 +102,18 @@
4.11test
-
org.mockitomockito-all1.10.19test
-
org.hamcresthamcrest-all1.3test
-
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java
deleted file mode 100644
index ecb70d22..00000000
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/asl/
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package com.amazonaws.services.kinesis.multilang;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import lombok.extern.slf4j.Slf4j;
-import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
-import software.amazon.kinesis.coordinator.Scheduler;
-import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
-
-/**
- * Main app that launches the scheduler that runs the multi-language record processor.
- *
- * Requires a properties file containing configuration for this daemon and the KCL. A properties file should at minimum
- * define these properties:
- *
- *
- * # The script that abides by the multi-language protocol. This script will
- * # be executed by the MultiLangDaemon, which will communicate with this script
- * # over STDIN and STDOUT according to the multi-language protocol.
- * executableName = sampleapp.py
- *
- * # The name of an Amazon Kinesis stream to process.
- * streamName = words
- *
- * # Used by the KCL as the name of this application. Will be used as the name
- * # of a Amazon DynamoDB table which will store the lease and checkpoint
- * # information for workers with this application name.
- * applicationName = PythonKCLSample
- *
- * # Users can change the credentials provider the KCL will use to retrieve credentials.
- * # The DefaultAWSCredentialsProviderChain checks several other providers, which is
- * # described here:
- * # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
- * AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
- *
- */
-@Slf4j
-public class MultiLangDaemon implements Callable {
- private Scheduler scheduler;
-
- /**
- * Constructor.
- *
- * @param configuration The KCL config to use.
- * @param recordProcessorFactory A record processor factory to create record processors that abide by the multi-lang
- * protocol.
- * @param workerThreadPool The executor service to run the daemon in.
- */
- public MultiLangDaemon(KinesisClientLibConfiguration configuration,
- MultiLangRecordProcessorFactory recordProcessorFactory,
- ExecutorService workerThreadPool) {
- this(buildWorker(recordProcessorFactory, configuration, workerThreadPool));
- }
-
- private static Scheduler buildWorker(ShardRecordProcessorFactory recordShardRecordProcessorFactory,
- KinesisClientLibConfiguration configuration, ExecutorService workerThreadPool) {
- return null;
- }
-
- /**
- *
- * @param scheduler A scheduler to use instead of the default scheduler.
- */
- public MultiLangDaemon(Scheduler scheduler) {
- this.scheduler = scheduler;
- }
-
- /**
- * Utility for describing how to run this app.
- *
- * @param stream Where to output the usage info.
- * @param messageToPrepend An optional error message to describe why the usage is being printed.
- */
- public static void printUsage(PrintStream stream, String messageToPrepend) {
- StringBuilder builder = new StringBuilder();
- if (messageToPrepend != null) {
- builder.append(messageToPrepend);
- }
- builder.append(String.format("java %s ", MultiLangDaemon.class.getCanonicalName()));
- stream.println(builder.toString());
- }
-
- @Override
- public Integer call() throws Exception {
- int exitCode = 0;
- try {
- scheduler.run();
- } catch (Throwable t) {
- log.error("Caught throwable while processing data.", t);
- exitCode = 1;
- }
- return exitCode;
- }
-
- /**
- * @param args Accepts a single argument, that argument is a properties file which provides KCL configuration as
- * well as the name of an executable.
- */
- public static void main(String[] args) {
-
- if (args.length == 0) {
- printUsage(System.err, "You must provide a properties file");
- System.exit(1);
- }
- MultiLangDaemonConfig config = null;
- try {
- config = new MultiLangDaemonConfig(args[0]);
- } catch (IOException e) {
- printUsage(System.err, "You must provide a properties file");
- System.exit(1);
- } catch (IllegalArgumentException e) {
- printUsage(System.err, e.getMessage());
- System.exit(1);
- }
-
- ExecutorService executorService = config.getExecutorService();
-
- // Daemon
- final MultiLangDaemon daemon = new MultiLangDaemon(
- config.getKinesisClientLibConfiguration(),
- config.getRecordProcessorFactory(),
- executorService);
-
- final long shutdownGraceMillis = config.getKinesisClientLibConfiguration().getShutdownGraceMillis();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- log.info("Process terminated, will initiate shutdown.");
- try {
- Future fut = daemon.scheduler.requestShutdown();
- fut.get(shutdownGraceMillis, TimeUnit.MILLISECONDS);
- log.info("Process shutdown is complete.");
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.error("Encountered an error during shutdown.", e);
- }
- }
- });
-
- Future future = executorService.submit(daemon);
- try {
- System.exit(future.get());
- } catch (InterruptedException | ExecutionException e) {
- log.error("Encountered an error while running daemon", e);
- }
- System.exit(1);
- }
-}
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/BooleanPropertyValueDecoder.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/BooleanPropertyValueDecoder.java
deleted file mode 100644
index e57413dd..00000000
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/BooleanPropertyValueDecoder.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/asl/
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package com.amazonaws.services.kinesis.multilang.config;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Provide boolean property.
- */
-class BooleanPropertyValueDecoder implements IPropertyValueDecoder {
-
- /**
- * Constructor.
- */
- BooleanPropertyValueDecoder() {
- }
-
- /**
- * @param value property value as String
- * @return corresponding variable in correct type
- */
- @Override
- public Boolean decodeValue(String value) {
- return Boolean.parseBoolean(value);
- }
-
- /**
- * @return list of supported types
- */
- @Override
- public List> getSupportedTypes() {
- return Arrays.asList(boolean.class, Boolean.class);
- }
-
-}
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/InitialPositionInStreamPropertyValueDecoder.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/InitialPositionInStreamPropertyValueDecoder.java
deleted file mode 100644
index 0b44273a..00000000
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/InitialPositionInStreamPropertyValueDecoder.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/asl/
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package com.amazonaws.services.kinesis.multilang.config;
-
-import java.util.Arrays;
-import java.util.List;
-
-import software.amazon.kinesis.common.InitialPositionInStream;
-
-/**
- * Get an InitialiPosition enum property.
- */
-class InitialPositionInStreamPropertyValueDecoder implements IPropertyValueDecoder {
-
- /**
- * Constructor.
- */
- InitialPositionInStreamPropertyValueDecoder() {
- }
-
- /**
- * @param value property value as String
- * @return corresponding variable in correct type
- */
- @Override
- public InitialPositionInStream decodeValue(String value) {
- return InitialPositionInStream.valueOf(value.toUpperCase());
- }
-
- /**
- * @return list of supported types
- */
- @Override
- public List> getSupportedTypes() {
- return Arrays.asList(InitialPositionInStream.class);
- }
-}
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/KinesisClientLibConfigurator.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/KinesisClientLibConfigurator.java
deleted file mode 100644
index 853a7cc9..00000000
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/KinesisClientLibConfigurator.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/asl/
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package com.amazonaws.services.kinesis.multilang.config;
-
-import lombok.extern.slf4j.Slf4j;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-
-/**
- * KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following
- * three properties must be provided. 1) "applicationName" 2) "streamName" 3) "AWSCredentialsProvider"
- * KinesisClientLibConfigurator will help to automatically assign the value of "workerId" if this property is not
- * provided. In the specified properties file, any properties, which matches the variable name in
- * KinesisClientLibConfiguration and has a corresponding "with{variableName}" setter method, will be read in, and its
- * value in properties file will be assigned to corresponding variable in KinesisClientLibConfiguration.
- */
-@Slf4j
-public class KinesisClientLibConfigurator {
- private static final String PREFIX = "with";
-
- // Required properties
- private static final String PROP_APP_NAME = "applicationName";
- private static final String PROP_STREAM_NAME = "streamName";
- private static final String PROP_CREDENTIALS_PROVIDER_KINESIS = "AWSCredentialsProvider";
- private static final String PROP_CREDENTIALS_PROVIDER_DYNAMODB = "AWSCredentialsProviderDynamoDB";
- private static final String PROP_CREDENTIALS_PROVIDER_CLOUDWATCH = "AWSCredentialsProviderCloudWatch";
- private static final String PROP_WORKER_ID = "workerId";
-
- private Map, IPropertyValueDecoder>> classToDecoder;
- private Map> nameToMethods;
-
- /**
- * Constructor.
- */
- public KinesisClientLibConfigurator() {
- List> getters =
- Arrays.asList(new IntegerPropertyValueDecoder(),
- new LongPropertyValueDecoder(),
- new BooleanPropertyValueDecoder(),
- new DatePropertyValueDecoder(),
- new AWSCredentialsProviderPropertyValueDecoder(),
- new StringPropertyValueDecoder(),
- new InitialPositionInStreamPropertyValueDecoder(),
- new SetPropertyValueDecoder());
-
- classToDecoder = new Hashtable<>();
- for (IPropertyValueDecoder> getter : getters) {
- for (Class> clazz : getter.getSupportedTypes()) {
- /*
- * We could validate that we never overwrite a getter but we can also do this by manual inspection of
- * the getters.
- */
- classToDecoder.put(clazz, getter);
- }
- }
- nameToMethods = new Hashtable<>();
- for (Method method : KinesisClientLibConfiguration.class.getMethods()) {
- if (!nameToMethods.containsKey(method.getName())) {
- nameToMethods.put(method.getName(), new ArrayList<>());
- }
- nameToMethods.get(method.getName()).add(method);
- }
- }
-
- /**
- * Return a KinesisClientLibConfiguration with variables configured as specified by the properties in config stream.
- * Program will fail immediately, if customer provide: 1) invalid variable value. Program will log it as warning and
- * continue, if customer provide: 1) variable with unsupported variable type. 2) a variable with name which does not
- * match any of the variables in KinesisClientLibConfigration.
- *
- * @param properties a Properties object containing the configuration information
- * @return KinesisClientLibConfiguration
- */
- public KinesisClientLibConfiguration getConfiguration(Properties properties) {
- // The three minimum required arguments for constructor are obtained first. They are all mandatory, all of them
- // should be provided. If any of these three failed to be set, program will fail.
- IPropertyValueDecoder stringValueDecoder = new StringPropertyValueDecoder();
- IPropertyValueDecoder awsCPPropGetter =
- new AWSCredentialsProviderPropertyValueDecoder();
- String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME));
- String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME));
- AwsCredentialsProvider provider =
- awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER_KINESIS));
-
- if (applicationName == null || applicationName.isEmpty()) {
- throw new IllegalArgumentException("Value of applicationName should be explicitly provided.");
- }
- if (streamName == null || streamName.isEmpty()) {
- throw new IllegalArgumentException("Value of streamName should be explicitly provided.");
- }
-
- // Decode the DynamoDB credentials provider if it exists. If not use the Kinesis credentials provider.
- AwsCredentialsProvider providerDynamoDB;
- String propCredentialsProviderDynamoDBValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_DYNAMODB);
- if (propCredentialsProviderDynamoDBValue == null) {
- providerDynamoDB = provider;
- } else {
- providerDynamoDB = awsCPPropGetter.decodeValue(propCredentialsProviderDynamoDBValue);
- }
-
- // Decode the CloudWatch credentials provider if it exists. If not use the Kinesis credentials provider.
- AwsCredentialsProvider providerCloudWatch;
- String propCredentialsProviderCloudWatchValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_CLOUDWATCH);
- if (propCredentialsProviderCloudWatchValue == null) {
- providerCloudWatch = provider;
- } else {
- providerCloudWatch = awsCPPropGetter.decodeValue(propCredentialsProviderCloudWatchValue);
- }
-
- // Allow customer not to provide workerId or to provide empty worker id.
- String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID));
- if (workerId == null || workerId.isEmpty()) {
- workerId = UUID.randomUUID().toString();
- log.info("Value of workerId is not provided in the properties. WorkerId is automatically assigned as: {}",
- workerId);
- }
-
- KinesisClientLibConfiguration config =
- new KinesisClientLibConfiguration(applicationName, streamName, provider, providerDynamoDB, providerCloudWatch, workerId);
-
- Set requiredNames =
- new HashSet(Arrays.asList(PROP_STREAM_NAME,
- PROP_APP_NAME,
- PROP_WORKER_ID,
- PROP_CREDENTIALS_PROVIDER_KINESIS));
-
- // Set all the variables that are not used for constructor.
- for (Object keyObject : properties.keySet()) {
- String key = keyObject.toString();
- if (!requiredNames.contains(key)) {
- withProperty(key, properties, config);
- }
- }
-
- return config;
- }
-
- /**
- * @param configStream the input stream containing the configuration information
- * @return KinesisClientLibConfiguration
- */
- public KinesisClientLibConfiguration getConfiguration(InputStream configStream) {
- Properties properties = new Properties();
- try {
- properties.load(configStream);
- } catch (IOException e) {
- String msg = "Could not load properties from the stream provided";
- throw new IllegalStateException(msg, e);
- } finally {
- try {
- configStream.close();
- } catch (IOException e) {
- String msg = "Encountered error while trying to close properties file.";
- throw new IllegalStateException(msg, e);
- }
- }
- return getConfiguration(properties);
- }
-
- private void withProperty(String propertyKey, Properties properties, KinesisClientLibConfiguration config) {
- if (propertyKey.isEmpty()) {
- throw new IllegalArgumentException("The property can't be empty string");
- }
- // Assume that all the setters in KinesisClientLibConfiguration are in the following format
- // They all start with "with" followed by the variable name with first letter capitalized
- String targetMethodName = PREFIX + Character.toUpperCase(propertyKey.charAt(0)) + propertyKey.substring(1);
- String propertyValue = properties.getProperty(propertyKey);
- if (nameToMethods.containsKey(targetMethodName)) {
- for (Method method : nameToMethods.get(targetMethodName)) {
- if (method.getParameterTypes().length == 1 && method.getName().equals(targetMethodName)) {
- Class> paramType = method.getParameterTypes()[0];
- if (classToDecoder.containsKey(paramType)) {
- IPropertyValueDecoder> decoder = classToDecoder.get(paramType);
- try {
- method.invoke(config, decoder.decodeValue(propertyValue));
- log.info("Successfully set property {} with value {}", propertyKey, propertyValue);
- return;
- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- // At this point, we really thought that we could call this method.
- log.warn("Encountered an error while invoking method %s with value {}. Exception was {}",
- method, propertyValue, e);
- } catch (UnsupportedOperationException e) {
- log.warn("The property {} is not supported as type {} at this time.", propertyKey,
- paramType);
- }
- } else {
- log.debug("No method for decoding parameters of type {} so method {} could not be invoked.",
- paramType, method);
- }
- } else {
- log.debug("Method {} doesn't look like it is appropriate for setting property {}. Looking for"
- + " something called {}.", method, propertyKey, targetMethodName);
- }
- }
- } else {
- log.debug(String.format("There was no appropriately named method for setting property %s.", propertyKey));
- }
- }
-}
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/LongPropertyValueDecoder.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/LongPropertyValueDecoder.java
deleted file mode 100644
index 1382b153..00000000
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/LongPropertyValueDecoder.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/asl/
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package com.amazonaws.services.kinesis.multilang.config;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Get long type properties.
- */
-class LongPropertyValueDecoder implements IPropertyValueDecoder {
-
- /**
- * Constructor.
- */
- LongPropertyValueDecoder() {
- }
-
- /**
- * @param value property value as String
- * @return corresponding variable in correct type
- */
- @Override
- public Long decodeValue(String value) {
- return Long.parseLong(value);
- }
-
- /**
- * @return list of supported types
- */
- @Override
- public List> getSupportedTypes() {
- return Arrays.asList(long.class, Long.class);
- }
-
-}
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/SetPropertyValueDecoder.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/SetPropertyValueDecoder.java
deleted file mode 100644
index 6dfe2dbe..00000000
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/SetPropertyValueDecoder.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/asl/
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package com.amazonaws.services.kinesis.multilang.config;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Provide {@link Set} property value. Note that since parameterized value cannot be figured out during compile time
- * for setter methods, only {@code Set} of {@code String}s are supported as property value decode.
- */
-@SuppressWarnings("rawtypes")
-class SetPropertyValueDecoder implements IPropertyValueDecoder {
-
- /**
- * Delimiter for the list provided as string.
- */
- private static final String LIST_DELIMITER = ",";
-
- /**
- * Package constructor for factory use only.
- */
- SetPropertyValueDecoder() {
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Set decodeValue(String propertyValue) {
- String[] values = propertyValue.split(LIST_DELIMITER);
- String value = null;
- Set decodedValue = new HashSet<>();
- for (int i = 0; i < values.length; i++) {
- value = values[i].trim();
- if (!value.isEmpty()) {
- decodedValue.add(value);
- }
- }
- return decodedValue;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List> getSupportedTypes() {
- return Arrays.asList(Set.class);
- }
-
-}
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/StringPropertyValueDecoder.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/StringPropertyValueDecoder.java
deleted file mode 100644
index d5cc0482..00000000
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/StringPropertyValueDecoder.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/asl/
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package com.amazonaws.services.kinesis.multilang.config;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Get string properties from properties file.
- */
-class StringPropertyValueDecoder implements IPropertyValueDecoder {
-
- /**
- * package constructor for factory use only.
- */
- StringPropertyValueDecoder() {
- }
-
- /**
- * @param value the property value
- * @return the value as String
- */
- @Override
- public String decodeValue(String value) {
- // How to treat null or empty String should depend on those method who
- // uses the String value. Here just return the string as it is.
- if (value == null) {
- return null;
- }
- return value.trim();
- }
-
- /**
- * @return list of supported types
- */
- @Override
- public List> getSupportedTypes() {
- return Arrays.asList(String.class);
- }
-
-}
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/messages/StatusMessage.java b/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/messages/StatusMessage.java
deleted file mode 100644
index 921cca1b..00000000
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/messages/StatusMessage.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/asl/
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package com.amazonaws.services.kinesis.multilang.messages;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.experimental.Accessors;
-
-/**
- * A message sent by the client's process to indicate to the record processor that it completed a particular action.
- */
-@NoArgsConstructor
-@AllArgsConstructor
-@Getter
-@Setter
-public class StatusMessage extends Message {
- /**
- * The name used for the action field in {@link Message}.
- */
- public static final String ACTION = "status";
-
- /**
- * The name of the most recently received action.
- */
- private String responseFor;
-}
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/DrainChildSTDERRTask.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/DrainChildSTDERRTask.java
similarity index 60%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/DrainChildSTDERRTask.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/DrainChildSTDERRTask.java
index 7276b229..d8236683 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/DrainChildSTDERRTask.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/DrainChildSTDERRTask.java
@@ -1,18 +1,18 @@
/*
- * Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
*
- * http://aws.amazon.com/asl/
+ * http://aws.amazon.com/asl/
*
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang;
+package software.amazon.kinesis.multilang;
import java.io.BufferedReader;
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/DrainChildSTDOUTTask.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/DrainChildSTDOUTTask.java
similarity index 97%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/DrainChildSTDOUTTask.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/DrainChildSTDOUTTask.java
index 0e95a14e..99f47c7f 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/DrainChildSTDOUTTask.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/DrainChildSTDOUTTask.java
@@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang;
+package software.amazon.kinesis.multilang;
import java.io.BufferedReader;
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/GetNextMessageTask.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/GetNextMessageTask.java
similarity index 96%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/GetNextMessageTask.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/GetNextMessageTask.java
index 8177a8d2..15869a29 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/GetNextMessageTask.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/GetNextMessageTask.java
@@ -12,12 +12,12 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang;
+package software.amazon.kinesis.multilang;
import java.io.BufferedReader;
import java.io.IOException;
-import com.amazonaws.services.kinesis.multilang.messages.Message;
+import software.amazon.kinesis.multilang.messages.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/LineReaderTask.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/LineReaderTask.java
similarity index 99%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/LineReaderTask.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/LineReaderTask.java
index 650fc0c5..6205ef53 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/LineReaderTask.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/LineReaderTask.java
@@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang;
+package software.amazon.kinesis.multilang;
import java.io.BufferedReader;
import java.io.IOException;
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MessageReader.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MessageReader.java
similarity index 87%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MessageReader.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MessageReader.java
index 6bd3aa93..60314c6f 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MessageReader.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MessageReader.java
@@ -1,18 +1,18 @@
/*
- * Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
- * Licensed under the Amazon Software License (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
*
- * http://aws.amazon.com/asl/
+ * http://aws.amazon.com/asl/
*
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang;
+package software.amazon.kinesis.multilang;
import java.io.BufferedReader;
import java.io.InputStream;
@@ -20,7 +20,7 @@ import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import com.amazonaws.services.kinesis.multilang.messages.Message;
+import software.amazon.kinesis.multilang.messages.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MessageWriter.java
similarity index 82%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MessageWriter.java
index 164a36bf..5aea31fa 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MessageWriter.java
@@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang;
+package software.amazon.kinesis.multilang;
import java.io.BufferedWriter;
import java.io.IOException;
@@ -22,18 +22,20 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
-import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
-import com.amazonaws.services.kinesis.multilang.messages.Message;
-import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
-import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
-import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
+import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-import software.amazon.kinesis.lifecycle.ShutdownReason;
+import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
+import software.amazon.kinesis.multilang.messages.CheckpointMessage;
+import software.amazon.kinesis.multilang.messages.InitializeMessage;
+import software.amazon.kinesis.multilang.messages.LeaseLostMessage;
+import software.amazon.kinesis.multilang.messages.Message;
+import software.amazon.kinesis.multilang.messages.ProcessRecordsMessage;
+import software.amazon.kinesis.multilang.messages.ShardEndedMessage;
+import software.amazon.kinesis.multilang.messages.ShutdownRequestedMessage;
/**
* Defines methods for writing {@link Message} objects to the child process's STDIN.
@@ -135,12 +137,25 @@ class MessageWriter {
}
/**
- * Writes a {@link ShutdownMessage} to the subprocess.
+ * Writes the lease lost message to the sub process.
*
- * @param reason The reason for shutting down.
+ * @param leaseLostInput
+ * the lease lost input. This is currently unused as lease loss doesn't actually have anything in it
+ * @return A future that is set when the message has been written.
*/
- Future writeShutdownMessage(ShutdownReason reason) {
- return writeMessage(new ShutdownMessage(reason));
+ Future writeLeaseLossMessage(@SuppressWarnings("unused") LeaseLostInput leaseLostInput) {
+ return writeMessage(new LeaseLostMessage());
+ }
+
+ /**
+ * Writes a message to the sub process indicating that the shard has ended
+ *
+ * @param shardEndedInput
+ * the shard end input. This is currently unused as the checkpoint is extracted, and used by the caller.
+ * @return A future that is set when the message has been written.
+ */
+ Future writeShardEndedMessage(@SuppressWarnings("unused") ShardEndedInput shardEndedInput) {
+ return writeMessage(new ShardEndedMessage());
}
/**
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java
new file mode 100644
index 00000000..79e8aed9
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemon.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package software.amazon.kinesis.multilang;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.core.joran.spi.JoranException;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.kinesis.coordinator.Scheduler;
+
+/**
+ * Main app that launches the scheduler that runs the multi-language record processor.
+ *
+ * Requires a properties file containing configuration for this daemon and the KCL. A properties file should at minimum
+ * define these properties:
+ *
+ *
+ * # The script that abides by the multi-language protocol. This script will
+ * # be executed by the MultiLangDaemon, which will communicate with this script
+ * # over STDIN and STDOUT according to the multi-language protocol.
+ * executableName = sampleapp.py
+ *
+ * # The name of an Amazon Kinesis stream to process.
+ * streamName = words
+ *
+ * # Used by the KCL as the name of this application. Will be used as the name
+ * # of a Amazon DynamoDB table which will store the lease and checkpoint
+ * # information for workers with this application name.
+ * applicationName = PythonKCLSample
+ *
+ * # Users can change the credentials provider the KCL will use to retrieve credentials.
+ * # The DefaultAWSCredentialsProviderChain checks several other providers, which is
+ * # described here:
+ * # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
+ * AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
+ *
+ */
+@Slf4j
+public class MultiLangDaemon {
+ static class MultiLangDaemonArguments {
+ @Parameter
+ List parameters = new ArrayList<>();
+
+ @Parameter(names = { "-p", "--properties-file" }, description = "Properties file to be used with the KCL")
+ String propertiesFile;
+
+ @Parameter(names = { "-l",
+ "--log-configuration" }, description = "File location of logback.xml to be override the default")
+ String logConfiguration;
+ }
+
+ @Data
+ @Accessors(fluent = true)
+ static class MultiLangRunner implements Callable {
+ private final Scheduler scheduler;
+
+ @Override
+ public Integer call() throws Exception {
+ int exitCode = 0;
+ try {
+ scheduler().run();
+ } catch (Throwable t) {
+ log.error("Caught throwable while processing data", t);
+ exitCode = 1;
+ }
+ return exitCode;
+ }
+ }
+
+ JCommander buildJCommanderAndParseArgs(final MultiLangDaemonArguments arguments, final String[] args) {
+ JCommander jCommander = JCommander.newBuilder().programName("amazon-kinesis-client MultiLangDaemon")
+ .addObject(arguments)
+ .build();
+ jCommander.parse(args);
+ return jCommander;
+ }
+
+ void printUsage(final JCommander jCommander, final String message) {
+ if (StringUtils.isNotEmpty(message)) {
+ System.err.println(message);
+ }
+ jCommander.usage();
+ }
+
+ Scheduler buildScheduler(final MultiLangDaemonConfig config) {
+ return config.getMultiLangDaemonConfiguration().build(config.getRecordProcessorFactory());
+ }
+
+ void configureLogging(final String logConfiguration) {
+ if (StringUtils.isNotEmpty(logConfiguration)) {
+ LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
+ JoranConfigurator configurator = new JoranConfigurator();
+ configureLogging(logConfiguration, loggerContext, configurator);
+ }
+ }
+
+ void configureLogging(final String logConfiguration, final LoggerContext loggerContext,
+ final JoranConfigurator configurator) {
+ loggerContext.reset();
+ try (InputStream inputStream = FileUtils.openInputStream(new File(logConfiguration))) {
+ configurator.setContext(loggerContext);
+ configurator.doConfigure(inputStream);
+ } catch (IOException | JoranException e) {
+ throw new RuntimeException("Error while loading log configuration: " + e.getMessage());
+ }
+ }
+
+ String propertiesFile(final MultiLangDaemonArguments arguments) {
+ String propertiesFile = "";
+
+ if (CollectionUtils.isNotEmpty(arguments.parameters)) {
+ if (arguments.parameters.size() == 1) {
+ propertiesFile = arguments.parameters.get(0);
+ } else {
+ throw new RuntimeException(
+ "Expected a single argument, but found multiple arguments. Arguments: "
+ + String.join(", ", arguments.parameters));
+ }
+ }
+
+ if (StringUtils.isNotEmpty(arguments.propertiesFile)) {
+ if (StringUtils.isNotEmpty(propertiesFile)) {
+ log.warn("Overriding the properties file with the --properties-file option");
+ }
+ propertiesFile = arguments.propertiesFile;
+ }
+
+ if (StringUtils.isEmpty(propertiesFile)) {
+ throw new RuntimeException("Properties file missing, please provide a properties file");
+ }
+
+ return propertiesFile;
+ }
+
+ MultiLangDaemonConfig buildMultiLangDaemonConfig(final String propertiesFile) {
+ try {
+ return new MultiLangDaemonConfig(propertiesFile);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while reading properties file: " + e.getMessage());
+ }
+ }
+
+ void setupShutdownHook(final Runtime runtime, final MultiLangRunner runner, final MultiLangDaemonConfig config) {
+ long shutdownGraceMillis = config.getMultiLangDaemonConfiguration().getShutdownGraceMillis();
+ runtime.addShutdownHook(new Thread(() -> {
+ log.info("Process terminated, will initiate shutdown.");
+ try {
+ Future runnerFuture = runner.scheduler().startGracefulShutdown();
+ runnerFuture.get(shutdownGraceMillis, TimeUnit.MILLISECONDS);
+ log.info("Process shutdown is complete.");
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Encountered an error during shutdown.", e);
+ }
+ }));
+ }
+
+ int submitRunnerAndWait(final MultiLangDaemonConfig config, final MultiLangRunner runner) {
+ ExecutorService executorService = config.getExecutorService();
+ Future future = executorService.submit(runner);
+
+ try {
+ return future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Encountered an error while running daemon", e);
+ }
+ return 1;
+ }
+
+ void exit(final int exitCode) {
+ System.exit(exitCode);
+ }
+
+ /**
+ * @param args
+ * Accepts a single argument, that argument is a properties file which provides KCL configuration as
+ * well as the name of an executable.
+ */
+ public static void main(final String[] args) {
+ int exitCode = 1;
+ MultiLangDaemon daemon = new MultiLangDaemon();
+ MultiLangDaemonArguments arguments = new MultiLangDaemonArguments();
+ JCommander jCommander = daemon.buildJCommanderAndParseArgs(arguments, args);
+ try {
+ String propertiesFile = daemon.propertiesFile(arguments);
+ daemon.configureLogging(arguments.logConfiguration);
+ MultiLangDaemonConfig config = daemon.buildMultiLangDaemonConfig(propertiesFile);
+
+ Scheduler scheduler = daemon.buildScheduler(config);
+ MultiLangRunner runner = new MultiLangRunner(scheduler);
+
+ daemon.setupShutdownHook(Runtime.getRuntime(), runner, config);
+ exitCode = daemon.submitRunnerAndWait(config, runner);
+ } catch (Throwable t) {
+ t.printStackTrace(System.err);
+ daemon.printUsage(jCommander, t.getMessage());
+ System.err.println("For more information, visit: https://github.com/awslabs/amazon-kinesis-client");
+ }
+ daemon.exit(exitCode);
+ }
+}
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemonConfig.java
similarity index 89%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemonConfig.java
index 70f90a06..d8f37b34 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemonConfig.java
@@ -5,7 +5,7 @@
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
* language governing permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang;
+package software.amazon.kinesis.multilang;
import java.io.File;
import java.io.FileInputStream;
@@ -19,11 +19,11 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
-import com.amazonaws.services.kinesis.multilang.config.KinesisClientLibConfigurator;
+import software.amazon.kinesis.multilang.config.KinesisClientLibConfigurator;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
+import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
import software.amazon.kinesis.retrieval.RetrievalConfig;
/**
@@ -38,7 +38,7 @@ public class MultiLangDaemonConfig {
private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage";
private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads";
- private KinesisClientLibConfiguration kinesisClientLibConfig;
+ private MultiLangDaemonConfiguration multiLangDaemonConfiguration;
private ExecutorService executorService;
@@ -98,13 +98,13 @@ public class MultiLangDaemonConfig {
String executableName = properties.getProperty(PROP_EXECUTABLE_NAME);
String processingLanguage = properties.getProperty(PROP_PROCESSING_LANGUAGE);
- kinesisClientLibConfig = configurator.getConfiguration(properties);
+ multiLangDaemonConfiguration = configurator.getConfiguration(properties);
executorService = buildExecutorService(properties);
recordProcessorFactory = new MultiLangRecordProcessorFactory(executableName, executorService,
- kinesisClientLibConfig);
+ multiLangDaemonConfiguration);
- log.info("Running {} to process stream {} with executable {}", kinesisClientLibConfig.getApplicationName(),
- kinesisClientLibConfig.getStreamName(), executableName);
+ log.info("Running {} to process stream {} with executable {}", multiLangDaemonConfiguration.getApplicationName(),
+ multiLangDaemonConfiguration.getStreamName(), executableName);
prepare(processingLanguage);
}
@@ -112,9 +112,7 @@ public class MultiLangDaemonConfig {
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
- log.info("Using workerId: {}", kinesisClientLibConfig.getWorkerIdentifier());
- log.info("Using credentials with access key id: {}",
- kinesisClientLibConfig.getKinesisCredentialsProvider().resolveCredentials().accessKeyId());
+ log.info("Using workerId: {}", multiLangDaemonConfiguration.getWorkerIdentifier());
StringBuilder userAgent = new StringBuilder(RetrievalConfig.KINESIS_CLIENT_LIB_USER_AGENT);
userAgent.append(" ");
@@ -133,7 +131,7 @@ public class MultiLangDaemonConfig {
}
log.info("MultiLangDaemon is adding the following fields to the User Agent: {}", userAgent.toString());
- kinesisClientLibConfig.withUserAgent(userAgent.toString());
+// multiLangDaemonConfiguration.withUserAgent(userAgent.toString());
}
private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
@@ -190,8 +188,8 @@ public class MultiLangDaemonConfig {
*
* @return A KinesisClientLibConfiguration object based on the properties file provided.
*/
- public KinesisClientLibConfiguration getKinesisClientLibConfiguration() {
- return kinesisClientLibConfig;
+ public MultiLangDaemonConfiguration getMultiLangDaemonConfiguration() {
+ return multiLangDaemonConfiguration;
}
/**
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangProtocol.java
similarity index 82%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangProtocol.java
index 75e552ce..7e9823e0 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangProtocol.java
@@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang;
+package software.amazon.kinesis.multilang;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -20,21 +20,22 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import software.amazon.kinesis.exceptions.InvalidStateException;
-import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
-import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
-import software.amazon.kinesis.lifecycle.ShutdownReason;
-import software.amazon.kinesis.lifecycle.events.InitializationInput;
-import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
-import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
-import com.amazonaws.services.kinesis.multilang.messages.Message;
-import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
-import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
-import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage;
-import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
-
import lombok.extern.slf4j.Slf4j;
+import software.amazon.kinesis.exceptions.InvalidStateException;
+import software.amazon.kinesis.lifecycle.events.InitializationInput;
+import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
+import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
+import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
+import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
+import software.amazon.kinesis.multilang.messages.CheckpointMessage;
+import software.amazon.kinesis.multilang.messages.InitializeMessage;
+import software.amazon.kinesis.multilang.messages.LeaseLostMessage;
+import software.amazon.kinesis.multilang.messages.Message;
+import software.amazon.kinesis.multilang.messages.ProcessRecordsMessage;
+import software.amazon.kinesis.multilang.messages.ShardEndedMessage;
+import software.amazon.kinesis.multilang.messages.ShutdownRequestedMessage;
+import software.amazon.kinesis.multilang.messages.StatusMessage;
+import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
/**
* An implementation of the multi language protocol.
@@ -42,10 +43,13 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
class MultiLangProtocol {
+ private final InitializationInput initializationInput;
+ private final Optional timeoutInSeconds;
+
private MessageReader messageReader;
private MessageWriter messageWriter;
- private final InitializationInput initializationInput;
- private KinesisClientLibConfiguration configuration;
+
+ private MultiLangDaemonConfiguration configuration;
/**
* Constructor.
@@ -58,11 +62,12 @@ class MultiLangProtocol {
* information about the shard this processor is starting to process
*/
MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter,
- InitializationInput initializationInput, KinesisClientLibConfiguration configuration) {
+ InitializationInput initializationInput, MultiLangDaemonConfiguration configuration) {
this.messageReader = messageReader;
this.messageWriter = messageWriter;
this.initializationInput = initializationInput;
this.configuration = configuration;
+ this.timeoutInSeconds = Optional.ofNullable(configuration.getTimeoutInSeconds());
}
/**
@@ -94,16 +99,24 @@ class MultiLangProtocol {
}
/**
- * Writes a {@link ShutdownMessage} to the child process's STDIN and waits for the child process to respond with a
- * {@link StatusMessage} on its STDOUT.
- *
- * @param checkpointer A checkpointer.
- * @param reason Why this processor is being shutdown.
- * @return Whether or not this operation succeeded.
+ * Notifies the client process that the lease has been lost, and it needs to shutdown.
+ *
+ * @param leaseLostInput
+ * the lease lost input that is passed to the {@link MessageWriter}
+ * @return true if the message was successfully writtem
*/
- boolean shutdown(RecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
- Future writeFuture = messageWriter.writeShutdownMessage(reason);
- return waitForStatusMessage(ShutdownMessage.ACTION, checkpointer, writeFuture);
+ boolean leaseLost(LeaseLostInput leaseLostInput) {
+ return waitForStatusMessage(LeaseLostMessage.ACTION, null, messageWriter.writeLeaseLossMessage(leaseLostInput));
+ }
+
+ /**
+ *
+ * @param shardEndedInput
+ * @return
+ */
+ boolean shardEnded(ShardEndedInput shardEndedInput) {
+ return waitForStatusMessage(ShardEndedMessage.ACTION, shardEndedInput.checkpointer(),
+ messageWriter.writeShardEndedMessage(shardEndedInput));
}
/**
@@ -164,18 +177,18 @@ class MultiLangProtocol {
Optional statusMessage = Optional.empty();
while (!statusMessage.isPresent()) {
Future future = this.messageReader.getNextMessageFromSTDOUT();
- Optional message = configuration.getTimeoutInSeconds()
- .map(second -> futureMethod(() -> future.get(second, TimeUnit.SECONDS), action))
- .orElse(futureMethod(future::get, action));
+ Optional message = timeoutInSeconds
+ .map(second -> futureMethod(() -> future.get(second, TimeUnit.SECONDS), action))
+ .orElse(futureMethod(future::get, action));
if (!message.isPresent()) {
return false;
}
- Optional checkpointFailed = message.filter(m -> m instanceof CheckpointMessage )
- .map(m -> (CheckpointMessage) m)
- .flatMap(m -> futureMethod(() -> checkpoint(m, checkpointer).get(), "Checkpoint"))
- .map(checkpointSuccess -> !checkpointSuccess);
+ Optional checkpointFailed = message.filter(m -> m instanceof CheckpointMessage)
+ .map(m -> (CheckpointMessage) m)
+ .flatMap(m -> futureMethod(() -> checkpoint(m, checkpointer).get(), "Checkpoint"))
+ .map(checkpointSuccess -> !checkpointSuccess);
if (checkpointFailed.orElse(false)) {
return false;
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangRecordProcessorFactory.java
similarity index 89%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangRecordProcessorFactory.java
index 734e6364..b1858a63 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessorFactory.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangRecordProcessorFactory.java
@@ -12,7 +12,7 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang;
+package software.amazon.kinesis.multilang;
import java.util.concurrent.ExecutorService;
@@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
+import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShardRecordProcessor;
@@ -37,14 +38,14 @@ public class MultiLangRecordProcessorFactory implements ShardRecordProcessorFact
private final ExecutorService executorService;
- private final KinesisClientLibConfiguration configuration;
+ private final MultiLangDaemonConfiguration configuration;
/**
* @param command The command that will do processing for this factory's record processors.
* @param executorService An executor service to use while processing inputs and outputs of the child process.
*/
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService,
- KinesisClientLibConfiguration configuration) {
+ MultiLangDaemonConfiguration configuration) {
this(command, executorService, new ObjectMapper(), configuration);
}
@@ -54,7 +55,7 @@ public class MultiLangRecordProcessorFactory implements ShardRecordProcessorFact
* @param objectMapper An object mapper used to convert messages to json to be written to the child process
*/
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper,
- KinesisClientLibConfiguration configuration) {
+ MultiLangDaemonConfiguration configuration) {
this.command = command;
this.commandArray = command.split(COMMAND_DELIMETER_REGEX);
this.executorService = executorService;
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangShardRecordProcessor.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangShardRecordProcessor.java
similarity index 93%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangShardRecordProcessor.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangShardRecordProcessor.java
index 94df3c36..ae7996db 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangShardRecordProcessor.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangShardRecordProcessor.java
@@ -12,26 +12,25 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang;
+package software.amazon.kinesis.multilang;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.function.Function;
-import software.amazon.kinesis.lifecycle.ShutdownReason;
-import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
-import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
-import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
-import software.amazon.kinesis.processor.ShardRecordProcessor;
-import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
-import software.amazon.kinesis.lifecycle.events.InitializationInput;
-import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-import software.amazon.kinesis.lifecycle.ShutdownInput;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
+import software.amazon.kinesis.lifecycle.events.InitializationInput;
+import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
+import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
+import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
+import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
+import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
+import software.amazon.kinesis.processor.ShardRecordProcessor;
/**
@@ -64,7 +63,7 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor {
private MultiLangProtocol protocol;
- private KinesisClientLibConfiguration configuration;
+ private MultiLangDaemonConfiguration configuration;
@Override
public void initialize(InitializationInput initializationInput) {
@@ -113,12 +112,12 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor {
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
- shutdown(ShutdownInput.builder().shutdownReason(ShutdownReason.LEASE_LOST).build());
+ shutdown(p -> p.leaseLost(leaseLostInput));
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
- shutdown(ShutdownInput.builder().shutdownReason(ShutdownReason.SHARD_END).checkpointer(shardEndedInput.checkpointer()).build());
+ shutdown(p -> p.shardEnded(shardEndedInput));
}
@Override
@@ -134,7 +133,7 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor {
}
}
- void shutdown(ShutdownInput shutdownInput) {
+ void shutdown(Function protocolInvocation) {
// In cases where KCL loses lease for the shard after creating record processor instance but before
// record processor initialize() is called, then shutdown() may be called directly before initialize().
if (!initialized) {
@@ -146,7 +145,7 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor {
try {
if (ProcessState.ACTIVE.equals(this.state)) {
- if (!protocol.shutdown(shutdownInput.checkpointer(), shutdownInput.shutdownReason())) {
+ if (!protocolInvocation.apply(protocol)) {
throw new RuntimeException("Child process failed to shutdown");
}
@@ -182,7 +181,7 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor {
* An obejct mapper.
*/
MultiLangShardRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
- ObjectMapper objectMapper, KinesisClientLibConfiguration configuration) {
+ ObjectMapper objectMapper, MultiLangDaemonConfiguration configuration) {
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
new DrainChildSTDERRTask(), configuration);
}
@@ -205,7 +204,7 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor {
*/
MultiLangShardRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper,
MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask,
- KinesisClientLibConfiguration configuration) {
+ MultiLangDaemonConfiguration configuration) {
this.executorService = executorService;
this.processBuilder = processBuilder;
this.objectMapper = objectMapper;
diff --git a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java
similarity index 83%
rename from amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java
rename to amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java
index f6e1883c..103741ca 100644
--- a/amazon-kinesis-client-multilang/src/main/java/com/amazonaws/services/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java
@@ -12,13 +12,15 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.multilang.config;
+package software.amazon.kinesis.multilang.config;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
@@ -27,7 +29,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
* Get AWSCredentialsProvider property.
*/
@Slf4j
-class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecoder {
+class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecoder {
private static final String AUTH_PREFIX = "com.amazonaws.auth.";
private static final String LIST_DELIMITER = ",";
private static final String ARG_DELIMITER = "|";
@@ -46,13 +48,13 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
* @return corresponding variable in correct type
*/
@Override
- public AwsCredentialsProvider decodeValue(String value) {
+ public AWSCredentialsProvider decodeValue(String value) {
if (value != null) {
List providerNames = getProviderNames(value);
- List providers = getValidCredentialsProviders(providerNames);
- AwsCredentialsProvider[] ps = new AwsCredentialsProvider[providers.size()];
+ List providers = getValidCredentialsProviders(providerNames);
+ AWSCredentialsProvider[] ps = new AWSCredentialsProvider[providers.size()];
providers.toArray(ps);
- return AwsCredentialsProviderChain.builder().credentialsProviders(ps).build();
+ return new AWSCredentialsProviderChain(providers);
} else {
throw new IllegalArgumentException("Property AWSCredentialsProvider is missing.");
}
@@ -62,15 +64,15 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
* @return list of supported types
*/
@Override
- public List> getSupportedTypes() {
- return Arrays.asList(AwsCredentialsProvider.class);
+ public List> getSupportedTypes() {
+ return Arrays.asList(AWSCredentialsProvider.class);
}
/*
* Convert string list to a list of valid credentials providers.
*/
- private static List getValidCredentialsProviders(List providerNames) {
- List credentialsProviders = new ArrayList();
+ private static List getValidCredentialsProviders(List providerNames) {
+ List credentialsProviders = new ArrayList<>();
for (String providerName : providerNames) {
if (providerName.contains(ARG_DELIMITER)) {
String[] nameAndArgs = providerName.split("\\" + ARG_DELIMITER);
@@ -79,7 +81,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
try {
Class> className = Class.forName(nameAndArgs[0]);
Constructor> c = className.getConstructor(argTypes);
- credentialsProviders.add((AwsCredentialsProvider) c
+ credentialsProviders.add((AWSCredentialsProvider) c
.newInstance(Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length)));
} catch (Exception e) {
log.debug("Can't find any credentials provider matching {}.", providerName);
@@ -87,7 +89,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
} else {
try {
Class> className = Class.forName(providerName);
- credentialsProviders.add((AwsCredentialsProvider) className.newInstance());
+ credentialsProviders.add((AWSCredentialsProvider) className.newInstance());
} catch (Exception e) {
log.debug("Can't find any credentials provider matching {}.", providerName);
}
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/BuilderDynaBean.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/BuilderDynaBean.java
new file mode 100644
index 00000000..1a4d01fe
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/BuilderDynaBean.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.kinesis.multilang.config;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import lombok.Getter;
+import org.apache.commons.beanutils.ConvertUtilsBean;
+import org.apache.commons.beanutils.DynaBean;
+import org.apache.commons.beanutils.DynaClass;
+import org.apache.commons.beanutils.DynaProperty;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
+
+public class BuilderDynaBean implements DynaBean {
+
+ private static final String[] CLASS_NAME_JOINERS = { ClassUtils.PACKAGE_SEPARATOR, ClassUtils.INNER_CLASS_SEPARATOR };
+ static final String NO_MAP_ACCESS_SUPPORT = "Map access isn't supported";
+
+ private Class> destinedClass;
+ private final ConvertUtilsBean convertUtilsBean;
+ private final List classPrefixSearchList;
+
+ private DynaBeanCreateSupport dynaBeanCreateSupport;
+ private DynaBeanBuilderSupport dynaBeanBuilderSupport;
+
+ @Getter
+ private boolean isDirty = false;
+
+ private final Function emptyPropertyHandler;
+ private Object emptyPropertyResolved = null;
+
+ public BuilderDynaBean(Class> destinedClass, ConvertUtilsBean convertUtilsBean, String... classPrefixSearchList) {
+ this(destinedClass, convertUtilsBean, null, Arrays.asList(classPrefixSearchList));
+ }
+
+ public BuilderDynaBean(Class> destinedClass, ConvertUtilsBean convertUtilsBean,
+ Function emptyPropertyHandler, String... classPrefixSearchList) {
+ this(destinedClass, convertUtilsBean, emptyPropertyHandler, Arrays.asList(classPrefixSearchList));
+ }
+
+ public BuilderDynaBean(Class> destinedClass, ConvertUtilsBean convertUtilsBean,
+ Function emtpyPropertyHandler, List classPrefixSearchList) {
+ this.convertUtilsBean = convertUtilsBean;
+ this.classPrefixSearchList = classPrefixSearchList;
+ this.emptyPropertyHandler = emtpyPropertyHandler;
+ initialize(destinedClass);
+ }
+
+ private void initialize(Class> destinedClass) {
+ this.destinedClass = destinedClass;
+
+ if (DynaBeanBuilderUtils.isBuilderOrCreate(destinedClass)) {
+ dynaBeanBuilderSupport = new DynaBeanBuilderSupport(destinedClass, convertUtilsBean, classPrefixSearchList);
+ dynaBeanCreateSupport = new DynaBeanCreateSupport(destinedClass, convertUtilsBean, classPrefixSearchList);
+ }
+ }
+
+ private void reinitializeFrom(String newClass) {
+ Class> newClazz = null;
+ List attempts = new ArrayList<>();
+ attempts.add(newClass);
+ try {
+ newClazz = Class.forName(newClass);
+ } catch (ClassNotFoundException e) {
+ //
+ // Ignored
+ //
+ }
+ if (newClazz == null) {
+ for (String prefix : classPrefixSearchList) {
+ for (String joiner : CLASS_NAME_JOINERS) {
+ String possibleClass;
+ if (prefix.endsWith(joiner)) {
+ possibleClass = prefix + newClass;
+ } else {
+ possibleClass = prefix + joiner + newClass;
+ }
+ attempts.add(possibleClass);
+ try {
+ newClazz = Class.forName(possibleClass);
+ break;
+ } catch (ClassNotFoundException e) {
+ //
+ // Ignored
+ //
+ }
+
+ }
+ }
+ }
+
+ if (newClazz == null) {
+ throw new IllegalArgumentException(
+ "Unable to load class " + newClass + ". Attempted: (" + String.join(", ", attempts) + ")");
+ }
+ initialize(newClazz);
+ }
+
+ private void validatedExpectedClass(Class> source, Class> expected) {
+ if (!ClassUtils.isAssignable(source, expected)) {
+ throw new IllegalArgumentException(
+ String.format("%s cannot be assigned to %s.", source.getName(), expected.getName()));
+ }
+ }
+
+ public boolean canBuildOrCreate() {
+ return dynaBeanBuilderSupport != null || dynaBeanCreateSupport != null;
+ }
+
+ private void validateCanBuildOrCreate() {
+ if (!canBuildOrCreate()) {
+ throw new IllegalStateException("Unable to to introspect or handle " + destinedClass.getName()
+ + " as it doesn't have a builder or create method.");
+ }
+ }
+
+ @SafeVarargs
+ public final T build(Class expected, Function