* [#367] Enhanced multi-lang `AWSCredentialsProvider=...` decoder and construction. + added support for external ids (issue #367) + added support for endpoint+region (e.g., STS via VPC) * Multiple multi-lang edits to introduce logging and additional tests. + added `ENDPOINT_REGION` nested key for a simpler Cx experience + deduplicated, and improved, logic w.r.t. CredentialsProvider construction to NOT swallow Exceptions * Relocated `multilang.properties` from `main/resources` to `test/resources`
This commit is contained in:
parent
46cd1179d4
commit
2f4ff65681
13 changed files with 734 additions and 66 deletions
|
|
@ -45,11 +45,11 @@ public class MultiLangDaemonConfig {
|
||||||
private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage";
|
private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage";
|
||||||
private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads";
|
private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads";
|
||||||
|
|
||||||
private MultiLangDaemonConfiguration multiLangDaemonConfiguration;
|
private final MultiLangDaemonConfiguration multiLangDaemonConfiguration;
|
||||||
|
|
||||||
private ExecutorService executorService;
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
private MultiLangRecordProcessorFactory recordProcessorFactory;
|
private final MultiLangRecordProcessorFactory recordProcessorFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
|
|
@ -165,7 +165,6 @@ public class MultiLangDaemonConfig {
|
||||||
propertyStream.close();
|
propertyStream.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean validateProperties(Properties properties) {
|
private static boolean validateProperties(Properties properties) {
|
||||||
|
|
@ -182,12 +181,12 @@ public class MultiLangDaemonConfig {
|
||||||
log.debug("Value for {} property is {}", PROP_MAX_ACTIVE_THREADS, maxActiveThreads);
|
log.debug("Value for {} property is {}", PROP_MAX_ACTIVE_THREADS, maxActiveThreads);
|
||||||
if (maxActiveThreads <= 0) {
|
if (maxActiveThreads <= 0) {
|
||||||
log.info("Using a cached thread pool.");
|
log.info("Using a cached thread pool.");
|
||||||
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
|
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
|
||||||
builder.build());
|
builder.build());
|
||||||
} else {
|
} else {
|
||||||
log.info("Using a fixed thread pool with {} max active threads.", maxActiveThreads);
|
log.info("Using a fixed thread pool with {} max active threads.", maxActiveThreads);
|
||||||
return new ThreadPoolExecutor(maxActiveThreads, maxActiveThreads, 0L, TimeUnit.MILLISECONDS,
|
return new ThreadPoolExecutor(maxActiveThreads, maxActiveThreads, 0L, TimeUnit.MILLISECONDS,
|
||||||
new LinkedBlockingQueue<Runnable>(), builder.build());
|
new LinkedBlockingQueue<>(), builder.build());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,6 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
|
||||||
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
|
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
|
||||||
import software.amazon.kinesis.processor.ShardRecordProcessor;
|
import software.amazon.kinesis.processor.ShardRecordProcessor;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A record processor that manages creating a child process that implements the multi language protocol and connecting
|
* A record processor that manages creating a child process that implements the multi language protocol and connecting
|
||||||
* that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on
|
* that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on
|
||||||
|
|
@ -50,20 +49,20 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor {
|
||||||
|
|
||||||
private Future<?> stderrReadTask;
|
private Future<?> stderrReadTask;
|
||||||
|
|
||||||
private MessageWriter messageWriter;
|
private final MessageWriter messageWriter;
|
||||||
private MessageReader messageReader;
|
private final MessageReader messageReader;
|
||||||
private DrainChildSTDERRTask readSTDERRTask;
|
private final DrainChildSTDERRTask readSTDERRTask;
|
||||||
|
|
||||||
private ProcessBuilder processBuilder;
|
private final ProcessBuilder processBuilder;
|
||||||
private Process process;
|
private Process process;
|
||||||
private ExecutorService executorService;
|
private final ExecutorService executorService;
|
||||||
private ProcessState state;
|
private ProcessState state;
|
||||||
|
|
||||||
private ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
private MultiLangProtocol protocol;
|
private MultiLangProtocol protocol;
|
||||||
|
|
||||||
private MultiLangDaemonConfiguration configuration;
|
private final MultiLangDaemonConfiguration configuration;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(InitializationInput initializationInput) {
|
public void initialize(InitializationInput initializationInput) {
|
||||||
|
|
@ -213,7 +212,6 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor {
|
||||||
this.readSTDERRTask = readSTDERRTask;
|
this.readSTDERRTask = readSTDERRTask;
|
||||||
this.configuration = configuration;
|
this.configuration = configuration;
|
||||||
|
|
||||||
|
|
||||||
this.state = ProcessState.ACTIVE;
|
this.state = ProcessState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -303,8 +301,6 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* We provide a package level method for unit testing this call to exit.
|
* We provide a package level method for unit testing this call to exit.
|
||||||
*
|
|
||||||
* @param val exit value
|
|
||||||
*/
|
*/
|
||||||
void exit() {
|
void exit() {
|
||||||
System.exit(EXIT_VALUE);
|
System.exit(EXIT_VALUE);
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,145 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2023 Amazon.com, Inc. or its affiliates.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package software.amazon.kinesis.multilang;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.amazonaws.regions.Regions;
|
||||||
|
import com.google.common.base.CaseFormat;
|
||||||
|
|
||||||
|
import lombok.AccessLevel;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Key-Value pairs which may be nested in, and extracted from, a property value
|
||||||
|
* in a Java properties file. For example, given the line in a property file of
|
||||||
|
* {@code my_key = my_value|foo=bar} and a delimiter split on {@code |} (pipe),
|
||||||
|
* the value {@code my_value|foo=bar} would have a nested key of {@code foo}
|
||||||
|
* and its corresponding value is {@code bar}.
|
||||||
|
* <br/><br/>
|
||||||
|
* The order of nested properties does not matter, and these properties are optional.
|
||||||
|
* Customers may choose to provide, in any order, zero-or-more nested properties.
|
||||||
|
* <br/><br/>
|
||||||
|
* Duplicate keys are not supported, and may result in a last-write-wins outcome.
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public enum NestedPropertyKey {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specify the service endpoint where requests will be submitted.
|
||||||
|
* This property's value must be in the following format:
|
||||||
|
* <pre>
|
||||||
|
* ENDPOINT ::= SERVICE_ENDPOINT "^" SIGNING_REGION
|
||||||
|
* SERVICE_ENDPOINT ::= URL
|
||||||
|
* SIGNING_REGION ::= AWS_REGION
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* It would be redundant to provide both this and {@link #ENDPOINT_REGION}.
|
||||||
|
*
|
||||||
|
* @see #ENDPOINT_REGION
|
||||||
|
* @see <a href="https://docs.aws.amazon.com/general/latest/gr/rande.html">AWS Service endpoints</a>
|
||||||
|
* @see <a href="https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions">Available Regions</a>
|
||||||
|
*/
|
||||||
|
ENDPOINT {
|
||||||
|
void visit(final NestedPropertyProcessor processor, final String endpoint) {
|
||||||
|
final String[] tokens = endpoint.split("\\^");
|
||||||
|
if (tokens.length != 2) {
|
||||||
|
throw new IllegalArgumentException("Invalid " + name() + ": " + endpoint);
|
||||||
|
}
|
||||||
|
processor.acceptEndpoint(tokens[0], tokens[1]);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specify the region where service requests will be submitted. This
|
||||||
|
* region will determine both the service endpoint and signing region.
|
||||||
|
* <br/><br/>
|
||||||
|
* It would be redundant to provide both this and {@link #ENDPOINT}.
|
||||||
|
*
|
||||||
|
* @see #ENDPOINT
|
||||||
|
* @see <a href="https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions">Available Regions</a>
|
||||||
|
*/
|
||||||
|
ENDPOINT_REGION {
|
||||||
|
void visit(final NestedPropertyProcessor processor, final String region) {
|
||||||
|
processor.acceptEndpointRegion(Regions.fromName(region));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* External ids may be used when delegating access in a multi-tenant
|
||||||
|
* environment, or to third parties.
|
||||||
|
*
|
||||||
|
* @see <a href="https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html">
|
||||||
|
* How to use an external ID when granting access to your AWS resources to a third party</a>
|
||||||
|
*/
|
||||||
|
EXTERNAL_ID {
|
||||||
|
void visit(final NestedPropertyProcessor processor, final String externalId) {
|
||||||
|
processor.acceptExternalId(externalId);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Nested key within the property value. For example, a nested key-value
|
||||||
|
* of {@code foo=bar} has a nested key of {@code foo}.
|
||||||
|
*/
|
||||||
|
@Getter(AccessLevel.PACKAGE)
|
||||||
|
private final String nestedKey;
|
||||||
|
|
||||||
|
NestedPropertyKey() {
|
||||||
|
// convert the enum from UPPER_SNAKE_CASE to lowerCamelCase
|
||||||
|
nestedKey = CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name());
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract void visit(NestedPropertyProcessor processor, String value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parses any number of parameters. Each nested property will prompt a
|
||||||
|
* visit to the {@code processor}.
|
||||||
|
*
|
||||||
|
* @param processor processor to be invoked for every nested property
|
||||||
|
* @param params parameters to check for a nested property key
|
||||||
|
*/
|
||||||
|
public static void parse(final NestedPropertyProcessor processor, final String... params) {
|
||||||
|
// Construct a disposable cache to keep this O(n). Since parsing is
|
||||||
|
// usually one-and-done, it's wasteful to maintain this cache in perpetuity.
|
||||||
|
final Map<String, NestedPropertyKey> cachedKeys = new HashMap<>();
|
||||||
|
for (final NestedPropertyKey npk : values()) {
|
||||||
|
cachedKeys.put(npk.getNestedKey(), npk);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final String param : params) {
|
||||||
|
if (param != null) {
|
||||||
|
final String[] tokens = param.split("=");
|
||||||
|
if (tokens.length == 2) {
|
||||||
|
final NestedPropertyKey npk = cachedKeys.get(tokens[0]);
|
||||||
|
if (npk != null) {
|
||||||
|
npk.visit(processor, tokens[1]);
|
||||||
|
} else {
|
||||||
|
log.warn("Unsupported nested key: {}", param);
|
||||||
|
}
|
||||||
|
} else if (tokens.length > 2) {
|
||||||
|
log.warn("Malformed nested key: {}", param);
|
||||||
|
} else {
|
||||||
|
log.info("Parameter is not a nested key: {}", param);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,54 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2023 Amazon.com, Inc. or its affiliates.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package software.amazon.kinesis.multilang;
|
||||||
|
|
||||||
|
import com.amazonaws.regions.Regions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Defines methods to process {@link NestedPropertyKey}s.
|
||||||
|
*/
|
||||||
|
public interface NestedPropertyProcessor {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the service endpoint where requests are sent.
|
||||||
|
*
|
||||||
|
* @param serviceEndpoint the service endpoint either with or without the protocol
|
||||||
|
* (e.g., https://sns.us-west-1.amazonaws.com, sns.us-west-1.amazonaws.com)
|
||||||
|
* @param signingRegion the region to use for SigV4 signing of requests (e.g. us-west-1)
|
||||||
|
*
|
||||||
|
* @see #acceptEndpointRegion(Regions)
|
||||||
|
* @see <a href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/client/builder/AwsClientBuilder.EndpointConfiguration.html">
|
||||||
|
* AwsClientBuilder.EndpointConfiguration</a>
|
||||||
|
*/
|
||||||
|
void acceptEndpoint(String serviceEndpoint, String signingRegion);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the service endpoint where requests are sent.
|
||||||
|
*
|
||||||
|
* @param region Region to be used by the client. This will be used to determine both the service endpoint
|
||||||
|
* (e.g., https://sns.us-west-1.amazonaws.com) and signing region (e.g., us-west-1) for requests.
|
||||||
|
*
|
||||||
|
* @see #acceptEndpoint(String, String)
|
||||||
|
*/
|
||||||
|
void acceptEndpointRegion(Regions region);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the external id, an optional field to designate who can assume an IAM role.
|
||||||
|
*
|
||||||
|
* @param externalId external id used in the service call used to retrieve session credentials
|
||||||
|
*/
|
||||||
|
void acceptExternalId(String externalId);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,89 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2023 Amazon.com, Inc. or its affiliates.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package software.amazon.kinesis.multilang.auth;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import com.amazonaws.auth.AWSSessionCredentials;
|
||||||
|
import com.amazonaws.auth.AWSSessionCredentialsProvider;
|
||||||
|
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
|
||||||
|
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.Builder;
|
||||||
|
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
|
||||||
|
import com.amazonaws.regions.Regions;
|
||||||
|
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
|
||||||
|
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
|
||||||
|
|
||||||
|
import software.amazon.kinesis.multilang.NestedPropertyKey;
|
||||||
|
import software.amazon.kinesis.multilang.NestedPropertyProcessor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An {@link AWSSessionCredentialsProvider} that is backed by STSAssumeRole.
|
||||||
|
*/
|
||||||
|
public class KclSTSAssumeRoleSessionCredentialsProvider
|
||||||
|
implements AWSSessionCredentialsProvider, NestedPropertyProcessor {
|
||||||
|
|
||||||
|
private final Builder builder;
|
||||||
|
|
||||||
|
private final STSAssumeRoleSessionCredentialsProvider provider;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param params vararg parameters which must include roleArn at index=0,
|
||||||
|
* and roleSessionName at index=1
|
||||||
|
*/
|
||||||
|
public KclSTSAssumeRoleSessionCredentialsProvider(final String[] params) {
|
||||||
|
this(params[0], params[1], Arrays.copyOfRange(params, 2, params.length));
|
||||||
|
}
|
||||||
|
|
||||||
|
public KclSTSAssumeRoleSessionCredentialsProvider(final String roleArn, final String roleSessionName,
|
||||||
|
final String... params) {
|
||||||
|
builder = new Builder(roleArn, roleSessionName);
|
||||||
|
NestedPropertyKey.parse(this, params);
|
||||||
|
provider = builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AWSSessionCredentials getCredentials() {
|
||||||
|
return provider.getCredentials();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void refresh() {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void acceptEndpoint(final String serviceEndpoint, final String signingRegion) {
|
||||||
|
final EndpointConfiguration endpoint = new EndpointConfiguration(serviceEndpoint, signingRegion);
|
||||||
|
final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder()
|
||||||
|
.withEndpointConfiguration(endpoint)
|
||||||
|
.build();
|
||||||
|
builder.withStsClient(stsClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void acceptEndpointRegion(final Regions region) {
|
||||||
|
final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder()
|
||||||
|
.withRegion(region)
|
||||||
|
.build();
|
||||||
|
builder.withStsClient(stsClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void acceptExternalId(final String externalId) {
|
||||||
|
builder.withExternalId(externalId);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -14,10 +14,13 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.multilang.config;
|
package software.amazon.kinesis.multilang.config;
|
||||||
|
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
import com.amazonaws.auth.AWSCredentialsProviderChain;
|
import com.amazonaws.auth.AWSCredentialsProviderChain;
|
||||||
|
|
@ -28,7 +31,6 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecoder<AWSCredentialsProvider> {
|
class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecoder<AWSCredentialsProvider> {
|
||||||
private static final String AUTH_PREFIX = "com.amazonaws.auth.";
|
|
||||||
private static final String LIST_DELIMITER = ",";
|
private static final String LIST_DELIMITER = ",";
|
||||||
private static final String ARG_DELIMITER = "|";
|
private static final String ARG_DELIMITER = "|";
|
||||||
|
|
||||||
|
|
@ -63,34 +65,58 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<Class<AWSCredentialsProvider>> getSupportedTypes() {
|
public List<Class<AWSCredentialsProvider>> getSupportedTypes() {
|
||||||
return Arrays.asList(AWSCredentialsProvider.class);
|
return Collections.singletonList(AWSCredentialsProvider.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Convert string list to a list of valid credentials providers.
|
* Convert string list to a list of valid credentials providers.
|
||||||
*/
|
*/
|
||||||
private static List<AWSCredentialsProvider> getValidCredentialsProviders(List<String> providerNames) {
|
private static List<AWSCredentialsProvider> getValidCredentialsProviders(List<String> providerNames) {
|
||||||
List<AWSCredentialsProvider> credentialsProviders = new ArrayList<>();
|
List<AWSCredentialsProvider> credentialsProviders = new ArrayList<>();
|
||||||
|
|
||||||
for (String providerName : providerNames) {
|
for (String providerName : providerNames) {
|
||||||
if (providerName.contains(ARG_DELIMITER)) {
|
final String[] nameAndArgs = providerName.split("\\" + ARG_DELIMITER);
|
||||||
String[] nameAndArgs = providerName.split("\\" + ARG_DELIMITER);
|
final Class<? extends AWSCredentialsProvider> clazz;
|
||||||
|
try {
|
||||||
|
final Class<?> c = Class.forName(nameAndArgs[0]);
|
||||||
|
if (!AWSCredentialsProvider.class.isAssignableFrom(c)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
clazz = (Class<? extends AWSCredentialsProvider>) c;
|
||||||
|
} catch (ClassNotFoundException cnfe) {
|
||||||
|
// Providers are a product of prefixed Strings to cover multiple
|
||||||
|
// namespaces (e.g., "Foo" -> { "some.auth.Foo", "kcl.auth.Foo" }).
|
||||||
|
// It's expected that many class names will not resolve.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
log.info("Attempting to construct {}", clazz);
|
||||||
|
|
||||||
|
AWSCredentialsProvider provider = null;
|
||||||
|
if (nameAndArgs.length > 1) {
|
||||||
|
final String[] varargs = Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length);
|
||||||
|
|
||||||
|
// attempt to invoke an explicit N-arg constructor of FooClass(String, String, ...)
|
||||||
|
provider = constructProvider(providerName, () -> {
|
||||||
Class<?>[] argTypes = new Class<?>[nameAndArgs.length - 1];
|
Class<?>[] argTypes = new Class<?>[nameAndArgs.length - 1];
|
||||||
Arrays.fill(argTypes, String.class);
|
Arrays.fill(argTypes, String.class);
|
||||||
try {
|
return clazz.getConstructor(argTypes).newInstance(varargs);
|
||||||
Class<?> className = Class.forName(nameAndArgs[0]);
|
});
|
||||||
Constructor<?> c = className.getConstructor(argTypes);
|
|
||||||
credentialsProviders.add((AWSCredentialsProvider) c
|
if (provider == null) {
|
||||||
.newInstance(Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length)));
|
// attempt to invoke a public varargs/array constructor of FooClass(String[])
|
||||||
} catch (Exception e) {
|
provider = constructProvider(providerName, () ->
|
||||||
log.debug("Can't find any credentials provider matching {}.", providerName);
|
clazz.getConstructor(String[].class).newInstance((Object) varargs)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
Class<?> className = Class.forName(providerName);
|
|
||||||
credentialsProviders.add((AWSCredentialsProvider) className.newInstance());
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.debug("Can't find any credentials provider matching {}.", providerName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (provider == null) {
|
||||||
|
// regardless of parameters, fallback to invoke a public no-arg constructor
|
||||||
|
provider = constructProvider(providerName, clazz::newInstance);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (provider != null) {
|
||||||
|
credentialsProviders.add(provider);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return credentialsProviders;
|
return credentialsProviders;
|
||||||
|
|
@ -99,7 +125,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
|
||||||
private static List<String> getProviderNames(String property) {
|
private static List<String> getProviderNames(String property) {
|
||||||
// assume list delimiter is ","
|
// assume list delimiter is ","
|
||||||
String[] elements = property.split(LIST_DELIMITER);
|
String[] elements = property.split(LIST_DELIMITER);
|
||||||
List<String> result = new ArrayList<String>();
|
List<String> result = new ArrayList<>();
|
||||||
for (int i = 0; i < elements.length; i++) {
|
for (int i = 0; i < elements.length; i++) {
|
||||||
String string = elements[i].trim();
|
String string = elements[i].trim();
|
||||||
if (!string.isEmpty()) {
|
if (!string.isEmpty()) {
|
||||||
|
|
@ -110,20 +136,48 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<String> getPossibleFullClassNames(String s) {
|
private static List<String> getPossibleFullClassNames(final String provider) {
|
||||||
/*
|
return Stream.of(
|
||||||
* We take care of three cases :
|
// Customer provides a short name of common providers in com.amazonaws.auth package
|
||||||
*
|
// (e.g., any classes implementing the AWSCredentialsProvider interface)
|
||||||
* 1. Customer provides a short name of common providers in com.amazonaws.auth package i.e. any classes
|
// @see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html
|
||||||
* implementing the AWSCredentialsProvider interface:
|
"com.amazonaws.auth.",
|
||||||
* http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html
|
|
||||||
*
|
|
||||||
* 2. Customer provides a full name of common providers e.g. com.amazonaws.auth.ClasspathFileCredentialsProvider
|
|
||||||
*
|
|
||||||
* 3. Customer provides a custom credentials provider with full name of provider
|
|
||||||
*/
|
|
||||||
|
|
||||||
return Arrays.asList(s, AUTH_PREFIX + s);
|
// Customer provides a short name of a provider offered by this multi-lang package
|
||||||
|
"software.amazon.kinesis.multilang.auth.",
|
||||||
|
|
||||||
|
// Customer provides a fully-qualified provider name, or a custom credentials provider
|
||||||
|
// (e.g., com.amazonaws.auth.ClasspathFileCredentialsProvider, org.mycompany.FooProvider)
|
||||||
|
""
|
||||||
|
).map(prefix -> prefix + provider).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
private interface CredentialsProviderConstructor<T extends AWSCredentialsProvider> {
|
||||||
|
T construct() throws IllegalAccessException, InstantiationException,
|
||||||
|
InvocationTargetException, NoSuchMethodException;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempts to construct an {@link AWSCredentialsProvider}.
|
||||||
|
*
|
||||||
|
* @param providerName Raw, unmodified provider name. Should there be an
|
||||||
|
* Exeception during construction, this parameter will be logged.
|
||||||
|
* @param constructor supplier-like function that will perform the construction
|
||||||
|
* @return the constructed provider, if successful; otherwise, null
|
||||||
|
*
|
||||||
|
* @param <T> type of the CredentialsProvider to construct
|
||||||
|
*/
|
||||||
|
private static <T extends AWSCredentialsProvider> T constructProvider(
|
||||||
|
final String providerName, final CredentialsProviderConstructor<T> constructor) {
|
||||||
|
try {
|
||||||
|
return constructor.construct();
|
||||||
|
} catch (NoSuchMethodException ignored) {
|
||||||
|
// ignore
|
||||||
|
} catch (IllegalAccessException | InstantiationException | InvocationTargetException | RuntimeException e) {
|
||||||
|
log.warn("Failed to construct {}", providerName, e);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -57,10 +57,10 @@ public class BuilderDynaBean implements DynaBean {
|
||||||
}
|
}
|
||||||
|
|
||||||
public BuilderDynaBean(Class<?> destinedClass, ConvertUtilsBean convertUtilsBean,
|
public BuilderDynaBean(Class<?> destinedClass, ConvertUtilsBean convertUtilsBean,
|
||||||
Function<String, ?> emtpyPropertyHandler, List<String> classPrefixSearchList) {
|
Function<String, ?> emptyPropertyHandler, List<String> classPrefixSearchList) {
|
||||||
this.convertUtilsBean = convertUtilsBean;
|
this.convertUtilsBean = convertUtilsBean;
|
||||||
this.classPrefixSearchList = classPrefixSearchList;
|
this.classPrefixSearchList = classPrefixSearchList;
|
||||||
this.emptyPropertyHandler = emtpyPropertyHandler;
|
this.emptyPropertyHandler = emptyPropertyHandler;
|
||||||
initialize(destinedClass);
|
initialize(destinedClass);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -62,6 +62,7 @@ public class KinesisClientLibConfigurator {
|
||||||
public MultiLangDaemonConfiguration getConfiguration(Properties properties) {
|
public MultiLangDaemonConfiguration getConfiguration(Properties properties) {
|
||||||
properties.entrySet().forEach(e -> {
|
properties.entrySet().forEach(e -> {
|
||||||
try {
|
try {
|
||||||
|
log.info("Processing (key={}, value={})", e.getKey(), e.getValue());
|
||||||
utilsBean.setProperty(configuration, (String) e.getKey(), e.getValue());
|
utilsBean.setProperty(configuration, (String) e.getKey(), e.getValue());
|
||||||
} catch (IllegalAccessException | InvocationTargetException ex) {
|
} catch (IllegalAccessException | InvocationTargetException ex) {
|
||||||
throw new RuntimeException(ex);
|
throw new RuntimeException(ex);
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class MultiLangDaemonConfigTest {
|
public class MultiLangDaemonConfigTest {
|
||||||
private static final String FILENAME = "some.properties";
|
private static final String FILENAME = "multilang.properties";
|
||||||
private static final String EXE = "TestExe.exe";
|
private static final String EXE = "TestExe.exe";
|
||||||
private static final String APPLICATION_NAME = MultiLangDaemonConfigTest.class.getSimpleName();
|
private static final String APPLICATION_NAME = MultiLangDaemonConfigTest.class.getSimpleName();
|
||||||
private static final String STREAM_NAME = "fakeStream";
|
private static final String STREAM_NAME = "fakeStream";
|
||||||
|
|
@ -52,7 +52,7 @@ public class MultiLangDaemonConfigTest {
|
||||||
@Mock
|
@Mock
|
||||||
private AwsCredentials creds;
|
private AwsCredentials creds;
|
||||||
|
|
||||||
private KinesisClientLibConfigurator configurator;
|
private final KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();
|
||||||
private MultiLangDaemonConfig deamonConfig;
|
private MultiLangDaemonConfig deamonConfig;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -62,7 +62,6 @@ public class MultiLangDaemonConfigTest {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void setup(String streamName, String streamArn) throws IOException {
|
public void setup(String streamName, String streamArn) throws IOException {
|
||||||
|
|
||||||
String properties = String.format("executableName = %s\n"
|
String properties = String.format("executableName = %s\n"
|
||||||
+ "applicationName = %s\n"
|
+ "applicationName = %s\n"
|
||||||
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
|
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
|
||||||
|
|
@ -85,8 +84,6 @@ public class MultiLangDaemonConfigTest {
|
||||||
|
|
||||||
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
|
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
|
||||||
when(creds.accessKeyId()).thenReturn("cool-user");
|
when(creds.accessKeyId()).thenReturn("cool-user");
|
||||||
configurator = new KinesisClientLibConfigurator();
|
|
||||||
|
|
||||||
deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -201,4 +198,14 @@ public class MultiLangDaemonConfigTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the loading of a "real" properties file. This test should catch
|
||||||
|
* any issues which might arise if there is a discrepancy between reality
|
||||||
|
* and mocking.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testActualPropertiesFile() throws Exception {
|
||||||
|
new MultiLangDaemonConfig(FILENAME);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -0,0 +1,112 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2023 Amazon.com, Inc. or its affiliates.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package software.amazon.kinesis.multilang;
|
||||||
|
|
||||||
|
import com.amazonaws.regions.Regions;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||||
|
import static software.amazon.kinesis.multilang.NestedPropertyKey.ENDPOINT;
|
||||||
|
import static software.amazon.kinesis.multilang.NestedPropertyKey.ENDPOINT_REGION;
|
||||||
|
import static software.amazon.kinesis.multilang.NestedPropertyKey.EXTERNAL_ID;
|
||||||
|
import static software.amazon.kinesis.multilang.NestedPropertyKey.parse;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
|
||||||
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
public class NestedPropertyKeyTest {
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private NestedPropertyProcessor mockProcessor;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExternalId() {
|
||||||
|
final String expectedId = "eid";
|
||||||
|
|
||||||
|
parse(mockProcessor, createKey(EXTERNAL_ID, expectedId));
|
||||||
|
verify(mockProcessor).acceptExternalId(expectedId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEndpoint() {
|
||||||
|
final String expectedEndpoint = "https://sts.us-east-1.amazonaws.com";
|
||||||
|
final String expectedRegion = "us-east-1";
|
||||||
|
final String param = createKey(ENDPOINT, expectedEndpoint + "^" + expectedRegion);
|
||||||
|
|
||||||
|
parse(mockProcessor, param);
|
||||||
|
verify(mockProcessor).acceptEndpoint(expectedEndpoint, expectedRegion);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testInvalidEndpoint() {
|
||||||
|
parse(mockProcessor, createKey(ENDPOINT, "value-sans-caret-delimiter"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testInvalidEndpointDoubleCaret() {
|
||||||
|
parse(mockProcessor, createKey(ENDPOINT, "https://sts.us-east-1.amazonaws.com^us-east-1^borkbork"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEndpointRegion() {
|
||||||
|
final Regions expectedRegion = Regions.GovCloud;
|
||||||
|
|
||||||
|
parse(mockProcessor, createKey(ENDPOINT_REGION, expectedRegion.getName()));
|
||||||
|
verify(mockProcessor).acceptEndpointRegion(expectedRegion);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testInvalidEndpointRegion() {
|
||||||
|
parse(mockProcessor, createKey(ENDPOINT_REGION, "snuffleupagus"));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the literal nested key (i.e., {@code key=} in {@code some_val|key=nested_val})
|
||||||
|
* does not change. Any change to an existing literal key is not backwards-compatible.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testKeysExplicitly() {
|
||||||
|
// Adding a new enum will deliberately cause this assert to fail, and
|
||||||
|
// therefore raise awareness for this explicit test. Add-and-remove may
|
||||||
|
// keep the number unchanged yet will also break (by removing an enum).
|
||||||
|
assertEquals(3, NestedPropertyKey.values().length);
|
||||||
|
|
||||||
|
assertEquals("endpoint", ENDPOINT.getNestedKey());
|
||||||
|
assertEquals("endpointRegion", ENDPOINT_REGION.getNestedKey());
|
||||||
|
assertEquals("externalId", EXTERNAL_ID.getNestedKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNonmatchingParameters() {
|
||||||
|
final String[] params = new String[] {
|
||||||
|
null,
|
||||||
|
"",
|
||||||
|
"hello world", // no nested key
|
||||||
|
"foo=bar", // nested key, but is not a recognized key
|
||||||
|
createKey(EXTERNAL_ID, "eid") + "=extra", // valid key made invalid by second '='
|
||||||
|
};
|
||||||
|
parse(mockProcessor, params);
|
||||||
|
verifyZeroInteractions(mockProcessor);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String createKey(final NestedPropertyKey key, final String value) {
|
||||||
|
return key.getNestedKey() + "=" + value;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,71 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2023 Amazon.com, Inc. or its affiliates.
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package software.amazon.kinesis.multilang.auth;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class KclSTSAssumeRoleSessionCredentialsProviderTest {
|
||||||
|
|
||||||
|
private static final String ARN = "arn";
|
||||||
|
private static final String SESSION_NAME = "sessionName";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the constructor doesn't throw an out-of-bounds exception if
|
||||||
|
* there are no parameters beyond the required ARN and session name.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testConstructorWithoutOptionalParams() {
|
||||||
|
new KclSTSAssumeRoleSessionCredentialsProvider(new String[] { ARN, SESSION_NAME });
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAcceptEndpoint() {
|
||||||
|
// discovered exception during e2e testing; therefore, this test is
|
||||||
|
// to simply verify the constructed STS client doesn't go *boom*
|
||||||
|
final KclSTSAssumeRoleSessionCredentialsProvider provider =
|
||||||
|
new KclSTSAssumeRoleSessionCredentialsProvider(ARN, SESSION_NAME);
|
||||||
|
provider.acceptEndpoint("endpoint", "us-east-1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVarArgs() {
|
||||||
|
for (final String[] varargs : Arrays.asList(
|
||||||
|
new String[] { ARN, SESSION_NAME, "externalId=eid", "foo"},
|
||||||
|
new String[] { ARN, SESSION_NAME, "foo", "externalId=eid"}
|
||||||
|
)) {
|
||||||
|
final VarArgsSpy provider = new VarArgsSpy(varargs);
|
||||||
|
assertEquals("eid", provider.externalId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class VarArgsSpy extends KclSTSAssumeRoleSessionCredentialsProvider {
|
||||||
|
|
||||||
|
private String externalId;
|
||||||
|
|
||||||
|
public VarArgsSpy(String[] args) {
|
||||||
|
super(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void acceptExternalId(final String externalId) {
|
||||||
|
this.externalId = externalId;
|
||||||
|
super.acceptExternalId(externalId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -16,6 +16,8 @@ package software.amazon.kinesis.multilang.config;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
@ -26,6 +28,7 @@ import org.hamcrest.Description;
|
||||||
import org.hamcrest.Matcher;
|
import org.hamcrest.Matcher;
|
||||||
import org.hamcrest.TypeSafeDiagnosingMatcher;
|
import org.hamcrest.TypeSafeDiagnosingMatcher;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import software.amazon.kinesis.multilang.auth.KclSTSAssumeRoleSessionCredentialsProvider;
|
||||||
|
|
||||||
import com.amazonaws.auth.AWSCredentials;
|
import com.amazonaws.auth.AWSCredentials;
|
||||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
|
|
@ -38,7 +41,7 @@ public class AWSCredentialsProviderPropertyValueDecoderTest {
|
||||||
|
|
||||||
private final String credentialName1 = AlwaysSucceedCredentialsProvider.class.getName();
|
private final String credentialName1 = AlwaysSucceedCredentialsProvider.class.getName();
|
||||||
private final String credentialName2 = ConstructorCredentialsProvider.class.getName();
|
private final String credentialName2 = ConstructorCredentialsProvider.class.getName();
|
||||||
private AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder();
|
private final AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder();
|
||||||
|
|
||||||
@ToString
|
@ToString
|
||||||
private static class AWSCredentialsMatcher extends TypeSafeDiagnosingMatcher<AWSCredentialsProvider> {
|
private static class AWSCredentialsMatcher extends TypeSafeDiagnosingMatcher<AWSCredentialsProvider> {
|
||||||
|
|
@ -53,10 +56,6 @@ public class AWSCredentialsProviderPropertyValueDecoderTest {
|
||||||
this.classMatcher = instanceOf(AWSCredentialsProviderChain.class);
|
this.classMatcher = instanceOf(AWSCredentialsProviderChain.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
private AWSCredentialsMatcher(AWSCredentials expected) {
|
|
||||||
this(expected.getAWSAccessKeyId(), expected.getAWSSecretKey());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean matchesSafely(AWSCredentialsProvider item, Description mismatchDescription) {
|
protected boolean matchesSafely(AWSCredentialsProvider item, Description mismatchDescription) {
|
||||||
AWSCredentials actual = item.getCredentials();
|
AWSCredentials actual = item.getCredentials();
|
||||||
|
|
@ -114,6 +113,33 @@ public class AWSCredentialsProviderPropertyValueDecoderTest {
|
||||||
assertThat(provider, hasCredentials("arg1", "arg2"));
|
assertThat(provider, hasCredentials("arg1", "arg2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that providers in the multi-lang auth package can be resolved and instantiated.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testKclAuthProvider() {
|
||||||
|
for (final String className : Arrays.asList(
|
||||||
|
KclSTSAssumeRoleSessionCredentialsProvider.class.getName(), // fully-qualified name
|
||||||
|
KclSTSAssumeRoleSessionCredentialsProvider.class.getSimpleName() // name-only; needs prefix
|
||||||
|
)) {
|
||||||
|
final AWSCredentialsProvider provider = decoder.decodeValue(className + "|arn|sessionName");
|
||||||
|
assertNotNull(className, provider);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that a provider can be instantiated by its varargs constructor.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testVarArgAuthProvider() {
|
||||||
|
final String[] args = new String[] { "arg1", "arg2", "arg3" };
|
||||||
|
final String className = VarArgCredentialsProvider.class.getName();
|
||||||
|
final String encodedValue = className + "|" + String.join("|", args);
|
||||||
|
|
||||||
|
final AWSCredentialsProvider provider = decoder.decodeValue(encodedValue);
|
||||||
|
assertEquals(Arrays.toString(args), provider.getCredentials().getAWSAccessKeyId());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This credentials provider will always succeed
|
* This credentials provider will always succeed
|
||||||
*/
|
*/
|
||||||
|
|
@ -138,9 +164,9 @@ public class AWSCredentialsProviderPropertyValueDecoderTest {
|
||||||
private String arg1;
|
private String arg1;
|
||||||
private String arg2;
|
private String arg2;
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
public ConstructorCredentialsProvider(String arg1) {
|
public ConstructorCredentialsProvider(String arg1) {
|
||||||
this.arg1 = arg1;
|
this(arg1, "blank");
|
||||||
this.arg2 = "blank";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConstructorCredentialsProvider(String arg1, String arg2) {
|
public ConstructorCredentialsProvider(String arg1, String arg2) {
|
||||||
|
|
@ -158,4 +184,25 @@ public class AWSCredentialsProviderPropertyValueDecoderTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class VarArgCredentialsProvider implements AWSCredentialsProvider {
|
||||||
|
|
||||||
|
private final String[] args;
|
||||||
|
|
||||||
|
public VarArgCredentialsProvider(final String[] args) {
|
||||||
|
this.args = args;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AWSCredentials getCredentials() {
|
||||||
|
// KISS solution to surface the constructor args
|
||||||
|
final String flattenedArgs = Arrays.toString(args);
|
||||||
|
return new BasicAWSCredentials(flattenedArgs, flattenedArgs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void refresh() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,93 @@
|
||||||
|
# The script that abides by the multi-language protocol. This script will
|
||||||
|
# be executed by the MultiLangDaemon, which will communicate with this script
|
||||||
|
# over STDIN and STDOUT according to the multi-language protocol.
|
||||||
|
executableName = sample_kclpy_app.py
|
||||||
|
|
||||||
|
# The Stream arn: arn:aws:kinesis:<region>:<account id>:stream/<stream name>
|
||||||
|
# Important: streamArn takes precedence over streamName if both are set
|
||||||
|
streamArn = arn:aws:kinesis:us-east-5:000000000000:stream/kclpysample
|
||||||
|
|
||||||
|
# The name of an Amazon Kinesis stream to process.
|
||||||
|
# Important: streamArn takes precedence over streamName if both are set
|
||||||
|
streamName = kclpysample
|
||||||
|
|
||||||
|
# Used by the KCL as the name of this application. Will be used as the name
|
||||||
|
# of an Amazon DynamoDB table which will store the lease and checkpoint
|
||||||
|
# information for workers with this application name
|
||||||
|
applicationName = MultiLangTest
|
||||||
|
|
||||||
|
# Users can change the credentials provider the KCL will use to retrieve credentials.
|
||||||
|
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
|
||||||
|
# described here:
|
||||||
|
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
|
||||||
|
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
|
||||||
|
|
||||||
|
# Appended to the user agent of the KCL. Does not impact the functionality of the
|
||||||
|
# KCL in any other way.
|
||||||
|
processingLanguage = python/3.8
|
||||||
|
|
||||||
|
# Valid options at TRIM_HORIZON or LATEST.
|
||||||
|
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
|
||||||
|
initialPositionInStream = TRIM_HORIZON
|
||||||
|
|
||||||
|
# To specify an initial timestamp from which to start processing records, please specify timestamp value for 'initiatPositionInStreamExtended',
|
||||||
|
# and uncomment below line with right timestamp value.
|
||||||
|
# See more from 'Timestamp' under http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
|
||||||
|
#initialPositionInStreamExtended = 1636609142
|
||||||
|
|
||||||
|
# The following properties are also available for configuring the KCL Worker that is created
|
||||||
|
# by the MultiLangDaemon.
|
||||||
|
|
||||||
|
# The KCL defaults to us-east-1
|
||||||
|
regionName = us-east-1
|
||||||
|
|
||||||
|
# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
|
||||||
|
# will be regarded as having problems and it's shards will be assigned to other workers.
|
||||||
|
# For applications that have a large number of shards, this msy be set to a higher number to reduce
|
||||||
|
# the number of DynamoDB IOPS required for tracking leases
|
||||||
|
failoverTimeMillis = 10000
|
||||||
|
|
||||||
|
# A worker id that uniquely identifies this worker among all workers using the same applicationName
|
||||||
|
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
|
||||||
|
workerId = "workerId"
|
||||||
|
|
||||||
|
# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
|
||||||
|
shardSyncIntervalMillis = 60000
|
||||||
|
|
||||||
|
# Max records to fetch from Kinesis in a single GetRecords call.
|
||||||
|
maxRecords = 10000
|
||||||
|
|
||||||
|
# Idle time between record reads in milliseconds.
|
||||||
|
idleTimeBetweenReadsInMillis = 1000
|
||||||
|
|
||||||
|
# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
|
||||||
|
callProcessRecordsEvenForEmptyRecordList = false
|
||||||
|
|
||||||
|
# Interval in milliseconds between polling to check for parent shard completion.
|
||||||
|
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
|
||||||
|
# completion of parent shards).
|
||||||
|
parentShardPollIntervalMillis = 10000
|
||||||
|
|
||||||
|
# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
|
||||||
|
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
|
||||||
|
# to delete the ones we don't need any longer.
|
||||||
|
cleanupLeasesUponShardCompletion = true
|
||||||
|
|
||||||
|
# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
|
||||||
|
taskBackoffTimeMillis = 500
|
||||||
|
|
||||||
|
# Buffer metrics for at most this long before publishing to CloudWatch.
|
||||||
|
metricsBufferTimeMillis = 10000
|
||||||
|
|
||||||
|
# Buffer at most this many metrics before publishing to CloudWatch.
|
||||||
|
metricsMaxQueueSize = 10000
|
||||||
|
|
||||||
|
# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
|
||||||
|
# to RecordProcessorCheckpointer#checkpoint(String) by default.
|
||||||
|
validateSequenceNumberBeforeCheckpointing = true
|
||||||
|
|
||||||
|
# The maximum number of active threads for the MultiLangDaemon to permit.
|
||||||
|
# If a value is provided then a FixedThreadPool is used with the maximum
|
||||||
|
# active threads set to the provided value. If a non-positive integer or no
|
||||||
|
# value is provided a CachedThreadPool is used.
|
||||||
|
maxActiveThreads = -1
|
||||||
Loading…
Reference in a new issue