Feature/multiple cred providers (#111)

* Add ability to specify different credential providers for Kinesis, DynamoDB, and CloudWatch.  This is needed when accessing a cross-account Kineses stream using an assumed role.

* Fix copy/paste mistake.

* Update tests.

Thanks to rgfindl@
This commit is contained in:
Randy Findley 2016-10-14 11:10:38 -04:00 committed by Justin Pfifer
parent 51663f96c7
commit 5c497d87a9
2 changed files with 176 additions and 4 deletions

View file

@ -50,7 +50,9 @@ public class KinesisClientLibConfigurator {
// Required properties
private static final String PROP_APP_NAME = "applicationName";
private static final String PROP_STREAM_NAME = "streamName";
private static final String PROP_CREDENTIALS_PROVIDER = "AWSCredentialsProvider";
private static final String PROP_CREDENTIALS_PROVIDER_KINESIS = "AWSCredentialsProvider";
private static final String PROP_CREDENTIALS_PROVIDER_DYNAMODB = "AWSCredentialsProviderDynamoDB";
private static final String PROP_CREDENTIALS_PROVIDER_CLOUDWATCH = "AWSCredentialsProviderCloudWatch";
private static final String PROP_WORKER_ID = "workerId";
private Map<Class<?>, IPropertyValueDecoder<?>> classToDecoder;
@ -107,7 +109,7 @@ public class KinesisClientLibConfigurator {
String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME));
String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME));
AWSCredentialsProvider provider =
awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER));
awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER_KINESIS));
if (applicationName == null || applicationName.isEmpty()) {
throw new IllegalArgumentException("Value of applicationName should be explicitly provided.");
@ -116,6 +118,24 @@ public class KinesisClientLibConfigurator {
throw new IllegalArgumentException("Value of streamName should be explicitly provided.");
}
// Decode the DynamoDB credentials provider if it exists. If not use the Kinesis credentials provider.
AWSCredentialsProvider providerDynamoDB;
String propCredentialsProviderDynamoDBValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_DYNAMODB);
if (propCredentialsProviderDynamoDBValue == null) {
providerDynamoDB = provider;
} else {
providerDynamoDB = awsCPPropGetter.decodeValue(propCredentialsProviderDynamoDBValue);
}
// Decode the CloudWatch credentials provider if it exists. If not use the Kinesis credentials provider.
AWSCredentialsProvider providerCloudWatch;
String propCredentialsProviderCloudWatchValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_CLOUDWATCH);
if (propCredentialsProviderCloudWatchValue == null) {
providerCloudWatch = provider;
} else {
providerCloudWatch = awsCPPropGetter.decodeValue(propCredentialsProviderCloudWatchValue);
}
// Allow customer not to provide workerId or to provide empty worker id.
String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID));
if (workerId == null || workerId.isEmpty()) {
@ -125,13 +145,13 @@ public class KinesisClientLibConfigurator {
}
KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration(applicationName, streamName, provider, workerId);
new KinesisClientLibConfiguration(applicationName, streamName, provider, providerDynamoDB, providerCloudWatch, workerId);
Set<String> requiredNames =
new HashSet<String>(Arrays.asList(PROP_STREAM_NAME,
PROP_APP_NAME,
PROP_WORKER_ID,
PROP_CREDENTIALS_PROVIDER));
PROP_CREDENTIALS_PROVIDER_KINESIS));
// Set all the variables that are not used for constructor.
for (Object keyObject : properties.keySet()) {

View file

@ -40,6 +40,12 @@ public class KinesisClientLibConfiguratorTest {
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider";
private String credentialName2 =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider";
private String credentialNameKinesis =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis";
private String credentialNameDynamoDB =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB";
private String credentialNameCloudWatch =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch";
private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();
@Test
@ -329,6 +335,74 @@ public class KinesisClientLibConfiguratorTest {
}
}
@Test
public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatch() {
String test = StringUtils.join(new String[] {
"streamName = a",
"applicationName = b",
"AWSCredentialsProvider = " + credentialNameKinesis,
"AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB,
"AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch,
"failoverTimeMillis = 100",
"shardSyncIntervalMillis = 500"
}, '\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
KinesisClientLibConfiguration config = configurator.getConfiguration(input);
try {
config.getKinesisCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("Kinesis credential providers should not fail.");
}
try {
config.getDynamoDBCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("DynamoDB credential providers should not fail.");
}
try {
config.getCloudWatchCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("CloudWatch credential providers should not fail.");
}
}
@Test
public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatchFailed() {
String test = StringUtils.join(new String[] {
"streamName = a",
"applicationName = b",
"AWSCredentialsProvider = " + credentialNameKinesis,
"AWSCredentialsProviderDynamoDB = " + credentialName1,
"AWSCredentialsProviderCloudWatch = " + credentialName1,
"failoverTimeMillis = 100",
"shardSyncIntervalMillis = 500"
}, '\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
KinesisClientLibConfiguration config = configurator.getConfiguration(input);
try {
config.getKinesisCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("Kinesis credential providers should not fail.");
}
try {
config.getDynamoDBCredentialsProvider().getCredentials();
fail("DynamoDB credential providers should fail.");
} catch (Exception e) {
// succeed
}
try {
config.getCloudWatchCredentialsProvider().getCredentials();
fail("CloudWatch credential providers should fail.");
} catch (Exception e) {
// succeed
}
}
/**
* This credentials provider will always succeed
*/
@ -345,6 +419,84 @@ public class KinesisClientLibConfiguratorTest {
}
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderKinesis implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}
@Override
public String getAWSSecretKey() {
return "";
}
};
}
@Override
public void refresh() {
}
}
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderDynamoDB implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}
@Override
public String getAWSSecretKey() {
return "";
}
};
}
@Override
public void refresh() {
}
}
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderCloudWatch implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}
@Override
public String getAWSSecretKey() {
return "";
}
};
}
@Override
public void refresh() {
}
}
/**
* This credentials provider will always fail
*/