diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java index 027dde22..e239f967 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java @@ -50,7 +50,9 @@ public class KinesisClientLibConfigurator { // Required properties private static final String PROP_APP_NAME = "applicationName"; private static final String PROP_STREAM_NAME = "streamName"; - private static final String PROP_CREDENTIALS_PROVIDER = "AWSCredentialsProvider"; + private static final String PROP_CREDENTIALS_PROVIDER_KINESIS = "AWSCredentialsProvider"; + private static final String PROP_CREDENTIALS_PROVIDER_DYNAMODB = "AWSCredentialsProviderDynamoDB"; + private static final String PROP_CREDENTIALS_PROVIDER_CLOUDWATCH = "AWSCredentialsProviderCloudWatch"; private static final String PROP_WORKER_ID = "workerId"; private Map, IPropertyValueDecoder> classToDecoder; @@ -107,7 +109,7 @@ public class KinesisClientLibConfigurator { String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME)); String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME)); AWSCredentialsProvider provider = - awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER)); + awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER_KINESIS)); if (applicationName == null || applicationName.isEmpty()) { throw new IllegalArgumentException("Value of applicationName should be explicitly provided."); @@ -116,6 +118,24 @@ public class KinesisClientLibConfigurator { throw new IllegalArgumentException("Value of streamName should be explicitly provided."); } + // Decode the DynamoDB credentials provider if it exists. If not use the Kinesis credentials provider. + AWSCredentialsProvider providerDynamoDB; + String propCredentialsProviderDynamoDBValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_DYNAMODB); + if (propCredentialsProviderDynamoDBValue == null) { + providerDynamoDB = provider; + } else { + providerDynamoDB = awsCPPropGetter.decodeValue(propCredentialsProviderDynamoDBValue); + } + + // Decode the CloudWatch credentials provider if it exists. If not use the Kinesis credentials provider. + AWSCredentialsProvider providerCloudWatch; + String propCredentialsProviderCloudWatchValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_CLOUDWATCH); + if (propCredentialsProviderCloudWatchValue == null) { + providerCloudWatch = provider; + } else { + providerCloudWatch = awsCPPropGetter.decodeValue(propCredentialsProviderCloudWatchValue); + } + // Allow customer not to provide workerId or to provide empty worker id. String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID)); if (workerId == null || workerId.isEmpty()) { @@ -125,13 +145,13 @@ public class KinesisClientLibConfigurator { } KinesisClientLibConfiguration config = - new KinesisClientLibConfiguration(applicationName, streamName, provider, workerId); + new KinesisClientLibConfiguration(applicationName, streamName, provider, providerDynamoDB, providerCloudWatch, workerId); Set requiredNames = new HashSet(Arrays.asList(PROP_STREAM_NAME, PROP_APP_NAME, PROP_WORKER_ID, - PROP_CREDENTIALS_PROVIDER)); + PROP_CREDENTIALS_PROVIDER_KINESIS)); // Set all the variables that are not used for constructor. for (Object keyObject : properties.keySet()) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java index 1ccd4941..cbdd0a2d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java @@ -40,6 +40,12 @@ public class KinesisClientLibConfiguratorTest { "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider"; private String credentialName2 = "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider"; + private String credentialNameKinesis = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis"; + private String credentialNameDynamoDB = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB"; + private String credentialNameCloudWatch = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch"; private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator(); @Test @@ -329,6 +335,74 @@ public class KinesisClientLibConfiguratorTest { } } + @Test + public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatch() { + String test = StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = " + credentialNameKinesis, + "AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB, + "AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch, + "failoverTimeMillis = 100", + "shardSyncIntervalMillis = 500" + }, '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + KinesisClientLibConfiguration config = configurator.getConfiguration(input); + try { + config.getKinesisCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("Kinesis credential providers should not fail."); + } + try { + config.getDynamoDBCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("DynamoDB credential providers should not fail."); + } + try { + config.getCloudWatchCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("CloudWatch credential providers should not fail."); + } + } + + @Test + public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatchFailed() { + String test = StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = " + credentialNameKinesis, + "AWSCredentialsProviderDynamoDB = " + credentialName1, + "AWSCredentialsProviderCloudWatch = " + credentialName1, + "failoverTimeMillis = 100", + "shardSyncIntervalMillis = 500" + }, '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + KinesisClientLibConfiguration config = configurator.getConfiguration(input); + try { + config.getKinesisCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("Kinesis credential providers should not fail."); + } + try { + config.getDynamoDBCredentialsProvider().getCredentials(); + fail("DynamoDB credential providers should fail."); + } catch (Exception e) { + // succeed + } + try { + config.getCloudWatchCredentialsProvider().getCredentials(); + fail("CloudWatch credential providers should fail."); + } catch (Exception e) { + // succeed + } + } + /** * This credentials provider will always succeed */ @@ -345,6 +419,84 @@ public class KinesisClientLibConfiguratorTest { } + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderKinesis implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderDynamoDB implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderCloudWatch implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + /** * This credentials provider will always fail */