Add ability to specify different credential providers for Kinesis, DynamoDB, and CloudWatch. This is needed when accessing a cross-account Kineses stream using an assumed role.
This commit is contained in:
parent
51663f96c7
commit
982c80704a
2 changed files with 47 additions and 4 deletions
|
|
@ -50,7 +50,9 @@ public class KinesisClientLibConfigurator {
|
|||
// Required properties
|
||||
private static final String PROP_APP_NAME = "applicationName";
|
||||
private static final String PROP_STREAM_NAME = "streamName";
|
||||
private static final String PROP_CREDENTIALS_PROVIDER = "AWSCredentialsProvider";
|
||||
private static final String PROP_CREDENTIALS_PROVIDER_KINESIS = "AWSCredentialsProvider";
|
||||
private static final String PROP_CREDENTIALS_PROVIDER_DYNAMODB = "AWSCredentialsProviderDynamoDB";
|
||||
private static final String PROP_CREDENTIALS_PROVIDER_CLOUDWATCH = "AWSCredentialsProviderCloudWatch";
|
||||
private static final String PROP_WORKER_ID = "workerId";
|
||||
|
||||
private Map<Class<?>, IPropertyValueDecoder<?>> classToDecoder;
|
||||
|
|
@ -107,7 +109,7 @@ public class KinesisClientLibConfigurator {
|
|||
String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME));
|
||||
String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME));
|
||||
AWSCredentialsProvider provider =
|
||||
awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER));
|
||||
awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER_KINESIS));
|
||||
|
||||
if (applicationName == null || applicationName.isEmpty()) {
|
||||
throw new IllegalArgumentException("Value of applicationName should be explicitly provided.");
|
||||
|
|
@ -116,6 +118,24 @@ public class KinesisClientLibConfigurator {
|
|||
throw new IllegalArgumentException("Value of streamName should be explicitly provided.");
|
||||
}
|
||||
|
||||
// Decode the DynamoDB credentials provider if it exists. If not use the Kinesis credentials provider.
|
||||
AWSCredentialsProvider providerDynamoDB;
|
||||
String propCredentialsProviderDynamoDBValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_DYNAMODB);
|
||||
if (propCredentialsProviderDynamoDBValue == null) {
|
||||
providerDynamoDB = provider;
|
||||
} else {
|
||||
providerDynamoDB = awsCPPropGetter.decodeValue(propCredentialsProviderDynamoDBValue);
|
||||
}
|
||||
|
||||
// Decode the CloudWatch credentials provider if it exists. If not use the Kinesis credentials provider.
|
||||
AWSCredentialsProvider providerCloudWatch;
|
||||
String propCredentialsProviderCloudWatchValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_CLOUDWATCH);
|
||||
if (propCredentialsProviderDynamoDBValue == null) {
|
||||
providerCloudWatch = provider;
|
||||
} else {
|
||||
providerCloudWatch = awsCPPropGetter.decodeValue(propCredentialsProviderCloudWatchValue);
|
||||
}
|
||||
|
||||
// Allow customer not to provide workerId or to provide empty worker id.
|
||||
String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID));
|
||||
if (workerId == null || workerId.isEmpty()) {
|
||||
|
|
@ -125,13 +145,13 @@ public class KinesisClientLibConfigurator {
|
|||
}
|
||||
|
||||
KinesisClientLibConfiguration config =
|
||||
new KinesisClientLibConfiguration(applicationName, streamName, provider, workerId);
|
||||
new KinesisClientLibConfiguration(applicationName, streamName, provider, providerDynamoDB, providerCloudWatch, workerId);
|
||||
|
||||
Set<String> requiredNames =
|
||||
new HashSet<String>(Arrays.asList(PROP_STREAM_NAME,
|
||||
PROP_APP_NAME,
|
||||
PROP_WORKER_ID,
|
||||
PROP_CREDENTIALS_PROVIDER));
|
||||
PROP_CREDENTIALS_PROVIDER_KINESIS));
|
||||
|
||||
// Set all the variables that are not used for constructor.
|
||||
for (Object keyObject : properties.keySet()) {
|
||||
|
|
|
|||
|
|
@ -329,6 +329,29 @@ public class KinesisClientLibConfiguratorTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatch() {
|
||||
String test = StringUtils.join(new String[] {
|
||||
"streamName = a",
|
||||
"applicationName = b",
|
||||
"AWSCredentialsProvider = " + credentialName1,
|
||||
"AWSCredentialsProviderDynamoDB = " + credentialName1,
|
||||
"AWSCredentialsProviderCloudWatch = " + credentialName1,
|
||||
"failoverTimeMillis = 100",
|
||||
"shardSyncIntervalMillis = 500"
|
||||
}, '\n');
|
||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||
|
||||
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
||||
try {
|
||||
KinesisClientLibConfiguration config = configurator.getConfiguration(input);
|
||||
config.getKinesisCredentialsProvider().getCredentials();
|
||||
fail("expect failure with wrong credentials provider");
|
||||
} catch (Exception e) {
|
||||
// succeed
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This credentials provider will always succeed
|
||||
*/
|
||||
|
|
|
|||
Loading…
Reference in a new issue