'Version 1.2.0 of the Amazon Kinesis Client Library'
This commit is contained in:
parent
50000086e3
commit
73ac2c0e25
38 changed files with 2989 additions and 28 deletions
|
|
@ -2,7 +2,7 @@ Manifest-Version: 1.0
|
|||
Bundle-ManifestVersion: 2
|
||||
Bundle-Name: Amazon Kinesis Client Library for Java
|
||||
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
|
||||
Bundle-Version: 1.1.1
|
||||
Bundle-Version: 1.2.0
|
||||
Bundle-Vendor: Amazon Technologies, Inc
|
||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
|
||||
Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0",
|
||||
|
|
|
|||
|
|
@ -22,7 +22,13 @@ The **Amazon Kinesis Client Library for Java** enables Java developers to easily
|
|||
|
||||
After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use this command: `mvn clean install -Dgpg.skip=true`
|
||||
|
||||
## KCL support for other languages
|
||||
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
|
||||
|
||||
## Release Notes
|
||||
### Release 1.2 (October 21, 2014)
|
||||
* **Multi-Language Support** KCL now supports implementing record processors in any language by communicating with the daemon over [STDIN and STDOUT][multi-lang-protocol]. Python developers can directly use the [Amazon Kinesis Client Library for Python][kclpy] to write their data processing applications.
|
||||
|
||||
### Release 1.1 (June 30, 2014)
|
||||
* **Checkpointing at a specific sequence number** — The IRecordProcessorCheckpointer interface now supports checkpointing at a sequence number specified by the record processor.
|
||||
* **Set region** — KinesisClientLibConfiguration now supports setting the region name to indicate the location of the Amazon Kinesis service. The Amazon DynamoDB table and Amazon CloudWatch metrics associated with your application will also use this region setting.
|
||||
|
|
@ -35,4 +41,6 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
|
|||
[kinesis-guide-begin]: http://docs.aws.amazon.com/kinesis/latest/dev/before-you-begin.html
|
||||
[kinesis-guide-create]: http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html
|
||||
[kinesis-guide-applications]: http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-app.html
|
||||
[kclpy]: https://github.com/awslabs/amazon-kinesis-client-python
|
||||
[multi-lang-protocol]: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java
|
||||
|
||||
|
|
|
|||
2
pom.xml
2
pom.xml
|
|
@ -6,7 +6,7 @@
|
|||
<artifactId>amazon-kinesis-client</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Amazon Kinesis Client Library for Java</name>
|
||||
<version>1.1.1</version>
|
||||
<version>1.2.0</version>
|
||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
|
||||
<url>https://aws.amazon.com/kinesis</url>
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.auth.AWSCredentialsProviderChain;
|
||||
|
||||
/**
|
||||
* Get AWSCredentialsProvider property.
|
||||
*/
|
||||
class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecoder<AWSCredentialsProvider> {
|
||||
private static final Log LOG = LogFactory.getLog(AWSCredentialsProviderPropertyValueDecoder.class);
|
||||
private static final String AUTH_PREFIX = "com.amazonaws.auth.";
|
||||
private static final String LIST_DELIMITER = ",";
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
AWSCredentialsProviderPropertyValueDecoder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get AWSCredentialsProvider property.
|
||||
*
|
||||
* @param value property value as String
|
||||
* @return corresponding variable in correct type
|
||||
*/
|
||||
@Override
|
||||
public AWSCredentialsProvider decodeValue(String value) {
|
||||
if (value != null) {
|
||||
List<String> providerNames = getProviderNames(value);
|
||||
List<AWSCredentialsProvider> providers = getValidCredentialsProviders(providerNames);
|
||||
AWSCredentialsProvider[] ps = new AWSCredentialsProvider[providers.size()];
|
||||
providers.toArray(ps);
|
||||
return new AWSCredentialsProviderChain(ps);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Property AWSCredentialsProvider is missing.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list of supported types
|
||||
*/
|
||||
@Override
|
||||
public List<Class<AWSCredentialsProvider>> getSupportedTypes() {
|
||||
return Arrays.asList(AWSCredentialsProvider.class);
|
||||
}
|
||||
|
||||
/*
|
||||
* Convert string list to a list of valid credentials providers.
|
||||
*/
|
||||
private static List<AWSCredentialsProvider> getValidCredentialsProviders(List<String> providerNames) {
|
||||
List<AWSCredentialsProvider> credentialsProviders = new ArrayList<AWSCredentialsProvider>();
|
||||
for (String providerName : providerNames) {
|
||||
try {
|
||||
Class<?> className = Class.forName(providerName);
|
||||
credentialsProviders.add((AWSCredentialsProvider) className.newInstance());
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Can't find any credentials provider matching " + providerName + ".");
|
||||
}
|
||||
}
|
||||
return credentialsProviders;
|
||||
}
|
||||
|
||||
private static List<String> getProviderNames(String property) {
|
||||
// assume list delimiter is ","
|
||||
String[] elements = property.split(LIST_DELIMITER);
|
||||
List<String> result = new ArrayList<String>();
|
||||
for (int i = 0; i < elements.length; i++) {
|
||||
String string = elements[i].trim();
|
||||
if (!string.isEmpty()) {
|
||||
// find all possible names and add them to name list
|
||||
result.addAll(getPossibleFullClassNames(string));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static List<String> getPossibleFullClassNames(String s) {
|
||||
/*
|
||||
* We take care of three cases :
|
||||
*
|
||||
* 1. Customer provides a short name of common providers in com.amazonaws.auth package i.e. any classes
|
||||
* implementing the AWSCredentialsProvider interface:
|
||||
* http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html
|
||||
*
|
||||
* 2. Customer provides a full name of common providers e.g. com.amazonaws.auth.ClasspathFileCredentialsProvider
|
||||
*
|
||||
* 3. Customer provides a custom credentials provider with full name of provider
|
||||
*/
|
||||
|
||||
return Arrays.asList(s, AUTH_PREFIX + s);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Provide boolean property.
|
||||
*/
|
||||
class BooleanPropertyValueDecoder implements IPropertyValueDecoder<Boolean> {
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
BooleanPropertyValueDecoder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param value property value as String
|
||||
* @return corresponding variable in correct type
|
||||
*/
|
||||
@Override
|
||||
public Boolean decodeValue(String value) {
|
||||
return Boolean.parseBoolean(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list of supported types
|
||||
*/
|
||||
@Override
|
||||
public List<Class<Boolean>> getSupportedTypes() {
|
||||
return Arrays.asList(boolean.class, Boolean.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
/**
|
||||
* Get ClientConfiguration property.
|
||||
*/
|
||||
class ClientConfigurationPropertyValueDecoder implements IPropertyValueDecoder<ClientConfiguration> {
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
ClientConfigurationPropertyValueDecoder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param value property value as String
|
||||
* @return corresponding variable in correct type
|
||||
*/
|
||||
@Override
|
||||
public ClientConfiguration decodeValue(String value) {
|
||||
throw new UnsupportedOperationException("ClientConfiguration is currently not supported");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get supported types.
|
||||
* @return a list of supported class
|
||||
*/
|
||||
@Override
|
||||
public List<Class<ClientConfiguration>> getSupportedTypes() {
|
||||
return Arrays.asList(ClientConfiguration.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This class captures the concept of decoding a property value to a particular Java type.
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
interface IPropertyValueDecoder<T> {
|
||||
/**
|
||||
* Get the value that was read from a configuration file and convert it to some type.
|
||||
*
|
||||
* @param propertyValue property string value that needs to be decoded.
|
||||
* @return property value in type T
|
||||
*/
|
||||
T decodeValue(String propertyValue);
|
||||
|
||||
/**
|
||||
* Get a list of supported types for this class.
|
||||
*
|
||||
* @return list of supported classes.
|
||||
*/
|
||||
List<Class<T>> getSupportedTypes();
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
|
||||
|
||||
/**
|
||||
* Get an InitialiPosition enum property.
|
||||
*/
|
||||
class InitialPositionInStreamPropertyValueDecoder implements IPropertyValueDecoder<InitialPositionInStream> {
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
InitialPositionInStreamPropertyValueDecoder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param value property value as String
|
||||
* @return corresponding variable in correct type
|
||||
*/
|
||||
@Override
|
||||
public InitialPositionInStream decodeValue(String value) {
|
||||
return InitialPositionInStream.valueOf(value.toUpperCase());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list of supported types
|
||||
*/
|
||||
@Override
|
||||
public List<Class<InitialPositionInStream>> getSupportedTypes() {
|
||||
return Arrays.asList(InitialPositionInStream.class);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Get integer properties.
|
||||
*/
|
||||
class IntegerPropertyValueDecoder implements IPropertyValueDecoder<Integer> {
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
IntegerPropertyValueDecoder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param value property value as String
|
||||
* @return corresponding variable in correct type
|
||||
*/
|
||||
@Override
|
||||
public Integer decodeValue(String value) {
|
||||
return Integer.parseInt(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list of supported types
|
||||
*/
|
||||
@Override
|
||||
public List<Class<Integer>> getSupportedTypes() {
|
||||
return Arrays.asList(int.class, Integer.class);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,218 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Hashtable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
|
||||
/**
|
||||
* KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following
|
||||
* three properties must be provided. 1) "applicationName" 2) "streamName" 3) "AWSCredentialsProvider"
|
||||
* KinesisClientLibConfigurator will help to automatically assign the value of "workerId" if this property is not
|
||||
* provided. In the specified properties file, any properties, which matches the variable name in
|
||||
* KinesisClientLibConfiguration and has a corresponding "with{variableName}" setter method, will be read in, and its
|
||||
* value in properties file will be assigned to corresponding variable in KinesisClientLibConfiguration.
|
||||
*/
|
||||
public class KinesisClientLibConfigurator {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(KinesisClientLibConfigurator.class);
|
||||
private static final String PREFIX = "with";
|
||||
|
||||
// Required properties
|
||||
private static final String PROP_APP_NAME = "applicationName";
|
||||
private static final String PROP_STREAM_NAME = "streamName";
|
||||
private static final String PROP_CREDENTIALS_PROVIDER = "AWSCredentialsProvider";
|
||||
private static final String PROP_WORKER_ID = "workerId";
|
||||
|
||||
private Map<Class<?>, IPropertyValueDecoder<?>> classToDecoder;
|
||||
private Map<String, List<Method>> nameToMethods;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
public KinesisClientLibConfigurator() {
|
||||
List<IPropertyValueDecoder<? extends Object>> getters =
|
||||
Arrays.asList(new IntegerPropertyValueDecoder(),
|
||||
new LongPropertyValueDecoder(),
|
||||
new BooleanPropertyValueDecoder(),
|
||||
new AWSCredentialsProviderPropertyValueDecoder(),
|
||||
new StringPropertyValueDecoder(),
|
||||
new InitialPositionInStreamPropertyValueDecoder(),
|
||||
new ClientConfigurationPropertyValueDecoder());
|
||||
|
||||
classToDecoder = new Hashtable<Class<?>, IPropertyValueDecoder<?>>();
|
||||
for (IPropertyValueDecoder<?> getter : getters) {
|
||||
for (Class<?> clazz : getter.getSupportedTypes()) {
|
||||
/*
|
||||
* We could validate that we never overwrite a getter but we can also do this by manual inspection of
|
||||
* the getters.
|
||||
*/
|
||||
classToDecoder.put(clazz, getter);
|
||||
}
|
||||
}
|
||||
nameToMethods = new Hashtable<String, List<Method>>();
|
||||
for (Method method : KinesisClientLibConfiguration.class.getMethods()) {
|
||||
if (!nameToMethods.containsKey(method.getName())) {
|
||||
nameToMethods.put(method.getName(), new ArrayList<Method>());
|
||||
}
|
||||
nameToMethods.get(method.getName()).add(method);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a KinesisClientLibConfiguration with variables configured as specified by the properties in config stream.
|
||||
* Program will fail immediately, if customer provide: 1) invalid variable value. Program will log it as warning and
|
||||
* continue, if customer provide: 1) variable with unsupported variable type. 2) a variable with name which does not
|
||||
* match any of the variables in KinesisClientLibConfigration.
|
||||
*
|
||||
* @param properties a Properties object containing the configuration information
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration getConfiguration(Properties properties) {
|
||||
// The three minimum required arguments for constructor are obtained first. They are all mandatory, all of them
|
||||
// should be provided. If any of these three failed to be set, program will fail.
|
||||
IPropertyValueDecoder<String> stringValueDecoder = new StringPropertyValueDecoder();
|
||||
IPropertyValueDecoder<AWSCredentialsProvider> awsCPPropGetter =
|
||||
new AWSCredentialsProviderPropertyValueDecoder();
|
||||
String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME));
|
||||
String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME));
|
||||
AWSCredentialsProvider provider =
|
||||
awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER));
|
||||
|
||||
if (applicationName == null || applicationName.isEmpty()) {
|
||||
throw new IllegalArgumentException("Value of applicationName should be explicitly provided.");
|
||||
}
|
||||
if (streamName == null || streamName.isEmpty()) {
|
||||
throw new IllegalArgumentException("Value of streamName should be explicitly provided.");
|
||||
}
|
||||
|
||||
// Allow customer not to provide workerId or to provide empty worker id.
|
||||
String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID));
|
||||
if (workerId == null || workerId.isEmpty()) {
|
||||
workerId = UUID.randomUUID().toString();
|
||||
LOG.info("Value of workerId is not provided in the properties. WorkerId is automatically "
|
||||
+ "assigned as: " + workerId);
|
||||
}
|
||||
|
||||
KinesisClientLibConfiguration config =
|
||||
new KinesisClientLibConfiguration(applicationName, streamName, provider, workerId);
|
||||
|
||||
Set<String> requiredNames =
|
||||
new HashSet<String>(Arrays.asList(PROP_STREAM_NAME,
|
||||
PROP_APP_NAME,
|
||||
PROP_WORKER_ID,
|
||||
PROP_CREDENTIALS_PROVIDER));
|
||||
|
||||
// Set all the variables that are not used for constructor.
|
||||
for (Object keyObject : properties.keySet()) {
|
||||
String key = keyObject.toString();
|
||||
if (!requiredNames.contains(key)) {
|
||||
withProperty(key, properties, config);
|
||||
}
|
||||
}
|
||||
|
||||
return config;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param configStream the input stream containing the configuration information
|
||||
* @return KinesisClientLibConfiguration
|
||||
*/
|
||||
public KinesisClientLibConfiguration getConfiguration(InputStream configStream) {
|
||||
Properties properties = new Properties();
|
||||
try {
|
||||
properties.load(configStream);
|
||||
} catch (IOException e) {
|
||||
String msg = "Could not load properties from the stream provided";
|
||||
throw new IllegalStateException(msg, e);
|
||||
} finally {
|
||||
try {
|
||||
configStream.close();
|
||||
} catch (IOException e) {
|
||||
String msg = "Encountered error while trying to close properties file.";
|
||||
throw new IllegalStateException(msg, e);
|
||||
}
|
||||
}
|
||||
return getConfiguration(properties);
|
||||
}
|
||||
|
||||
private void withProperty(String propertyKey, Properties properties, KinesisClientLibConfiguration config) {
|
||||
if (propertyKey.isEmpty()) {
|
||||
throw new IllegalArgumentException("The property can't be empty string");
|
||||
}
|
||||
// Assume that all the setters in KinesisClientLibConfiguration are in the following format
|
||||
// They all start with "with" followed by the variable name with first letter capitalized
|
||||
String targetMethodName = PREFIX + Character.toUpperCase(propertyKey.charAt(0)) + propertyKey.substring(1);
|
||||
String propertyValue = properties.getProperty(propertyKey);
|
||||
if (nameToMethods.containsKey(targetMethodName)) {
|
||||
for (Method method : nameToMethods.get(targetMethodName)) {
|
||||
if (method.getParameterTypes().length == 1 && method.getName().equals(targetMethodName)) {
|
||||
Class<?> paramType = method.getParameterTypes()[0];
|
||||
if (classToDecoder.containsKey(paramType)) {
|
||||
IPropertyValueDecoder<?> decoder = classToDecoder.get(paramType);
|
||||
try {
|
||||
method.invoke(config, decoder.decodeValue(propertyValue));
|
||||
LOG.info(String.format("Successfully set property %s with value %s",
|
||||
propertyKey,
|
||||
propertyValue));
|
||||
return;
|
||||
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
|
||||
// At this point, we really thought that we could call this method.
|
||||
LOG.warn(String.format("Encountered an error while invoking method %s with value %s. "
|
||||
+ "Exception was %s",
|
||||
method,
|
||||
propertyValue,
|
||||
e));
|
||||
} catch (UnsupportedOperationException e) {
|
||||
LOG.warn(String.format("The property %s is not supported as type %s at this time.",
|
||||
propertyKey,
|
||||
paramType));
|
||||
}
|
||||
} else {
|
||||
LOG.debug(String.format("No method for decoding parameters of type %s so method %s could not "
|
||||
+ "be invoked.",
|
||||
paramType,
|
||||
method));
|
||||
}
|
||||
} else {
|
||||
LOG.debug(String.format("Method %s doesn't look like it is appropriate for setting property %s. "
|
||||
+ "Looking for something called %s.",
|
||||
method,
|
||||
propertyKey,
|
||||
targetMethodName));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG.debug(String.format("There was no appropriately named method for setting property %s.", propertyKey));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Get long type properties.
|
||||
*/
|
||||
class LongPropertyValueDecoder implements IPropertyValueDecoder<Long> {
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
LongPropertyValueDecoder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param value property value as String
|
||||
* @return corresponding variable in correct type
|
||||
*/
|
||||
@Override
|
||||
public Long decodeValue(String value) {
|
||||
return Long.parseLong(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list of supported types
|
||||
*/
|
||||
@Override
|
||||
public List<Class<Long>> getSupportedTypes() {
|
||||
return Arrays.asList(long.class, Long.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Get string properties from properties file.
|
||||
*/
|
||||
class StringPropertyValueDecoder implements IPropertyValueDecoder<String> {
|
||||
|
||||
/**
|
||||
* package constructor for factory use only.
|
||||
*/
|
||||
StringPropertyValueDecoder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param value the property value
|
||||
* @return the value as String
|
||||
*/
|
||||
@Override
|
||||
public String decodeValue(String value) {
|
||||
// How to treat null or empty String should depend on those method who
|
||||
// uses the String value. Here just return the string as it is.
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
return value.trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list of supported types
|
||||
*/
|
||||
@Override
|
||||
public List<Class<String>> getSupportedTypes() {
|
||||
return Arrays.asList(String.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -68,6 +68,7 @@ class InitializeTask implements ITask {
|
|||
String initialCheckpoint = checkpoint.getCheckpoint(shardInfo.getShardId());
|
||||
dataFetcher.initialize(initialCheckpoint);
|
||||
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
|
||||
recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);
|
||||
try {
|
||||
LOG.debug("Calling the record processor initialize().");
|
||||
recordProcessor.initialize(shardInfo.getShardId());
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration {
|
|||
/**
|
||||
* User agent set when Amazon Kinesis Client Library makes AWS requests.
|
||||
*/
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.1.1";
|
||||
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.2.0";
|
||||
|
||||
/**
|
||||
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||
|
|
|
|||
|
|
@ -90,8 +90,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
* If there isn't a last checkpoint value, we only care about checking the upper bound.
|
||||
* If there is a last checkpoint value, we want to check both the lower and upper bound.
|
||||
*/
|
||||
if ((lastCheckpointValue == null
|
||||
|| checkpointValueComparator.compare(lastCheckpointValue, sequenceNumber) <= 0)
|
||||
if ((checkpointValueComparator.compare(lastCheckpointValue, sequenceNumber) <= 0)
|
||||
&& checkpointValueComparator.compare(sequenceNumber, largestPermittedCheckpointValue) <= 0) {
|
||||
|
||||
this.advancePosition(sequenceNumber);
|
||||
|
|
@ -111,6 +110,11 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
|
|||
return lastCheckpointValue;
|
||||
}
|
||||
|
||||
|
||||
synchronized void setInitialCheckpointValue(String initialCheckpoint) {
|
||||
lastCheckpointValue = initialCheckpoint;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for testing.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -28,8 +28,11 @@ import org.apache.commons.logging.LogFactory;
|
|||
|
||||
import com.amazonaws.regions.Region;
|
||||
import com.amazonaws.regions.RegionUtils;
|
||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
|
||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
|
||||
|
|
@ -145,9 +148,9 @@ public class Worker implements Runnable {
|
|||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
AmazonCloudWatchClient cloudWatchClient) {
|
||||
AmazonKinesis kinesisClient,
|
||||
AmazonDynamoDB dynamoDBClient,
|
||||
AmazonCloudWatch cloudWatchClient) {
|
||||
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient,
|
||||
Executors.newCachedThreadPool());
|
||||
}
|
||||
|
|
@ -163,9 +166,9 @@ public class Worker implements Runnable {
|
|||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
AmazonCloudWatchClient cloudWatchClient,
|
||||
AmazonKinesis kinesisClient,
|
||||
AmazonDynamoDB dynamoDBClient,
|
||||
AmazonCloudWatch cloudWatchClient,
|
||||
ExecutorService execService) {
|
||||
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, new CWMetricsFactory(cloudWatchClient,
|
||||
config.getApplicationName(),
|
||||
|
|
@ -189,8 +192,8 @@ public class Worker implements Runnable {
|
|||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
AmazonKinesis kinesisClient,
|
||||
AmazonDynamoDB dynamoDBClient,
|
||||
IMetricsFactory metricsFactory,
|
||||
ExecutorService execService) {
|
||||
this(
|
||||
|
|
@ -564,4 +567,79 @@ public class Worker implements Runnable {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Backwards compatible constructors
|
||||
/**
|
||||
* This constructor is for binary compatibility with code compiled against
|
||||
* version of the KCL that only have constructors taking "Client" objects.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient CloudWatch Client for publishing metrics
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
AmazonCloudWatchClient cloudWatchClient) {
|
||||
this(recordProcessorFactory,
|
||||
config,
|
||||
(AmazonKinesis) kinesisClient,
|
||||
(AmazonDynamoDB) dynamoDBClient,
|
||||
(AmazonCloudWatch) cloudWatchClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* This constructor is for binary compatibility with code compiled against
|
||||
* version of the KCL that only have constructors taking "Client" objects.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param cloudWatchClient CloudWatch Client for publishing metrics
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
AmazonCloudWatchClient cloudWatchClient,
|
||||
ExecutorService execService) {
|
||||
this(recordProcessorFactory,
|
||||
config,
|
||||
(AmazonKinesis) kinesisClient,
|
||||
(AmazonDynamoDB) dynamoDBClient,
|
||||
(AmazonCloudWatch) cloudWatchClient,
|
||||
execService);
|
||||
}
|
||||
|
||||
/**
|
||||
* This constructor is for binary compatibility with code compiled against
|
||||
* version of the KCL that only have constructors taking "Client" objects.
|
||||
*
|
||||
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
|
||||
* @param config Kinesis Client Library configuration
|
||||
* @param kinesisClient Kinesis Client used for fetching data
|
||||
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
|
||||
* @param metricsFactory Metrics factory used to emit metrics
|
||||
* @param execService ExecutorService to use for processing records (support for multi-threaded
|
||||
* consumption)
|
||||
*/
|
||||
public Worker(IRecordProcessorFactory recordProcessorFactory,
|
||||
KinesisClientLibConfiguration config,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonDynamoDBClient dynamoDBClient,
|
||||
IMetricsFactory metricsFactory,
|
||||
ExecutorService execService) {
|
||||
this(recordProcessorFactory,
|
||||
config,
|
||||
(AmazonKinesis) kinesisClient,
|
||||
(AmazonDynamoDB) dynamoDBClient,
|
||||
metricsFactory,
|
||||
execService);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
|
||||
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
|
||||
|
|
@ -50,7 +51,7 @@ public class KinesisProxy implements IKinesisProxy {
|
|||
private static String defaultServiceName = "kinesis";
|
||||
private static String defaultRegionId = "us-east-1";;
|
||||
|
||||
private AmazonKinesisClient client;
|
||||
private AmazonKinesis client;
|
||||
private AWSCredentialsProvider credentialsProvider;
|
||||
|
||||
private final String streamName;
|
||||
|
|
@ -91,12 +92,23 @@ public class KinesisProxy implements IKinesisProxy {
|
|||
String regionId,
|
||||
long describeStreamBackoffTimeInMillis,
|
||||
int maxDescribeStreamRetryAttempts) {
|
||||
this(streamName, credentialProvider, new AmazonKinesisClient(credentialProvider),
|
||||
describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts);
|
||||
client.setEndpoint(endpoint, serviceName, regionId);
|
||||
this(streamName, credentialProvider, buildClientSettingEndpoint(credentialProvider,
|
||||
endpoint,
|
||||
serviceName,
|
||||
regionId), describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts);
|
||||
|
||||
|
||||
LOG.debug("KinesisProxy has created a kinesisClient");
|
||||
}
|
||||
|
||||
private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider,
|
||||
String endpoint,
|
||||
String serviceName,
|
||||
String regionId) {
|
||||
AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider);
|
||||
client.setEndpoint(endpoint, serviceName, regionId);
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Public constructor.
|
||||
|
|
@ -109,7 +121,7 @@ public class KinesisProxy implements IKinesisProxy {
|
|||
*/
|
||||
public KinesisProxy(final String streamName,
|
||||
AWSCredentialsProvider credentialProvider,
|
||||
AmazonKinesisClient kinesisClient,
|
||||
AmazonKinesis kinesisClient,
|
||||
long describeStreamBackoffTimeInMillis,
|
||||
int maxDescribeStreamRetryAttempts) {
|
||||
this.streamName = streamName;
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies;
|
|||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||
|
||||
/**
|
||||
|
|
@ -28,7 +29,7 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
|
|||
private static String defaultRegionId = "us-east-1";
|
||||
private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L;
|
||||
private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50;
|
||||
private final AmazonKinesisClient kinesisClient;
|
||||
private final AmazonKinesis kinesisClient;
|
||||
private final long describeStreamBackoffTimeInMillis;
|
||||
private final int maxDescribeStreamRetryAttempts;
|
||||
|
||||
|
|
@ -55,7 +56,6 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
|
|||
String endpoint) {
|
||||
this(credentialProvider, clientConfig, endpoint, defaultServiceName, defaultRegionId,
|
||||
DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES);
|
||||
this.kinesisClient.setConfiguration(clientConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -64,7 +64,7 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
|
|||
* @param credentialProvider credentials provider used to sign requests
|
||||
* @param client AmazonKinesisClient used to fetch data from Kinesis
|
||||
*/
|
||||
public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, AmazonKinesisClient client) {
|
||||
public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, AmazonKinesis client) {
|
||||
this(credentialProvider, client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES);
|
||||
}
|
||||
|
||||
|
|
@ -86,9 +86,13 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
|
|||
String regionId,
|
||||
long describeStreamBackoffTimeInMillis,
|
||||
int maxDescribeStreamRetryAttempts) {
|
||||
this(credentialProvider, new AmazonKinesisClient(credentialProvider, clientConfig),
|
||||
this(credentialProvider, buildClientSettingEndpoint(credentialProvider,
|
||||
clientConfig,
|
||||
endpoint,
|
||||
serviceName,
|
||||
regionId),
|
||||
describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts);
|
||||
this.kinesisClient.setEndpoint(endpoint, serviceName, regionId);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -100,7 +104,7 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
|
|||
* @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls
|
||||
*/
|
||||
KinesisProxyFactory(AWSCredentialsProvider credentialProvider,
|
||||
AmazonKinesisClient client,
|
||||
AmazonKinesis client,
|
||||
long describeStreamBackoffTimeInMillis,
|
||||
int maxDescribeStreamRetryAttempts) {
|
||||
super();
|
||||
|
|
@ -122,4 +126,14 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
|
|||
maxDescribeStreamRetryAttempts);
|
||||
|
||||
}
|
||||
|
||||
private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider,
|
||||
ClientConfiguration clientConfig,
|
||||
String endpoint,
|
||||
String serviceName,
|
||||
String regionId) {
|
||||
AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider, clientConfig);
|
||||
client.setEndpoint(endpoint, serviceName, regionId);
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.metrics.impl;
|
|||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
|
||||
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
|
||||
|
|
@ -74,7 +75,7 @@ public class CWMetricsFactory implements IMetricsFactory {
|
|||
* @param bufferTimeMillis time to buffer metrics before publishing to CloudWatch
|
||||
* @param maxQueueSize maximum number of metrics that we can have in a queue
|
||||
*/
|
||||
public CWMetricsFactory(AmazonCloudWatchClient cloudWatchClient,
|
||||
public CWMetricsFactory(AmazonCloudWatch cloudWatchClient,
|
||||
String namespace,
|
||||
long bufferTimeMillis,
|
||||
int maxQueueSize) {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.metrics.impl;
|
|||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -41,6 +42,10 @@ public class CWPublisherRunnable<KeyType> implements Runnable {
|
|||
private boolean shuttingDown = false;
|
||||
private boolean shutdown = false;
|
||||
private long lastFlushTime = Long.MAX_VALUE;
|
||||
private int maxJitter;
|
||||
|
||||
private Random rand = new Random();
|
||||
private int nextJitterValueToUse = 0;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
|
@ -55,17 +60,27 @@ public class CWPublisherRunnable<KeyType> implements Runnable {
|
|||
long bufferTimeMillis,
|
||||
int maxQueueSize,
|
||||
int batchSize) {
|
||||
this(metricsPublisher, bufferTimeMillis, maxQueueSize, batchSize, 0);
|
||||
}
|
||||
|
||||
public CWPublisherRunnable(ICWMetricsPublisher<KeyType> metricsPublisher,
|
||||
long bufferTimeMillis,
|
||||
int maxQueueSize,
|
||||
int batchSize,
|
||||
int maxJitter) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("Constructing CWPublisherRunnable with maxBufferTimeMillis %d maxQueueSize %d batchSize %d",
|
||||
LOG.debug(String.format("Constructing CWPublisherRunnable with maxBufferTimeMillis %d maxQueueSize %d batchSize %d maxJitter %d",
|
||||
bufferTimeMillis,
|
||||
maxQueueSize,
|
||||
batchSize));
|
||||
batchSize,
|
||||
maxJitter));
|
||||
}
|
||||
|
||||
this.metricsPublisher = metricsPublisher;
|
||||
this.bufferTimeMillis = bufferTimeMillis;
|
||||
this.queue = new MetricAccumulatingQueue<KeyType>(maxQueueSize);
|
||||
this.flushSize = batchSize;
|
||||
this.maxJitter = maxJitter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -130,7 +145,12 @@ public class CWPublisherRunnable<KeyType> implements Runnable {
|
|||
} catch (Throwable t) {
|
||||
LOG.error("Caught exception thrown by metrics Publisher in CWPublisherRunnable", t);
|
||||
}
|
||||
lastFlushTime = getTime();
|
||||
// Changing the value of lastFlushTime will change the time when metrics are flushed next.
|
||||
lastFlushTime = getTime() + nextJitterValueToUse;
|
||||
if (maxJitter != 0) {
|
||||
// nextJittervalueToUse will be a value between (-maxJitter,+maxJitter)
|
||||
nextJitterValueToUse = maxJitter - rand.nextInt(2 * maxJitter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* Reads lines off the STDERR of the child process and prints them to this process's (the JVM's) STDERR and log.
|
||||
*/
|
||||
class DrainChildSTDERRTask extends LineReaderTask<Boolean> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(DrainChildSTDERRTask.class);
|
||||
|
||||
DrainChildSTDERRTask() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HandleLineResult<Boolean> handleLine(String line) {
|
||||
LOG.error("Received error line from subprocess [" + line + "] for shard " + getShardId());
|
||||
System.err.println(line);
|
||||
return new HandleLineResult<Boolean>();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Boolean returnAfterException(Exception e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Boolean returnAfterEndOfInput() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public LineReaderTask<Boolean> initialize(BufferedReader reader, String shardId) {
|
||||
return initialize(reader, shardId, "Draining STDERR for " + shardId);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* This class is used to drain the STDOUT of the child process. After the child process has been given a shutdown
|
||||
* message and responded indicating that it is shutdown, we attempt to close the input and outputs of that process so
|
||||
* that the process can exit.
|
||||
*
|
||||
* To understand why this is necessary, consider the following scenario:
|
||||
*
|
||||
* <ol>
|
||||
* <li>Child process responds that it is done with shutdown.</li>
|
||||
* <li>Child process prints debugging text to STDOUT that fills the pipe buffer so child becomes blocked.</li>
|
||||
* <li>Parent process doesn't drain child process's STDOUT.</li>
|
||||
* <li>Child process remains blocked.</li>
|
||||
* </ol>
|
||||
*
|
||||
* To prevent the child process from becoming blocked in this way, it is the responsibility of the parent process to
|
||||
* drain the child process's STDOUT. We reprint each drained line to our log to permit debugging.
|
||||
*/
|
||||
class DrainChildSTDOUTTask extends LineReaderTask<Boolean> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(DrainChildSTDOUTTask.class);
|
||||
|
||||
DrainChildSTDOUTTask() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HandleLineResult<Boolean> handleLine(String line) {
|
||||
LOG.info("Drained line for shard " + getShardId() + ": " + line);
|
||||
return new HandleLineResult<Boolean>();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Boolean returnAfterException(Exception e) {
|
||||
LOG.info("Encountered exception while draining STDOUT of child process for shard " + getShardId(), e);
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Boolean returnAfterEndOfInput() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public LineReaderTask<Boolean> initialize(BufferedReader reader, String shardId) {
|
||||
return initialize(reader, shardId, "Draining STDOUT for " + shardId);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.multilang.messages.Message;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* Gets the next message off the STDOUT of the child process. Throws an exception if a message is not found before the
|
||||
* end of the input stream is reached.
|
||||
*/
|
||||
class GetNextMessageTask extends LineReaderTask<Message> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(GetNextMessageTask.class);
|
||||
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
private static final String EMPTY_LINE = "";
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param objectMapper An object mapper for decoding json messages from the input stream.
|
||||
*/
|
||||
GetNextMessageTask(ObjectMapper objectMapper) {
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a line is an empty line.
|
||||
*
|
||||
* @param line A string
|
||||
* @return True if the line is an empty string, i.e. "", false otherwise.
|
||||
*/
|
||||
static boolean isEmptyLine(String line) {
|
||||
return EMPTY_LINE.equals(line);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HandleLineResult<Message> handleLine(String line) {
|
||||
try {
|
||||
/*
|
||||
* If the line is an empty line we don't bother logging anything because we expect the child process to be
|
||||
* nesting its messages between new lines, e.g. "\n<JSON message>\n". If there are no other entities writing
|
||||
* to the child process's STDOUT then this behavior will result in patterns like
|
||||
* "...\n<JSON message>\n\n<JSON message>\n..." which contains empty lines.
|
||||
*/
|
||||
if (!isEmptyLine(line)) {
|
||||
return new HandleLineResult<Message>(objectMapper.readValue(line, Message.class));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Skipping unexpected line on STDOUT for shard " + getShardId() + ": " + line);
|
||||
}
|
||||
return new HandleLineResult<Message>();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Message returnAfterException(Exception e) {
|
||||
throw new RuntimeException("Encountered an error while reading a line from STDIN for shard " + getShardId()
|
||||
+ " so won't be able to return a message.", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Message returnAfterEndOfInput() {
|
||||
throw new RuntimeException("Reached end of STDIN of child process for shard " + getShardId()
|
||||
+ " so won't be able to return a message.");
|
||||
}
|
||||
|
||||
public LineReaderTask<Message> initialize(BufferedReader reader, String shardId) {
|
||||
return initialize(reader, shardId, "Reading next message from STDIN for " + shardId);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,187 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* This abstract class captures the process of reading from an input stream. Three methods must be provided for
|
||||
* implementations to work.
|
||||
* <ol>
|
||||
* <li> {@link #handleLine(String)}</li>
|
||||
* <li> {@link #returnAfterEndOfInput()}</li>
|
||||
* <li> {@link #returnAfterException(Exception)}</li>
|
||||
* </ol>
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
abstract class LineReaderTask<T> implements Callable<T> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(LineReaderTask.class);
|
||||
|
||||
private BufferedReader reader;
|
||||
|
||||
private String description;
|
||||
|
||||
private String shardId;
|
||||
|
||||
LineReaderTask() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads lines off the input stream until a return value is set, or an exception is encountered, or the end of the
|
||||
* input stream is reached. Will call the appropriate methods in each case. This is the shared piece of logic
|
||||
* between any tasks that need to read from a child process's STDOUT or STDERR.
|
||||
*/
|
||||
@Override
|
||||
public T call() throws Exception {
|
||||
String nextLine = null;
|
||||
try {
|
||||
LOG.info("Starting: " + description);
|
||||
while ((nextLine = reader.readLine()) != null) {
|
||||
HandleLineResult<T> result = handleLine(nextLine);
|
||||
if (result.hasReturnValue()) {
|
||||
return result.returnValue();
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
return returnAfterException(e);
|
||||
}
|
||||
LOG.info("Stopping: " + description);
|
||||
return returnAfterEndOfInput();
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a line read from the input stream. The return value indicates whether the enclosing read-loop should
|
||||
* return from the {@link #call()} function by having a value, indicating that value should be returned immediately
|
||||
* without reading further, or not having a value, indicating that more lines of input need to be read before
|
||||
* returning.
|
||||
*
|
||||
* @param line A line read from the input stream.
|
||||
* @return HandleLineResult<T> which may or may not have a has return value, indicating to return or not return yet
|
||||
* respectively.
|
||||
*/
|
||||
protected abstract HandleLineResult<T> handleLine(String line);
|
||||
|
||||
/**
|
||||
* This method will be called if there is an error while reading from the input stream. The return value of this
|
||||
* method will be returned as the result of this Callable unless an Exception is thrown. If an Exception is thrown
|
||||
* then that exception will be thrown by the Callable.
|
||||
*
|
||||
* @param e An exception that occurred while reading from the input stream.
|
||||
* @return What to return.
|
||||
*/
|
||||
protected abstract T returnAfterException(Exception e) throws Exception;
|
||||
|
||||
/**
|
||||
* This method will be called once the end of the input stream is reached. The return value of this method will be
|
||||
* returned as the result of this Callable. Implementations of this method are welcome to throw a runtime exception
|
||||
* to indicate that the task was unsuccessful.
|
||||
*
|
||||
* @return What to return.
|
||||
*/
|
||||
protected abstract T returnAfterEndOfInput();
|
||||
|
||||
/**
|
||||
* Allows subclasses to provide more detailed logs. Specifically, this allows the drain tasks and GetNextMessageTask
|
||||
* to log which shard they're working on.
|
||||
*
|
||||
* @return The shard id
|
||||
*/
|
||||
public String getShardId() {
|
||||
return this.shardId;
|
||||
}
|
||||
|
||||
/**
|
||||
* The description should be a string explaining what this particular LineReader class does.
|
||||
*
|
||||
* @return The description.
|
||||
*/
|
||||
public String getDescription() {
|
||||
return this.description;
|
||||
}
|
||||
|
||||
/**
|
||||
* The result of a call to {@link LineReaderTask#handleLine(String)}. Allows implementations of that method to
|
||||
* indicate whether a particular invocation of that method produced a return for this task or not. If a return value
|
||||
* doesn't exist the {@link #call()} method will continue to the next line.
|
||||
*
|
||||
* @param <V>
|
||||
*/
|
||||
protected class HandleLineResult<V> {
|
||||
|
||||
private boolean hasReturnValue;
|
||||
private V returnValue;
|
||||
|
||||
HandleLineResult() {
|
||||
this.hasReturnValue = false;
|
||||
}
|
||||
|
||||
HandleLineResult(V returnValue) {
|
||||
this.hasReturnValue = true;
|
||||
this.returnValue = returnValue;
|
||||
}
|
||||
|
||||
boolean hasReturnValue() {
|
||||
return this.hasReturnValue;
|
||||
}
|
||||
|
||||
V returnValue() {
|
||||
if (hasReturnValue()) {
|
||||
return this.returnValue;
|
||||
} else {
|
||||
throw new RuntimeException("There was no value to return.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An initialization method allows us to delay setting the attributes of this class. Some of the attributes, stream
|
||||
* and shardId, are not known to the {@link MultiLangRecordProcessorFactory} when it constructs a
|
||||
* {@link MultiLangRecordProcessor} but are later determined when
|
||||
* {@link MultiLangRecordProcessor#initialize(String)} is called. So we follow a pattern where the attributes are
|
||||
* set inside this method instead of the constructor so that this object will be initialized when all its attributes
|
||||
* are known to the record processor.
|
||||
*
|
||||
* @param stream
|
||||
* @param shardId
|
||||
* @param description
|
||||
* @return
|
||||
*/
|
||||
protected LineReaderTask<T> initialize(InputStream stream, String shardId, String description) {
|
||||
return this.initialize(new BufferedReader(new InputStreamReader(stream)), shardId, description);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param reader
|
||||
* @param shardId
|
||||
* @param description
|
||||
* @return
|
||||
*/
|
||||
protected LineReaderTask<T> initialize(BufferedReader reader, String shardId, String description) {
|
||||
this.reader = reader;
|
||||
this.shardId = shardId;
|
||||
this.description = description;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import com.amazonaws.services.kinesis.multilang.messages.Message;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* Provides methods for interacting with the child process's STDOUT.
|
||||
*
|
||||
* {@link #getNextMessageFromSTDOUT()} reads lines from the child process's STDOUT and attempts to decode a
|
||||
* {@link Message} object from each line. A child process's STDOUT could have lines that don't contain data related to
|
||||
* the multi-language protocol, such as when the child process prints debugging information to its STDOUT (instead of
|
||||
* logging to a file), also when a child processes writes a Message it is expected to prepend and append a new line
|
||||
* character to their message to help ensure that it is isolated on a line all by itself which results in empty lines
|
||||
* being present in STDOUT. Lines which cannot be decoded to a Message object are ignored.
|
||||
*
|
||||
* {@link #drainSTDOUT()} simply reads all data from the child process's STDOUT until the stream is closed.
|
||||
*/
|
||||
class MessageReader {
|
||||
|
||||
private BufferedReader reader;
|
||||
|
||||
private String shardId;
|
||||
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
/**
|
||||
* Use the initialize methods after construction.
|
||||
*/
|
||||
MessageReader() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a future which represents an attempt to read the next message in the child process's STDOUT. If the task
|
||||
* is successful, the result of the future will be the next message found in the child process's STDOUT, if the task
|
||||
* is unable to find a message before the child process's STDOUT is closed, or reading from STDOUT causes an
|
||||
* IOException, then an execution exception will be generated by this future.
|
||||
*
|
||||
* The task employed by this method reads from the child process's STDOUT line by line. The task attempts to decode
|
||||
* each line into a {@link Message} object. Lines that fail to decode to a Message are ignored and the task
|
||||
* continues to the next line until it finds a Message.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
Future<Message> getNextMessageFromSTDOUT() {
|
||||
GetNextMessageTask getNextMessageTask = new GetNextMessageTask(objectMapper);
|
||||
getNextMessageTask.initialize(reader, shardId);
|
||||
return executorService.submit(getNextMessageTask);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a future that represents a computation that drains the STDOUT of the child process. That future's result
|
||||
* is true if the end of the child's STDOUT is reached, its result is false if there was an error while reading from
|
||||
* the stream. This task will log all the lines it drains to permit debugging.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
Future<Boolean> drainSTDOUT() {
|
||||
DrainChildSTDOUTTask drainTask = new DrainChildSTDOUTTask();
|
||||
drainTask.initialize(reader, shardId);
|
||||
return this.executorService.submit(drainTask);
|
||||
}
|
||||
|
||||
/**
|
||||
* An initialization method allows us to delay setting the attributes of this class. Some of the attributes,
|
||||
* stream and shardId, are not known to the {@link MultiLangRecordProcessorFactory} when it constructs a
|
||||
* {@link MultiLangRecordProcessor} but are later determined when
|
||||
* {@link MultiLangRecordProcessor#initialize(String)} is called. So we follow a pattern where the attributes are
|
||||
* set inside this method instead of the constructor so that this object will be initialized when all its attributes
|
||||
* are known to the record processor.
|
||||
*
|
||||
* @param stream Used to read messages from the subprocess.
|
||||
* @param shardId The shard we're working on.
|
||||
* @param objectMapper The object mapper to decode messages.
|
||||
* @param executorService An executor service to run tasks in.
|
||||
*/
|
||||
MessageReader initialize(InputStream stream,
|
||||
String shardId,
|
||||
ObjectMapper objectMapper,
|
||||
ExecutorService executorService) {
|
||||
return this.initialize(new BufferedReader(new InputStreamReader(stream)), shardId, objectMapper,
|
||||
executorService);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @param reader Used to read messages from the subprocess.
|
||||
* @param shardId The shard we're working on.
|
||||
* @param objectMapper The object mapper to decode messages.
|
||||
* @param executorService An executor service to run tasks in.
|
||||
*/
|
||||
MessageReader initialize(BufferedReader reader,
|
||||
String shardId,
|
||||
ObjectMapper objectMapper,
|
||||
ExecutorService executorService) {
|
||||
this.reader = reader;
|
||||
this.shardId = shardId;
|
||||
this.objectMapper = objectMapper;
|
||||
this.executorService = executorService;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,208 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.Message;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* Defines methods for writing {@link Message} objects to the child process's STDIN.
|
||||
*/
|
||||
class MessageWriter {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MessageWriter.class);
|
||||
|
||||
private BufferedWriter writer;
|
||||
|
||||
private volatile boolean open = true;
|
||||
|
||||
private String shardId;
|
||||
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
/**
|
||||
* Use initialize method after construction.
|
||||
*/
|
||||
MessageWriter() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the message then writes the line separator provided by the system. Flushes each message to guarantee it
|
||||
* is delivered as soon as possible to the subprocess.
|
||||
*
|
||||
* @param message A message to be written to the subprocess.
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
private Future<Boolean> writeMessageToOutput(final String message) throws IOException {
|
||||
Callable<Boolean> writeMessageToOutputTask = new Callable<Boolean>() {
|
||||
public Boolean call() throws Exception {
|
||||
try {
|
||||
/*
|
||||
* If the message size exceeds the size of the buffer, the write won't be guaranteed to be atomic,
|
||||
* so we synchronize on the writer to avoid interlaced lines from different calls to this method.
|
||||
*/
|
||||
synchronized (writer) {
|
||||
writer.write(message, 0, message.length());
|
||||
writer.write(System.lineSeparator(), 0, System.lineSeparator().length());
|
||||
writer.flush();
|
||||
}
|
||||
LOG.info("Message size == " + message.getBytes().length + " bytes for shard " + shardId);
|
||||
} catch (IOException e) {
|
||||
open = false;
|
||||
}
|
||||
return open;
|
||||
}
|
||||
};
|
||||
|
||||
if (open) {
|
||||
return this.executorService.submit(writeMessageToOutputTask);
|
||||
} else {
|
||||
String errorMessage = "Cannot write message " + message + " because writer is closed for shard " + shardId;
|
||||
LOG.info(errorMessage);
|
||||
throw new IllegalStateException(errorMessage);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the message to a JSON string and writes it to the subprocess.
|
||||
*
|
||||
* @param message A message to be written to the subprocess.
|
||||
* @return
|
||||
*/
|
||||
private Future<Boolean> writeMessage(Message message) {
|
||||
LOG.info("Writing " + message.getClass().getSimpleName() + " to child process for shard " + shardId);
|
||||
try {
|
||||
String jsonText = objectMapper.writeValueAsString(message);
|
||||
return writeMessageToOutput(jsonText);
|
||||
} catch (IOException e) {
|
||||
String errorMessage =
|
||||
String.format("Encountered I/O error while writing %s action to subprocess", message.getClass()
|
||||
.getSimpleName());
|
||||
LOG.error(errorMessage, e);
|
||||
throw new RuntimeException(errorMessage, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes an {@link InitializeMessage} to the subprocess.
|
||||
*
|
||||
* @param shardIdToWrite The shard id.
|
||||
*/
|
||||
Future<Boolean> writeInitializeMessage(String shardIdToWrite) {
|
||||
return writeMessage(new InitializeMessage(shardIdToWrite));
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a {@link ProcessRecordsMessage} message to the subprocess.
|
||||
*
|
||||
* @param records The records to be processed.
|
||||
*/
|
||||
Future<Boolean> writeProcessRecordsMessage(List<Record> records) {
|
||||
return writeMessage(new ProcessRecordsMessage(records));
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a {@link ShutdownMessage} to the subprocess.
|
||||
*
|
||||
* @param reason The reason for shutting down.
|
||||
*/
|
||||
Future<Boolean> writeShutdownMessage(ShutdownReason reason) {
|
||||
return writeMessage(new ShutdownMessage(reason));
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a {@link CheckpointMessage} to the subprocess.
|
||||
*
|
||||
* @param sequenceNumber The sequence number that was checkpointed.
|
||||
* @param throwable The exception that was thrown by a checkpoint attempt. Null if one didn't occur.
|
||||
*/
|
||||
Future<Boolean> writeCheckpointMessageWithError(String sequenceNumber, Throwable throwable) {
|
||||
return writeMessage(new CheckpointMessage(sequenceNumber, throwable));
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the output stream and prevents further attempts to write.
|
||||
*
|
||||
* @throws IOException Thrown when closing the writer fails
|
||||
*/
|
||||
void close() throws IOException {
|
||||
open = false;
|
||||
this.writer.close();
|
||||
}
|
||||
|
||||
boolean isOpen() {
|
||||
return this.open;
|
||||
}
|
||||
|
||||
/**
|
||||
* An initialization method allows us to delay setting the attributes of this class. Some of the attributes,
|
||||
* stream and shardId, are not known to the {@link MultiLangRecordProcessorFactory} when it constructs a
|
||||
* {@link MultiLangRecordProcessor} but are later determined when
|
||||
* {@link MultiLangRecordProcessor#initialize(String)} is called. So we follow a pattern where the attributes are
|
||||
* set inside this method instead of the constructor so that this object will be initialized when all its attributes
|
||||
* are known to the record processor.
|
||||
*
|
||||
* @param stream Used to write messages to the subprocess.
|
||||
* @param shardId The shard we're working on.
|
||||
* @param objectMapper The object mapper to encode messages.
|
||||
* @param executorService An executor service to run tasks in.
|
||||
*/
|
||||
MessageWriter initialize(OutputStream stream,
|
||||
String shardId,
|
||||
ObjectMapper objectMapper,
|
||||
ExecutorService executorService) {
|
||||
return this.initialize(new BufferedWriter(new OutputStreamWriter(stream)), shardId, objectMapper,
|
||||
executorService);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param writer Used to write messages to the subprocess.
|
||||
* @param shardId The shard we're working on.
|
||||
* @param objectMapper The object mapper to encode messages.
|
||||
* @param executorService An executor service to run tasks in.
|
||||
*/
|
||||
MessageWriter initialize(BufferedWriter writer,
|
||||
String shardId,
|
||||
ObjectMapper objectMapper,
|
||||
ExecutorService executorService) {
|
||||
this.writer = writer;
|
||||
this.shardId = shardId;
|
||||
this.objectMapper = objectMapper;
|
||||
this.executorService = executorService;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,227 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
|
||||
|
||||
/**
|
||||
* Main app that launches the worker that runs the multi-language record processor.
|
||||
*
|
||||
* Requires a properties file containing configuration for this daemon and the KCL. A properties file should at minimum
|
||||
* define these properties:
|
||||
*
|
||||
* <pre>
|
||||
* # The script that abides by the multi-language protocol. This script will
|
||||
* # be executed by the MultiLangDaemon, which will communicate with this script
|
||||
* # over STDIN and STDOUT according to the multi-language protocol.
|
||||
* executableName = sampleapp.py
|
||||
*
|
||||
* # The name of an Amazon Kinesis stream to process.
|
||||
* streamName = words
|
||||
*
|
||||
* # Used by the KCL as the name of this application. Will be used as the name
|
||||
* # of a Amazon DynamoDB table which will store the lease and checkpoint
|
||||
* # information for workers with this application name.
|
||||
* applicationName = PythonKCLSample
|
||||
*
|
||||
* # Users can change the credentials provider the KCL will use to retrieve credentials.
|
||||
* # The DefaultAWSCredentialsProviderChain checks several other providers, which is
|
||||
* # described here:
|
||||
* # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
|
||||
* AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
|
||||
* </pre>
|
||||
*/
|
||||
public class MultiLangDaemon implements Callable<Integer> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MultiLangDaemon.class);
|
||||
|
||||
private static final String USER_AGENT = "amazon-kinesis-multi-lang-daemon";
|
||||
private static final String VERSION = "1.0.0";
|
||||
|
||||
private static final String PROP_EXECUTABLE_NAME = "executableName";
|
||||
private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage";
|
||||
private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads";
|
||||
|
||||
private KinesisClientLibConfiguration configuration;
|
||||
|
||||
private MultiLangRecordProcessorFactory recordProcessorFactory;
|
||||
|
||||
private ExecutorService workerThreadPool;
|
||||
|
||||
private String processingLanguage;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
MultiLangDaemon(String processingLanguage,
|
||||
KinesisClientLibConfiguration configuration,
|
||||
MultiLangRecordProcessorFactory recordProcessorFactory,
|
||||
ExecutorService workerThreadPool) {
|
||||
this.processingLanguage = processingLanguage;
|
||||
this.configuration = configuration;
|
||||
this.recordProcessorFactory = recordProcessorFactory;
|
||||
this.workerThreadPool = workerThreadPool;
|
||||
}
|
||||
|
||||
static void printUsage(PrintStream stream, String message) {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
if (message != null) {
|
||||
builder.append(message);
|
||||
}
|
||||
builder.append(String.format("java %s <properties file>", MultiLangDaemon.class.getCanonicalName()));
|
||||
stream.println(builder.toString());
|
||||
}
|
||||
|
||||
static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
|
||||
Properties properties = new Properties();
|
||||
try (InputStream propertiesStream = classLoader.getResourceAsStream(propertiesFileName)) {
|
||||
properties.load(propertiesStream);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
|
||||
static boolean validateProperties(Properties properties) {
|
||||
return properties != null && properties.getProperty(PROP_EXECUTABLE_NAME) != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will cause the MultiLangDaemon to read its configuration and build a worker with a
|
||||
* MultiLangRecordProcessorFactory for the executable specified in the provided properties.
|
||||
*/
|
||||
void prepare() {
|
||||
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
|
||||
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
|
||||
|
||||
LOG.info("Using workerId: " + configuration.getWorkerIdentifier());
|
||||
LOG.info("Using credentials with access key id: "
|
||||
+ configuration.getKinesisCredentialsProvider().getCredentials().getAWSAccessKeyId());
|
||||
|
||||
StringBuilder userAgent = new StringBuilder(KinesisClientLibConfiguration.KINESIS_CLIENT_LIB_USER_AGENT);
|
||||
userAgent.append(" ");
|
||||
userAgent.append(USER_AGENT);
|
||||
userAgent.append("/");
|
||||
userAgent.append(VERSION);
|
||||
|
||||
if (processingLanguage != null) {
|
||||
userAgent.append(" ");
|
||||
userAgent.append(processingLanguage);
|
||||
}
|
||||
|
||||
if (recordProcessorFactory.getCommandArray().length > 0) {
|
||||
userAgent.append(" ");
|
||||
userAgent.append(recordProcessorFactory.getCommandArray()[0]);
|
||||
}
|
||||
|
||||
LOG.debug(String.format("User Agent string is: %s", userAgent.toString()));
|
||||
configuration.withUserAgent(userAgent.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
prepare();
|
||||
Worker worker = new Worker(recordProcessorFactory, configuration, workerThreadPool);
|
||||
int exitCode = 0;
|
||||
try {
|
||||
worker.run();
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Caught throwable while processing data.", t);
|
||||
exitCode = 1;
|
||||
}
|
||||
return exitCode;
|
||||
}
|
||||
|
||||
private static int getMaxActiveThreads(Properties properties) {
|
||||
return Integer.parseInt(properties.getProperty(PROP_MAX_ACTIVE_THREADS, "0"));
|
||||
}
|
||||
|
||||
private static ExecutorService getExecutorService(Properties properties) {
|
||||
int maxActiveThreads = getMaxActiveThreads(properties);
|
||||
LOG.debug(String.format("Value for %s property is %d", PROP_MAX_ACTIVE_THREADS, maxActiveThreads));
|
||||
if (maxActiveThreads <= 0) {
|
||||
LOG.info("Using a cached thread pool.");
|
||||
return Executors.newCachedThreadPool();
|
||||
} else {
|
||||
LOG.info(String.format("Using a fixed thread pool with %d max active threads.", maxActiveThreads));
|
||||
return Executors.newFixedThreadPool(maxActiveThreads);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param args Accepts a single argument, that argument is a properties file which provides KCL configuration as
|
||||
* well as the name of an executable.
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
|
||||
if (args.length == 0) {
|
||||
printUsage(System.err, "You must provide a properties file");
|
||||
System.exit(1);
|
||||
}
|
||||
Properties properties = null;
|
||||
try {
|
||||
properties = loadProperties(Thread.currentThread().getContextClassLoader(), args[0]);
|
||||
} catch (IOException e) {
|
||||
printUsage(System.err, "You must provide a properties file");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
if (validateProperties(properties)) {
|
||||
|
||||
// Configuration
|
||||
KinesisClientLibConfiguration kinesisClientLibConfiguration =
|
||||
new KinesisClientLibConfigurator().getConfiguration(properties);
|
||||
String executableName = properties.getProperty(PROP_EXECUTABLE_NAME);
|
||||
|
||||
ExecutorService executorService = getExecutorService(properties);
|
||||
|
||||
// Factory
|
||||
MultiLangRecordProcessorFactory recordProcessorFactory =
|
||||
new MultiLangRecordProcessorFactory(executableName, executorService);
|
||||
|
||||
// Daemon
|
||||
MultiLangDaemon daemon =
|
||||
new MultiLangDaemon(properties.getProperty(PROP_PROCESSING_LANGUAGE),
|
||||
kinesisClientLibConfiguration, recordProcessorFactory, executorService);
|
||||
|
||||
LOG.info("Running " + kinesisClientLibConfiguration.getApplicationName() + " to process stream "
|
||||
+ kinesisClientLibConfiguration.getStreamName() + " with executable " + executableName);
|
||||
|
||||
Future<Integer> future = executorService.submit(daemon);
|
||||
try {
|
||||
System.exit(future.get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("Encountered an error while running daemon", e);
|
||||
}
|
||||
} else {
|
||||
printUsage(System.err, "Must provide an executable name in the properties file, "
|
||||
+ "e.g. executableName = sampleapp.py");
|
||||
}
|
||||
System.exit(1);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,213 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.Message;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
|
||||
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
|
||||
|
||||
/**
|
||||
* An implementation of the multi language protocol.
|
||||
*/
|
||||
class MultiLangProtocol {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MultiLangProtocol.class);
|
||||
|
||||
private MessageReader messageReader;
|
||||
private MessageWriter messageWriter;
|
||||
private String shardId;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param messageReader A message reader.
|
||||
* @param messageWriter A message writer.
|
||||
* @param shardId The shard id this processor is associated with.
|
||||
*/
|
||||
MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, String shardId) {
|
||||
this.messageReader = messageReader;
|
||||
this.messageWriter = messageWriter;
|
||||
this.shardId = shardId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes an {@link InitializeMessage} to the child process's STDIN and waits for the child process to respond with
|
||||
* a {@link StatusMessage} on its STDOUT.
|
||||
*
|
||||
* @return Whether or not this operation succeeded.
|
||||
*/
|
||||
boolean initialize() {
|
||||
/*
|
||||
* Call and response to child process.
|
||||
*/
|
||||
Future<Boolean> writeFuture = messageWriter.writeInitializeMessage(shardId);
|
||||
return waitForStatusMessage(InitializeMessage.ACTION, null, writeFuture);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a {@link ProcessRecordsMessage} to the child process's STDIN and waits for the child process to respond
|
||||
* with a {@link StatusMessage} on its STDOUT.
|
||||
*
|
||||
* @param records The records to process.
|
||||
* @param checkpointer A checkpointer.
|
||||
* @return Whether or not this operation succeeded.
|
||||
*/
|
||||
boolean processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
|
||||
Future<Boolean> writeFuture = messageWriter.writeProcessRecordsMessage(records);
|
||||
return waitForStatusMessage(ProcessRecordsMessage.ACTION, checkpointer, writeFuture);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a {@link ShutdownMessage} to the child process's STDIN and waits for the child process to respond with a
|
||||
* {@link StatusMessage} on its STDOUT.
|
||||
*
|
||||
* @param checkpointer A checkpointer.
|
||||
* @param reason Why this processor is being shutdown.
|
||||
* @return Whether or not this operation succeeded.
|
||||
*/
|
||||
boolean shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
|
||||
Future<Boolean> writeFuture = messageWriter.writeShutdownMessage(reason);
|
||||
return waitForStatusMessage(ShutdownMessage.ACTION, checkpointer, writeFuture);
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for a {@link StatusMessage} for a particular action. If a {@link CheckpointMessage} is received, then this
|
||||
* method will attempt to checkpoint with the provided {@link IRecordProcessorCheckpointer}. This method returns
|
||||
* true if writing to the child process succeeds and the status message received back was for the correct action and
|
||||
* all communications with the child process regarding checkpointing were successful. Note that whether or not the
|
||||
* checkpointing itself was successful is not the concern of this method. This method simply cares whether it was
|
||||
* able to successfully communicate the results of its attempts to checkpoint.
|
||||
*
|
||||
* @param action What action is being waited on.
|
||||
* @param checkpointer A checkpointer.
|
||||
* @param writeFuture The writing task.
|
||||
* @return Whether or not this operation succeeded.
|
||||
*/
|
||||
private boolean waitForStatusMessage(String action,
|
||||
IRecordProcessorCheckpointer checkpointer,
|
||||
Future<Boolean> writeFuture) {
|
||||
boolean statusWasCorrect = waitForStatusMessage(action, checkpointer);
|
||||
|
||||
// Examine whether or not we failed somewhere along the line.
|
||||
try {
|
||||
boolean writerIsStillOpen = Boolean.valueOf(writeFuture.get());
|
||||
return statusWasCorrect && writerIsStillOpen;
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error(String.format("Interrupted while writing %s message for shard %s", action, shardId));
|
||||
return false;
|
||||
} catch (ExecutionException e) {
|
||||
LOG.error(String.format("Failed to write %s message for shard %s", action, shardId), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param action What action is being waited on.
|
||||
* @param checkpointer A checkpointer.
|
||||
* @return Whether or not this operation succeeded.
|
||||
*/
|
||||
private boolean waitForStatusMessage(String action, IRecordProcessorCheckpointer checkpointer) {
|
||||
StatusMessage statusMessage = null;
|
||||
while (statusMessage == null) {
|
||||
Future<Message> future = this.messageReader.getNextMessageFromSTDOUT();
|
||||
try {
|
||||
Message message = future.get();
|
||||
// Note that instanceof doubles as a check against a value being null
|
||||
if (message instanceof CheckpointMessage) {
|
||||
boolean checkpointWriteSucceeded =
|
||||
Boolean.valueOf(checkpoint((CheckpointMessage) message, checkpointer).get());
|
||||
if (!checkpointWriteSucceeded) {
|
||||
return false;
|
||||
}
|
||||
} else if (message instanceof StatusMessage) {
|
||||
statusMessage = (StatusMessage) message;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error(String.format("Interrupted while waiting for %s message for shard %s", action, shardId));
|
||||
return false;
|
||||
} catch (ExecutionException e) {
|
||||
LOG.error(String.format("Failed to get status message for %s action for shard %s", action, shardId), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return this.validateStatusMessage(statusMessage, action);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility for confirming that the status message is for the provided action.
|
||||
*
|
||||
* @param statusMessage The status of the child process.
|
||||
* @param action The action that was being waited on.
|
||||
* @return Whether or not this operation succeeded.
|
||||
*/
|
||||
private boolean validateStatusMessage(StatusMessage statusMessage, String action) {
|
||||
LOG.info("Received response " + statusMessage + " from subprocess while waiting for " + action
|
||||
+ " while processing shard " + shardId);
|
||||
return !(statusMessage == null || statusMessage.getResponseFor() == null || !statusMessage.getResponseFor()
|
||||
.equals(action));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to checkpoint with the provided {@link IRecordProcessorCheckpointer} at the sequence number in the
|
||||
* provided {@link CheckpointMessage}. If no sequence number is provided, i.e. the sequence number is null, then
|
||||
* this method will call {@link IRecordProcessorCheckpointer#checkpoint()}. The method returns a future representing
|
||||
* the attempt to write the result of this checkpoint attempt to the child process.
|
||||
*
|
||||
* @param checkpointMessage A checkpoint message.
|
||||
* @param checkpointer A checkpointer.
|
||||
* @return Whether or not this operation succeeded.
|
||||
*/
|
||||
private Future<Boolean> checkpoint(CheckpointMessage checkpointMessage, IRecordProcessorCheckpointer checkpointer) {
|
||||
String sequenceNumber = checkpointMessage.getCheckpoint();
|
||||
try {
|
||||
if (checkpointer != null) {
|
||||
if (sequenceNumber == null) {
|
||||
LOG.info(String.format("Attempting to checkpoint for shard %s", shardId));
|
||||
checkpointer.checkpoint();
|
||||
} else {
|
||||
LOG.info(String.format("Attempting to checkpoint at sequence number %s for shard %s",
|
||||
sequenceNumber, shardId));
|
||||
checkpointer.checkpoint(sequenceNumber);
|
||||
}
|
||||
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, null);
|
||||
} else {
|
||||
String message =
|
||||
String.format("Was asked to checkpoint at %s but no checkpointer was provided for shard %s",
|
||||
sequenceNumber, shardId);
|
||||
LOG.error(message);
|
||||
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, new InvalidStateException(
|
||||
message));
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,276 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* A record processor that manages creating a child process that implements the multi language protocol and connecting
|
||||
* that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on
|
||||
* that object when its corresponding {@link #initialize}, {@link #processRecords}, and {@link #shutdown} methods are
|
||||
* called.
|
||||
*/
|
||||
public class MultiLangRecordProcessor implements IRecordProcessor {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class);
|
||||
private static final int EXIT_VALUE = 1;
|
||||
|
||||
private String shardId;
|
||||
|
||||
private Future<?> stderrReadTask;
|
||||
|
||||
private MessageWriter messageWriter;
|
||||
private MessageReader messageReader;
|
||||
private DrainChildSTDERRTask readSTDERRTask;
|
||||
|
||||
private ProcessBuilder processBuilder;
|
||||
private Process process;
|
||||
private ExecutorService executorService;
|
||||
private ProcessState state;
|
||||
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
private MultiLangProtocol protocol;
|
||||
|
||||
/**
|
||||
* Used to tell whether the processor has been shutdown already.
|
||||
*/
|
||||
private enum ProcessState {
|
||||
ACTIVE, SHUTDOWN
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param processBuilder Provides process builder functionality.
|
||||
* @param executorService An executor
|
||||
* @param objectMapper An obejct mapper.
|
||||
*/
|
||||
MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService,
|
||||
ObjectMapper objectMapper) {
|
||||
this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(),
|
||||
new DrainChildSTDERRTask());
|
||||
}
|
||||
|
||||
/**
|
||||
* Note: This constructor has package level access solely for testing purposes.
|
||||
*
|
||||
* @param processBuilder Provides the child process for this record processor
|
||||
* @param executorService The executor service which is provided by the {@link MultiLangRecordProcessorFactory}
|
||||
* @param objectMapper Object mapper
|
||||
* @param messageWriter Message write to write to child process's stdin
|
||||
* @param messageReader Message reader to read from child process's stdout
|
||||
* @param readSTDERRTask Error reader to read from child process's stderr
|
||||
*/
|
||||
MultiLangRecordProcessor(ProcessBuilder processBuilder,
|
||||
ExecutorService executorService,
|
||||
ObjectMapper objectMapper,
|
||||
MessageWriter messageWriter,
|
||||
MessageReader messageReader,
|
||||
DrainChildSTDERRTask readSTDERRTask) {
|
||||
this.executorService = executorService;
|
||||
this.processBuilder = processBuilder;
|
||||
this.objectMapper = objectMapper;
|
||||
this.messageWriter = messageWriter;
|
||||
this.messageReader = messageReader;
|
||||
this.readSTDERRTask = readSTDERRTask;
|
||||
|
||||
this.state = ProcessState.ACTIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(String shardIdToProcess) {
|
||||
try {
|
||||
this.shardId = shardIdToProcess;
|
||||
try {
|
||||
this.process = startProcess();
|
||||
} catch (IOException e) {
|
||||
/*
|
||||
* The process builder has thrown an exception while starting the child process so we would like to shut
|
||||
* down
|
||||
*/
|
||||
stopProcessing("Failed to start client executable", e);
|
||||
return;
|
||||
}
|
||||
// Initialize all of our utility objects that will handle interacting with the process over
|
||||
// STDIN/STDOUT/STDERR
|
||||
messageWriter.initialize(process.getOutputStream(), shardId, objectMapper, executorService);
|
||||
messageReader.initialize(process.getInputStream(), shardId, objectMapper, executorService);
|
||||
readSTDERRTask.initialize(process.getErrorStream(), shardId, "Reading STDERR for " + shardId);
|
||||
|
||||
// Submit the error reader for execution
|
||||
stderrReadTask = executorService.submit(readSTDERRTask);
|
||||
|
||||
protocol = new MultiLangProtocol(messageReader, messageWriter, shardId);
|
||||
if (!protocol.initialize()) {
|
||||
throw new RuntimeException("Failed to initialize child process");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
stopProcessing("Encountered an error while trying to initialize record processor", t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
|
||||
try {
|
||||
if (!protocol.processRecords(records, checkpointer)) {
|
||||
throw new RuntimeException("Child process failed to process records");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
stopProcessing("Encountered an error while trying to process records", t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
|
||||
try {
|
||||
if (ProcessState.ACTIVE.equals(this.state)) {
|
||||
if (!protocol.shutdown(checkpointer, reason)) {
|
||||
throw new RuntimeException("Child process failed to shutdown");
|
||||
}
|
||||
|
||||
childProcessShutdownSequence();
|
||||
} else {
|
||||
LOG.warn("Shutdown was called but this processor is already shutdown. Not doing anything.");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
if (ProcessState.ACTIVE.equals(this.state)) {
|
||||
stopProcessing("Encountered an error while trying to shutdown child process", t);
|
||||
} else {
|
||||
stopProcessing("Encountered an error during shutdown,"
|
||||
+ " but it appears the processor has already been shutdown", t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs the necessary shutdown actions for the child process, e.g. stopping all the handlers after they have
|
||||
* drained their streams. Attempts to wait for child process to completely finish before returning.
|
||||
*/
|
||||
private void childProcessShutdownSequence() {
|
||||
try {
|
||||
/*
|
||||
* Close output stream to the child process. The child process should be reading off its stdin until it
|
||||
* receives EOF, closing the output stream should signal this and allow the child process to terminate. We
|
||||
* expect it to terminate immediately, but there is the possibility that the child process then begins to
|
||||
* write to its STDOUT and STDERR.
|
||||
*/
|
||||
if (messageWriter.isOpen()) {
|
||||
messageWriter.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Encountered exception while trying to close output stream.", e);
|
||||
}
|
||||
|
||||
// We should drain the STDOUT and STDERR of the child process. If we don't, the child process might remain
|
||||
// blocked writing to a full pipe buffer.
|
||||
safelyWaitOnFuture(messageReader.drainSTDOUT(), "draining STDOUT");
|
||||
safelyWaitOnFuture(stderrReadTask, "draining STDERR");
|
||||
|
||||
safelyCloseInputStream(process.getErrorStream(), "STDERR");
|
||||
safelyCloseInputStream(process.getInputStream(), "STDOUT");
|
||||
|
||||
/*
|
||||
* By this point the threads handling reading off input streams are done, we do one last thing just to make sure
|
||||
* we don't leave the child process running. The process is expected to have exited by now, but we still make
|
||||
* sure that it exits before we finish.
|
||||
*/
|
||||
try {
|
||||
LOG.info("Child process exited with value: " + process.waitFor());
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Interrupted before process finished exiting. Attempting to kill process.");
|
||||
process.destroy();
|
||||
}
|
||||
|
||||
state = ProcessState.SHUTDOWN;
|
||||
}
|
||||
|
||||
private void safelyCloseInputStream(InputStream inputStream, String name) {
|
||||
try {
|
||||
inputStream.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Encountered exception while trying to close " + name + " stream.", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method used by {@link #childProcessShutdownSequence()} to drain the STDIN and STDERR of the child
|
||||
* process.
|
||||
*
|
||||
* @param future A future to wait on.
|
||||
* @param whatThisFutureIsDoing What that future is doing while we wait.
|
||||
*/
|
||||
private void safelyWaitOnFuture(Future<?> future, String whatThisFutureIsDoing) {
|
||||
try {
|
||||
future.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("Encountered error while " + whatThisFutureIsDoing + " for shard " + shardId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method for logging and safely shutting down so that we don't throw an exception up to the KCL on
|
||||
* accident.
|
||||
*
|
||||
* @param message The reason we are stopping processing.
|
||||
* @param reason An exception that caused us to want to stop processing.
|
||||
*/
|
||||
private void stopProcessing(String message, Throwable reason) {
|
||||
try {
|
||||
LOG.error(message, reason);
|
||||
if (!state.equals(ProcessState.SHUTDOWN)) {
|
||||
childProcessShutdownSequence();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Encountered error while trying to shutdown", t);
|
||||
}
|
||||
exit();
|
||||
}
|
||||
|
||||
/**
|
||||
* We provide a package level method for unit testing this call to exit.
|
||||
*
|
||||
* @param val exit value
|
||||
*/
|
||||
void exit() {
|
||||
System.exit(EXIT_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* The {@link ProcessBuilder} class is final so not easily mocked. We wrap the only interaction we have with it in
|
||||
* this package level method to permit unit testing.
|
||||
*
|
||||
* @return The process started by processBuilder
|
||||
* @throws IOException If the process can't be started.
|
||||
*/
|
||||
Process startProcess() throws IOException {
|
||||
return this.processBuilder.start();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* Creates {@link MultiLangRecordProcessor}'s.
|
||||
*/
|
||||
public class MultiLangRecordProcessorFactory implements IRecordProcessorFactory {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessorFactory.class);
|
||||
|
||||
private static final String COMMAND_DELIMETER_REGEX = " +";
|
||||
|
||||
private final String command;
|
||||
private final String[] commandArray;
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
/**
|
||||
* @param command The command that will do processing for this factory's record processors.
|
||||
* @param executorService An executor service to use while processing inputs and outputs of the child process.
|
||||
*/
|
||||
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService) {
|
||||
this(command, executorService, new ObjectMapper());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param command The command that will do processing for this factory's record processors.
|
||||
* @param executorService An executor service to use while processing inputs and outputs of the child process.
|
||||
* @param objectMapper An object mapper used to convert messages to json to be written to the child process
|
||||
*/
|
||||
public MultiLangRecordProcessorFactory(String command, ExecutorService executorService, ObjectMapper objectMapper) {
|
||||
this.command = command;
|
||||
this.commandArray = command.split(COMMAND_DELIMETER_REGEX);
|
||||
this.executorService = executorService;
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IRecordProcessor createProcessor() {
|
||||
LOG.debug(String.format("Creating new record processor for client executable: %s", command));
|
||||
/*
|
||||
* Giving ProcessBuilder the command as an array of Strings allows users to specify command line arguments.
|
||||
*/
|
||||
return new MultiLangRecordProcessor(new ProcessBuilder(commandArray), executorService, this.objectMapper);
|
||||
}
|
||||
|
||||
String[] getCommandArray() {
|
||||
return commandArray;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang.messages;
|
||||
|
||||
/**
|
||||
* A checkpoint message is sent by the client's subprocess to indicate to the kcl processor that it should attempt to
|
||||
* checkpoint. The processor sends back a checkpoint message as an acknowledgement that it attempted to checkpoint along
|
||||
* with an error message which corresponds to the names of exceptions that a checkpointer can throw.
|
||||
*/
|
||||
public class CheckpointMessage extends Message {
|
||||
/**
|
||||
* The name used for the action field in {@link Message}.
|
||||
*/
|
||||
public static final String ACTION = "checkpoint";
|
||||
|
||||
/**
|
||||
* The checkpoint this message is about.
|
||||
*/
|
||||
private String checkpoint;
|
||||
|
||||
/**
|
||||
* The name of an exception that occurred while attempting to checkpoint.
|
||||
*/
|
||||
private String error;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public CheckpointMessage() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience constructor.
|
||||
*
|
||||
* @param sequenceNumber The sequence number that this message is about.
|
||||
* @param throwable When responding to a client's process, the record processor will add the name of the exception
|
||||
* that occurred while attempting to checkpoint if one did occur.
|
||||
*/
|
||||
public CheckpointMessage(String sequenceNumber, Throwable throwable) {
|
||||
this.setCheckpoint(sequenceNumber);
|
||||
if (throwable != null) {
|
||||
this.setError(throwable.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The checkpoint.
|
||||
*/
|
||||
public String getCheckpoint() {
|
||||
return checkpoint;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The error.
|
||||
*/
|
||||
public String getError() {
|
||||
return error;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param checkpoint The checkpoint.
|
||||
*/
|
||||
public void setCheckpoint(String checkpoint) {
|
||||
this.checkpoint = checkpoint;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param error The error.
|
||||
*/
|
||||
public void setError(String error) {
|
||||
this.error = error;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang.messages;
|
||||
|
||||
/**
|
||||
* An initialize message is sent to the client's subprocess to indicate that it should perform its initialization steps.
|
||||
*/
|
||||
public class InitializeMessage extends Message {
|
||||
/**
|
||||
* The name used for the action field in {@link Message}.
|
||||
*/
|
||||
public static final String ACTION = "initialize";
|
||||
|
||||
/**
|
||||
* The shard id that this processor is getting initialized for.
|
||||
*/
|
||||
private String shardId;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public InitializeMessage() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience constructor.
|
||||
*
|
||||
* @param shardId The shard id.
|
||||
*/
|
||||
public InitializeMessage(String shardId) {
|
||||
this.setShardId(shardId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The shard id.
|
||||
*/
|
||||
public String getShardId() {
|
||||
return shardId;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param shardId The shard id.
|
||||
*/
|
||||
public void setShardId(String shardId) {
|
||||
this.shardId = shardId;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang.messages;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
|
||||
/**
|
||||
* Class for encoding Record objects to json. Needed because Records have byte buffers for their data field which causes
|
||||
* problems for the json library we're using.
|
||||
*/
|
||||
public class JsonFriendlyRecord {
|
||||
private byte[] data;
|
||||
private String partitionKey;
|
||||
private String sequenceNumber;
|
||||
|
||||
/**
|
||||
* Default Constructor.
|
||||
*/
|
||||
public JsonFriendlyRecord() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience constructor.
|
||||
*
|
||||
* @param record The record that this message will represent.
|
||||
*/
|
||||
public JsonFriendlyRecord(Record record) {
|
||||
this.withData(record.getData() == null ? null : record.getData().array())
|
||||
.withPartitionKey(record.getPartitionKey()).withSequenceNumber(record.getSequenceNumber());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The data.
|
||||
*/
|
||||
public byte[] getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The partition key.
|
||||
*/
|
||||
public String getPartitionKey() {
|
||||
return partitionKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The sequence number.
|
||||
*/
|
||||
public String getSequenceNumber() {
|
||||
return sequenceNumber;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param data The data.
|
||||
*/
|
||||
public void setData(byte[] data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param partitionKey The partition key.
|
||||
*/
|
||||
public void setPartitionKey(String partitionKey) {
|
||||
this.partitionKey = partitionKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param sequenceNumber The sequence number.
|
||||
*/
|
||||
public void setSequenceNumber(String sequenceNumber) {
|
||||
this.sequenceNumber = sequenceNumber;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param data The data.
|
||||
*
|
||||
* @return this
|
||||
*/
|
||||
public JsonFriendlyRecord withData(byte[] data) {
|
||||
this.setData(data);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param partitionKey The partition key.
|
||||
*
|
||||
* @return this
|
||||
*/
|
||||
public JsonFriendlyRecord withPartitionKey(String partitionKey) {
|
||||
this.setPartitionKey(partitionKey);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param sequenceNumber The sequence number.
|
||||
*
|
||||
* @return this
|
||||
*/
|
||||
public JsonFriendlyRecord withSequenceNumber(String sequenceNumber) {
|
||||
this.setSequenceNumber(sequenceNumber);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang.messages;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* Abstract class for all messages that are sent to the client's process.
|
||||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "action")
|
||||
@JsonSubTypes({ @Type(value = CheckpointMessage.class, name = CheckpointMessage.ACTION),
|
||||
@Type(value = InitializeMessage.class, name = InitializeMessage.ACTION),
|
||||
@Type(value = ProcessRecordsMessage.class, name = ProcessRecordsMessage.ACTION),
|
||||
@Type(value = ShutdownMessage.class, name = ShutdownMessage.ACTION),
|
||||
@Type(value = StatusMessage.class, name = StatusMessage.ACTION), })
|
||||
public abstract class Message {
|
||||
|
||||
private ObjectMapper mapper = new ObjectMapper();;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public Message() {
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param objectMapper An object mapper.
|
||||
* @return this
|
||||
*/
|
||||
Message withObjectMapper(ObjectMapper objectMapper) {
|
||||
this.mapper = objectMapper;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return A JSON representation of this object.
|
||||
*/
|
||||
public String toString() {
|
||||
try {
|
||||
return mapper.writeValueAsString(this);
|
||||
} catch (Exception e) {
|
||||
return super.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang.messages;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
|
||||
/**
|
||||
* A message to indicate to the client's process that it should process a list of records.
|
||||
*/
|
||||
public class ProcessRecordsMessage extends Message {
|
||||
/**
|
||||
* The name used for the action field in {@link Message}.
|
||||
*/
|
||||
public static final String ACTION = "processRecords";
|
||||
|
||||
/**
|
||||
* The records that the client's process needs to handle.
|
||||
*/
|
||||
private List<JsonFriendlyRecord> records;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public ProcessRecordsMessage() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience constructor.
|
||||
*
|
||||
* @param records The records.
|
||||
*/
|
||||
public ProcessRecordsMessage(List<Record> records) {
|
||||
List<JsonFriendlyRecord> recordMessages = new ArrayList<JsonFriendlyRecord>();
|
||||
for (Record record : records) {
|
||||
recordMessages.add(new JsonFriendlyRecord(record));
|
||||
}
|
||||
this.setRecords(recordMessages);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The records.
|
||||
*/
|
||||
public List<JsonFriendlyRecord> getRecords() {
|
||||
return records;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param records The records.
|
||||
*/
|
||||
public void setRecords(List<JsonFriendlyRecord> records) {
|
||||
this.records = records;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang.messages;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
|
||||
|
||||
/**
|
||||
* A message to indicate to the client's process that it should shutdown and then terminate.
|
||||
*/
|
||||
public class ShutdownMessage extends Message {
|
||||
/**
|
||||
* The name used for the action field in {@link Message}.
|
||||
*/
|
||||
public static final String ACTION = "shutdown";
|
||||
|
||||
/**
|
||||
* The reason for shutdown, e.g. TERMINATE or ZOMBIE
|
||||
*/
|
||||
private String reason;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public ShutdownMessage() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience constructor.
|
||||
*
|
||||
* @param reason The reason.
|
||||
*/
|
||||
public ShutdownMessage(ShutdownReason reason) {
|
||||
if (reason == null) {
|
||||
this.setReason(null);
|
||||
} else {
|
||||
this.setReason(String.valueOf(reason));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return reason The reason.
|
||||
*/
|
||||
public String getReason() {
|
||||
return reason;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param reason The reason.
|
||||
*/
|
||||
public void setReason(String reason) {
|
||||
this.reason = reason;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang.messages;
|
||||
|
||||
/**
|
||||
* A message sent by the client's process to indicate to the record processor that it completed a particular action.
|
||||
*/
|
||||
public class StatusMessage extends Message {
|
||||
/**
|
||||
* The name used for the action field in {@link Message}.
|
||||
*/
|
||||
public static final String ACTION = "status";
|
||||
|
||||
/**
|
||||
* The name of the most recently received action.
|
||||
*/
|
||||
private String responseFor;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public StatusMessage() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience constructor.
|
||||
*
|
||||
* @param responseFor The response for.
|
||||
*/
|
||||
public StatusMessage(String responseFor) {
|
||||
this.setResponseFor(responseFor);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return The response for.
|
||||
*/
|
||||
public String getResponseFor() {
|
||||
return responseFor;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param responseFor The response for.
|
||||
*/
|
||||
public void setResponseFor(String responseFor) {
|
||||
this.responseFor = responseFor;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,125 @@
|
|||
/*
|
||||
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* This package provides a KCL application which implements the multi language protocol. The multi language protocol
|
||||
* defines a system for communication between a KCL multi-lang application and another process (referred to as the
|
||||
* "child process") over STDIN and STDOUT of the child process. The units of communication are JSON messages which
|
||||
* represent the actions the receiving entity should perform. The child process is responsible for reacting
|
||||
* appropriately to four different messages: initialize, processRecords, checkpoint, and shutdown. The KCL multi-lang
|
||||
* app is responsible for reacting appropriately to two messages generated by the child process: status and checkpoint.
|
||||
*
|
||||
* <h3>Action messages sent to child process</h3>
|
||||
*
|
||||
* <pre>
|
||||
* { "action" : "initialize",
|
||||
* "shardId" : "string",
|
||||
* }
|
||||
*
|
||||
* { "action" : "processRecords",
|
||||
* "records" : [{ "data" : "<base64encoded_string>",
|
||||
* "partitionKey" : "<partition key>",
|
||||
* "sequenceNumber" : "<sequence number>";
|
||||
* }] // a list of records
|
||||
* }
|
||||
*
|
||||
* { "action" : "checkpoint",
|
||||
* "checkpoint" : "<sequence number>",
|
||||
* "error" : "<NameOfException>"
|
||||
* }
|
||||
*
|
||||
* { "action" : "shutdown",
|
||||
* "reason" : "<TERMINATE|ZOMBIE>"
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* <h3>Action messages sent to KCL by the child process</h3>
|
||||
*
|
||||
* <pre>
|
||||
* { "action" : "checkpoint",
|
||||
* "checkpoint" : "<sequenceNumberToCheckpoint>";
|
||||
* }
|
||||
*
|
||||
* { "action" : "status",
|
||||
* "responseFor" : "<nameOfAction>";
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* <h3>High Level Description Of Protocol</h3>
|
||||
*
|
||||
* The child process will be started by the KCL multi-lang application. There will be one child process for each shard
|
||||
* that this worker is assigned to. The multi-lang app will send an initialize, processRecords, or shutdown message upon
|
||||
* invocation of its corresponding methods. Each message will be on a single line, the messages will be
|
||||
* separated by new lines.The child process is expected to read these messages off its STDIN line by line. The child
|
||||
* process must respond over its STDOUT with a status message indicating that is has finished performing the most recent
|
||||
* action. The multi-lang daemon will not begin to send another message until it has received the response for the
|
||||
* previous message.
|
||||
*
|
||||
* <h4>Checkpointing Behavior</h4>
|
||||
*
|
||||
* The child process may send a checkpoint message at any time <b>after</b> receiving a processRecords or shutdown
|
||||
* action and <b>before</b> sending the corresponding status message back to the processor. After sending a checkpoint
|
||||
* message over STDOUT, the child process is expected to immediately begin to read its STDIN, waiting for the checkpoint
|
||||
* result message from the KCL multi-lang processor.
|
||||
*
|
||||
* <h3>Protocol From Child Process Perspective</h3>
|
||||
*
|
||||
* <h4>Initialize</h4>
|
||||
*
|
||||
* <ol>
|
||||
* <li>Read an "initialize" action from STDIN</li>
|
||||
* <li>Perform initialization steps</li>
|
||||
* <li>Write "status" message to indicate you are done</li>
|
||||
* <li>Begin reading line from STDIN to receive next action</li>
|
||||
* </ol>
|
||||
*
|
||||
* <h4>ProcessRecords</h4>
|
||||
*
|
||||
* <ol>
|
||||
* <li>Read a "processRecords" action from STDIN</li>
|
||||
* <li>Perform processing tasks (you may write a checkpoint message at any time)</li>
|
||||
* <li>Write "status" message to STDOUT to indicate you are done.</li>
|
||||
* <li>Begin reading line from STDIN to receive next action</li>
|
||||
* </ol>
|
||||
*
|
||||
* <h4>Shutdown</h4>
|
||||
*
|
||||
* <ol>
|
||||
* <li>Read a "shutdown" action from STDIN</li>
|
||||
* <li>Perform shutdown tasks (you may write a checkpoint message at any time)</li>
|
||||
* <li>Write "status" message to STDOUT to indicate you are done.</li>
|
||||
* <li>Begin reading line from STDIN to receive next action</li>
|
||||
* </ol>
|
||||
*
|
||||
* <h4>Checkpoint</h4>
|
||||
*
|
||||
* <ol>
|
||||
* <li>Read a "checkpoint" action from STDIN</li>
|
||||
* <li>Decide whether to checkpoint again based on whether there is an error or not.</li>
|
||||
* </ol>
|
||||
*
|
||||
* <h3>Base 64 Encoding</h3>
|
||||
*
|
||||
* The "data" field of the processRecords action message is an array of arbitrary bytes. To send this in a JSON string
|
||||
* we apply base 64 encoding which transforms the byte array into a string (specifically this string doesn't have JSON
|
||||
* special symbols or new lines in it). The multi-lang processor will use the Jackson library which uses a variant of
|
||||
* MIME called MIME_NO_LINEFEEDS <a href=
|
||||
* "http://fasterxml.github.io/jackson-core/javadoc/2.3.0/com/fasterxml/jackson/core/class-use/Base64Variant.html">(see
|
||||
* Jackson doc for more details)</a> MIME is the basis of most base64 encoding variants including <a
|
||||
* href="http://tools.ietf.org/html/rfc3548.html">RFC 3548</a> which is the standard used by Python's <a
|
||||
* href="https://docs.python.org/2/library/base64.html">base64</a> module.
|
||||
*
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.multilang;
|
||||
|
||||
Loading…
Reference in a new issue