diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java index 1f508771..9976b071 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.config; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.lang.reflect.Constructor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +31,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode private static final Log LOG = LogFactory.getLog(AWSCredentialsProviderPropertyValueDecoder.class); private static final String AUTH_PREFIX = "com.amazonaws.auth."; private static final String LIST_DELIMITER = ","; + private static final String ARG_DELIMITER = "|"; /** * Constructor. @@ -39,7 +41,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode /** * Get AWSCredentialsProvider property. - * + * * @param value property value as String * @return corresponding variable in correct type */ @@ -70,11 +72,25 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode private static List getValidCredentialsProviders(List providerNames) { List credentialsProviders = new ArrayList(); for (String providerName : providerNames) { - try { - Class className = Class.forName(providerName); - credentialsProviders.add((AWSCredentialsProvider) className.newInstance()); - } catch (Exception e) { - LOG.debug("Can't find any credentials provider matching " + providerName + "."); + if (providerName.contains(ARG_DELIMITER)) { + String[] nameAndArgs = providerName.split("\\" + ARG_DELIMITER); + Class[] argTypes = new Class[nameAndArgs.length - 1]; + Arrays.fill(argTypes, String.class); + try { + Class className = Class.forName(nameAndArgs[0]); + Constructor c = className.getConstructor(argTypes); + credentialsProviders.add((AWSCredentialsProvider) c.newInstance( + Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length))); + } catch (Exception e) { + LOG.debug("Can't find any credentials provider matching " + providerName + "."); + } + } else { + try { + Class className = Class.forName(providerName); + credentialsProviders.add((AWSCredentialsProvider) className.newInstance()); + } catch (Exception e) { + LOG.debug("Can't find any credentials provider matching " + providerName + "."); + } } } return credentialsProviders; @@ -97,13 +113,13 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode private static List getPossibleFullClassNames(String s) { /* * We take care of three cases : - * + * * 1. Customer provides a short name of common providers in com.amazonaws.auth package i.e. any classes * implementing the AWSCredentialsProvider interface: * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html - * + * * 2. Customer provides a full name of common providers e.g. com.amazonaws.auth.ClasspathFileCredentialsProvider - * + * * 3. Customer provides a custom credentials provider with full name of provider */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 37e609b5..174fb65e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -170,6 +170,7 @@ public class KinesisClientLibConfiguration { private String tableName; private String streamName; private String kinesisEndpoint; + private String dynamoDBEndpoint; private InitialPositionInStream initialPositionInStream; private AWSCredentialsProvider kinesisCredentialsProvider; private AWSCredentialsProvider dynamoDBCredentialsProvider; @@ -204,7 +205,7 @@ public class KinesisClientLibConfiguration { /** * Constructor. - * + * * @param applicationName Name of the Amazon Kinesis application. * By default the application name is included in the user agent string used to make AWS requests. This * can assist with troubleshooting (e.g. distinguish requests made by separate applications). @@ -221,7 +222,7 @@ public class KinesisClientLibConfiguration { /** * Constructor. - * + * * @param applicationName Name of the Amazon Kinesis application * By default the application name is included in the user agent string used to make AWS requests. This * can assist with troubleshooting (e.g. distinguish requests made by separate applications). @@ -237,7 +238,7 @@ public class KinesisClientLibConfiguration { AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider, String workerId) { - this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, + this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, @@ -305,6 +306,76 @@ public class KinesisClientLibConfiguration { int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName) { + this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, + dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, + maxRecords, idleTimeBetweenReadsInMillis, + callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, + shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, + kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, + taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, + validateSequenceNumberBeforeCheckpointing, regionName); + } + + /** + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamName Name of the Kinesis stream + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + */ + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + public KinesisClientLibConfiguration(String applicationName, + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -319,6 +390,7 @@ public class KinesisClientLibConfiguration { this.tableName = applicationName; this.streamName = streamName; this.kinesisEndpoint = kinesisEndpoint; + this.dynamoDBEndpoint = dynamoDBEndpoint; this.initialPositionInStream = initialPositionInStream; this.kinesisCredentialsProvider = kinesisCredentialsProvider; this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; @@ -354,7 +426,7 @@ public class KinesisClientLibConfiguration { // Check if value is positive, otherwise throw an exception private void checkIsValuePositive(String key, long value) { if (value <= 0) { - throw new IllegalArgumentException("Value of " + key + throw new IllegalArgumentException("Value of " + key + " should be positive, but current value is " + value); } } @@ -373,11 +445,11 @@ public class KinesisClientLibConfiguration { config.setUserAgent(existingUserAgent); return config; } - + private void checkIsRegionNameValid(String regionNameToCheck) { if (regionNameToCheck != null && RegionUtils.getRegion(regionNameToCheck) == null) { throw new IllegalArgumentException("The specified region name is not valid"); - } + } } /** @@ -478,6 +550,13 @@ public class KinesisClientLibConfiguration { return kinesisEndpoint; } + /** + * @return DynamoDB endpoint + */ + public String getDynamoDBEndpoint() { + return dynamoDBEndpoint; + } + /** * @return the initialPositionInStream */ @@ -648,6 +727,15 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param dynamoDBEndpoint DynamoDB endpoint + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withDynamoDBEndpoint(String dynamoDBEndpoint) { + this.dynamoDBEndpoint = dynamoDBEndpoint; + return this; + } + /** * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library * will start fetching records from this position when the application starts up if there are no checkpoints. @@ -784,7 +872,7 @@ public class KinesisClientLibConfiguration { /** * Override the default user agent (application name). - * + * * @param userAgent User agent to use in AWS requests * @return KinesisClientLibConfiguration */ @@ -840,7 +928,7 @@ public class KinesisClientLibConfiguration { * NONE * SUMMARY * DETAILED - * + * * @param metricsLevel Metrics level to enable. * @return KinesisClientLibConfiguration */ @@ -867,7 +955,7 @@ public class KinesisClientLibConfiguration { } /** - * + * * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers * with a call to Amazon Kinesis before checkpointing for calls to * {@link RecordProcessorCheckpointer#checkpoint(String)}. @@ -883,7 +971,7 @@ public class KinesisClientLibConfiguration { * If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases * in the lease table. This assumes that the shards and leases are in-sync. * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). - * + * * @param skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker * initialization). * @return KinesisClientLibConfiguration @@ -895,7 +983,7 @@ public class KinesisClientLibConfiguration { } /** - * + * * @param regionName The region name for the service * @return KinesisClientLibConfiguration */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 09c31d87..b644a790 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -248,6 +248,11 @@ public class Worker implements Runnable { dynamoDBClient.setRegion(region); LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); } + // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. + if (config.getDynamoDBEndpoint() != null) { + dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint()); + LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint()); + } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) { kinesisClient.setEndpoint(config.getKinesisEndpoint()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoderTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoderTest.java new file mode 100644 index 00000000..cddd837a --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoderTest.java @@ -0,0 +1,115 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.config; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoder; + +public class AWSCredentialsProviderPropertyValueDecoderTest { + + private static final String TEST_ACCESS_KEY_ID = "123"; + private static final String TEST_SECRET_KEY = "456"; + + private String credentialName1 = + "com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoderTest$AlwaysSucceedCredentialsProvider"; + private String credentialName2 = + "com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoderTest$ConstructorCredentialsProvider"; + private AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder(); + + @Test + public void testSingleProvider() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName1); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), TEST_ACCESS_KEY_ID); + assertEquals(provider.getCredentials().getAWSSecretKey(), TEST_SECRET_KEY); + } + + @Test + public void testTwoProviders() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName1 + "," + credentialName1); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), TEST_ACCESS_KEY_ID); + assertEquals(provider.getCredentials().getAWSSecretKey(), TEST_SECRET_KEY); + } + + @Test + public void testProfileProviderWithOneArg() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName2 + "|arg"); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), "arg"); + assertEquals(provider.getCredentials().getAWSSecretKey(), "blank"); + } + + @Test + public void testProfileProviderWithTwoArgs() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName2 + + "|arg1|arg2"); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), "arg1"); + assertEquals(provider.getCredentials().getAWSSecretKey(), "arg2"); + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProvider implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new BasicAWSCredentials(TEST_ACCESS_KEY_ID, TEST_SECRET_KEY); + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider needs a constructor call to instantiate it + */ + public static class ConstructorCredentialsProvider implements AWSCredentialsProvider { + + private String arg1; + private String arg2; + + public ConstructorCredentialsProvider(String arg1) { + this.arg1 = arg1; + this.arg2 = "blank"; + } + + public ConstructorCredentialsProvider(String arg1, String arg2) { + this.arg1 = arg1; + this.arg2 = arg2; + } + + @Override + public AWSCredentials getCredentials() { + return new BasicAWSCredentials(arg1, arg2); + } + + @Override + public void refresh() { + } + + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index e71948c9..4874a164 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -62,6 +62,7 @@ public class KinesisClientLibConfigurationTest { // Test constructor with all valid arguments. config = new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, InitialPositionInStream.LATEST, @@ -99,6 +100,7 @@ public class KinesisClientLibConfigurationTest { try { config = new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, InitialPositionInStream.LATEST, @@ -132,6 +134,7 @@ public class KinesisClientLibConfigurationTest { try { config = new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, InitialPositionInStream.LATEST, @@ -209,7 +212,7 @@ public class KinesisClientLibConfigurationTest { AmazonDynamoDBClient dclient = Mockito.mock(AmazonDynamoDBClient.class); AmazonCloudWatchClient cclient = Mockito.mock(AmazonCloudWatchClient.class); Region region = RegionUtils.getRegion("us-west-2"); - + AWSCredentialsProvider credentialsProvider = Mockito.mock(AWSCredentialsProvider.class); KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0") @@ -262,7 +265,7 @@ public class KinesisClientLibConfigurationTest { Mockito.verify(kclConfig, Mockito.times(9)).getRegionName(); Mockito.verify(kclConfig, Mockito.times(4)).getKinesisEndpoint(); - + kclConfig = Mockito.spy( new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0") .withKinesisEndpoint("https://kinesis.eu-west-1.amazonaws.com")); @@ -294,6 +297,7 @@ public class KinesisClientLibConfigurationTest { Mockito.mock(AWSCredentialsProvider.class); try { new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, null,