Allow for credential providers with basic arg types (#99)

* Add support for configuring DynamoDB endpoint

Adding a new field named `dynamoDBEndpoint` to the .properties file
that gets passed into the KCL multi-lang daemon. We need this ability
to point the KCL worker at a local instance of DynamoDB rather than in
AWS.

* Added the ability to use AWSCredentialsProvider's that require non-empty contructor args e.g. ProfileCredentialsProvider where you provide the profile name to use from your ~/.aws/credentials file

* Created a constructor without the dynamoDBEndpoint argument i.e. same arguments before the dynamo change, for backwards compatibility
This commit is contained in:
Josh 2016-10-14 11:05:44 -06:00 committed by Justin Pfifer
parent 56c59f685a
commit db314b970b
5 changed files with 250 additions and 22 deletions

View file

@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.config;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.lang.reflect.Constructor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -30,6 +31,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
private static final Log LOG = LogFactory.getLog(AWSCredentialsProviderPropertyValueDecoder.class);
private static final String AUTH_PREFIX = "com.amazonaws.auth.";
private static final String LIST_DELIMITER = ",";
private static final String ARG_DELIMITER = "|";
/**
* Constructor.
@ -70,6 +72,19 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
private static List<AWSCredentialsProvider> getValidCredentialsProviders(List<String> providerNames) {
List<AWSCredentialsProvider> credentialsProviders = new ArrayList<AWSCredentialsProvider>();
for (String providerName : providerNames) {
if (providerName.contains(ARG_DELIMITER)) {
String[] nameAndArgs = providerName.split("\\" + ARG_DELIMITER);
Class<?>[] argTypes = new Class<?>[nameAndArgs.length - 1];
Arrays.fill(argTypes, String.class);
try {
Class<?> className = Class.forName(nameAndArgs[0]);
Constructor<?> c = className.getConstructor(argTypes);
credentialsProviders.add((AWSCredentialsProvider) c.newInstance(
Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length)));
} catch (Exception e) {
LOG.debug("Can't find any credentials provider matching " + providerName + ".");
}
} else {
try {
Class<?> className = Class.forName(providerName);
credentialsProviders.add((AWSCredentialsProvider) className.newInstance());
@ -77,6 +92,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
LOG.debug("Can't find any credentials provider matching " + providerName + ".");
}
}
}
return credentialsProviders;
}

View file

@ -170,6 +170,7 @@ public class KinesisClientLibConfiguration {
private String tableName;
private String streamName;
private String kinesisEndpoint;
private String dynamoDBEndpoint;
private InitialPositionInStream initialPositionInStream;
private AWSCredentialsProvider kinesisCredentialsProvider;
private AWSCredentialsProvider dynamoDBCredentialsProvider;
@ -237,7 +238,7 @@ public class KinesisClientLibConfiguration {
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
@ -305,6 +306,76 @@ public class KinesisClientLibConfiguration {
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName) {
this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
maxRecords, idleTimeBetweenReadsInMillis,
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis,
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
validateSequenceNumberBeforeCheckpointing, regionName);
}
/**
* @param applicationName Name of the Kinesis application
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamName Name of the Kinesis stream
* @param kinesisEndpoint Kinesis endpoint
* @param dynamoDBEndpoint DynamoDB endpoint
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
* records from that location in the stream when an application starts up for the first time and there
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
* @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
* @param workerId Used to distinguish different workers/processes of a Kinesis application
* @param maxRecords Max records to read per Kinesis getRecords() call
* @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
* @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
* GetRecords returned an empty record list.
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
* @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
* @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
public KinesisClientLibConfiguration(String applicationName,
String streamName,
String kinesisEndpoint,
String dynamoDBEndpoint,
InitialPositionInStream initialPositionInStream,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
long failoverTimeMillis,
String workerId,
int maxRecords,
long idleTimeBetweenReadsInMillis,
boolean callProcessRecordsEvenForEmptyRecordList,
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
long taskBackoffTimeMillis,
long metricsBufferTimeMillis,
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName) {
// Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -319,6 +390,7 @@ public class KinesisClientLibConfiguration {
this.tableName = applicationName;
this.streamName = streamName;
this.kinesisEndpoint = kinesisEndpoint;
this.dynamoDBEndpoint = dynamoDBEndpoint;
this.initialPositionInStream = initialPositionInStream;
this.kinesisCredentialsProvider = kinesisCredentialsProvider;
this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider;
@ -478,6 +550,13 @@ public class KinesisClientLibConfiguration {
return kinesisEndpoint;
}
/**
* @return DynamoDB endpoint
*/
public String getDynamoDBEndpoint() {
return dynamoDBEndpoint;
}
/**
* @return the initialPositionInStream
*/
@ -648,6 +727,15 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
* @param dynamoDBEndpoint DynamoDB endpoint
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withDynamoDBEndpoint(String dynamoDBEndpoint) {
this.dynamoDBEndpoint = dynamoDBEndpoint;
return this;
}
/**
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library
* will start fetching records from this position when the application starts up if there are no checkpoints.

View file

@ -248,6 +248,11 @@ public class Worker implements Runnable {
dynamoDBClient.setRegion(region);
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
}
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
if (config.getDynamoDBEndpoint() != null) {
dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint());
LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint());
}
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
if (config.getKinesisEndpoint() != null) {
kinesisClient.setEndpoint(config.getKinesisEndpoint());

View file

@ -0,0 +1,115 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.config;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoder;
public class AWSCredentialsProviderPropertyValueDecoderTest {
private static final String TEST_ACCESS_KEY_ID = "123";
private static final String TEST_SECRET_KEY = "456";
private String credentialName1 =
"com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoderTest$AlwaysSucceedCredentialsProvider";
private String credentialName2 =
"com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoderTest$ConstructorCredentialsProvider";
private AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder();
@Test
public void testSingleProvider() {
AWSCredentialsProvider provider = decoder.decodeValue(credentialName1);
assertEquals(provider.getClass(), AWSCredentialsProviderChain.class);
assertEquals(provider.getCredentials().getAWSAccessKeyId(), TEST_ACCESS_KEY_ID);
assertEquals(provider.getCredentials().getAWSSecretKey(), TEST_SECRET_KEY);
}
@Test
public void testTwoProviders() {
AWSCredentialsProvider provider = decoder.decodeValue(credentialName1 + "," + credentialName1);
assertEquals(provider.getClass(), AWSCredentialsProviderChain.class);
assertEquals(provider.getCredentials().getAWSAccessKeyId(), TEST_ACCESS_KEY_ID);
assertEquals(provider.getCredentials().getAWSSecretKey(), TEST_SECRET_KEY);
}
@Test
public void testProfileProviderWithOneArg() {
AWSCredentialsProvider provider = decoder.decodeValue(credentialName2 + "|arg");
assertEquals(provider.getClass(), AWSCredentialsProviderChain.class);
assertEquals(provider.getCredentials().getAWSAccessKeyId(), "arg");
assertEquals(provider.getCredentials().getAWSSecretKey(), "blank");
}
@Test
public void testProfileProviderWithTwoArgs() {
AWSCredentialsProvider provider = decoder.decodeValue(credentialName2 +
"|arg1|arg2");
assertEquals(provider.getClass(), AWSCredentialsProviderChain.class);
assertEquals(provider.getCredentials().getAWSAccessKeyId(), "arg1");
assertEquals(provider.getCredentials().getAWSSecretKey(), "arg2");
}
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProvider implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials(TEST_ACCESS_KEY_ID, TEST_SECRET_KEY);
}
@Override
public void refresh() {
}
}
/**
* This credentials provider needs a constructor call to instantiate it
*/
public static class ConstructorCredentialsProvider implements AWSCredentialsProvider {
private String arg1;
private String arg2;
public ConstructorCredentialsProvider(String arg1) {
this.arg1 = arg1;
this.arg2 = "blank";
}
public ConstructorCredentialsProvider(String arg1, String arg2) {
this.arg1 = arg1;
this.arg2 = arg2;
}
@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials(arg1, arg2);
}
@Override
public void refresh() {
}
}
}

View file

@ -62,6 +62,7 @@ public class KinesisClientLibConfigurationTest {
// Test constructor with all valid arguments.
config =
new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING,
TEST_STRING,
InitialPositionInStream.LATEST,
@ -99,6 +100,7 @@ public class KinesisClientLibConfigurationTest {
try {
config =
new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING,
TEST_STRING,
InitialPositionInStream.LATEST,
@ -132,6 +134,7 @@ public class KinesisClientLibConfigurationTest {
try {
config =
new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING,
TEST_STRING,
InitialPositionInStream.LATEST,
@ -294,6 +297,7 @@ public class KinesisClientLibConfigurationTest {
Mockito.mock(AWSCredentialsProvider.class);
try {
new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING,
TEST_STRING,
null,