From 2f4ff65681468dc6a51f55852a983d049cc4aeec Mon Sep 17 00:00:00 2001 From: stair <123031771+stair-aws@users.noreply.github.com> Date: Mon, 7 Aug 2023 16:29:49 -0400 Subject: [PATCH] =?UTF-8?q?[#367]=20Enhanced=20multi-lang=20`AWSCredential?= =?UTF-8?q?sProvider=3D...`=20decoder=20and=20c=E2=80=A6=20(#1184)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [#367] Enhanced multi-lang `AWSCredentialsProvider=...` decoder and construction. + added support for external ids (issue #367) + added support for endpoint+region (e.g., STS via VPC) * Multiple multi-lang edits to introduce logging and additional tests. + added `ENDPOINT_REGION` nested key for a simpler Cx experience + deduplicated, and improved, logic w.r.t. CredentialsProvider construction to NOT swallow Exceptions * Relocated `multilang.properties` from `main/resources` to `test/resources` --- .../multilang/MultiLangDaemonConfig.java | 11 +- .../MultiLangShardRecordProcessor.java | 18 +-- .../kinesis/multilang/NestedPropertyKey.java | 145 ++++++++++++++++++ .../multilang/NestedPropertyProcessor.java | 54 +++++++ ...SAssumeRoleSessionCredentialsProvider.java | 89 +++++++++++ ...edentialsProviderPropertyValueDecoder.java | 124 ++++++++++----- .../multilang/config/BuilderDynaBean.java | 4 +- .../config/KinesisClientLibConfigurator.java | 1 + .../multilang/MultiLangDaemonConfigTest.java | 17 +- .../multilang/NestedPropertyKeyTest.java | 112 ++++++++++++++ ...umeRoleSessionCredentialsProviderTest.java | 71 +++++++++ ...tialsProviderPropertyValueDecoderTest.java | 61 +++++++- .../src/test/resources/multilang.properties | 93 +++++++++++ 13 files changed, 734 insertions(+), 66 deletions(-) create mode 100644 amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java create mode 100644 amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java create mode 100644 amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java create mode 100644 amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java create mode 100644 amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java create mode 100644 amazon-kinesis-client-multilang/src/test/resources/multilang.properties diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemonConfig.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemonConfig.java index 4d3a408f..c7f77c19 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemonConfig.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangDaemonConfig.java @@ -45,11 +45,11 @@ public class MultiLangDaemonConfig { private static final String PROP_PROCESSING_LANGUAGE = "processingLanguage"; private static final String PROP_MAX_ACTIVE_THREADS = "maxActiveThreads"; - private MultiLangDaemonConfiguration multiLangDaemonConfiguration; + private final MultiLangDaemonConfiguration multiLangDaemonConfiguration; - private ExecutorService executorService; + private final ExecutorService executorService; - private MultiLangRecordProcessorFactory recordProcessorFactory; + private final MultiLangRecordProcessorFactory recordProcessorFactory; /** * Constructor. @@ -165,7 +165,6 @@ public class MultiLangDaemonConfig { propertyStream.close(); } } - } private static boolean validateProperties(Properties properties) { @@ -182,12 +181,12 @@ public class MultiLangDaemonConfig { log.debug("Value for {} property is {}", PROP_MAX_ACTIVE_THREADS, maxActiveThreads); if (maxActiveThreads <= 0) { log.info("Using a cached thread pool."); - return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), builder.build()); } else { log.info("Using a fixed thread pool with {} max active threads.", maxActiveThreads); return new ThreadPoolExecutor(maxActiveThreads, maxActiveThreads, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(), builder.build()); + new LinkedBlockingQueue<>(), builder.build()); } } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangShardRecordProcessor.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangShardRecordProcessor.java index 7b0eefe2..241ea8ee 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangShardRecordProcessor.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangShardRecordProcessor.java @@ -32,7 +32,6 @@ import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration; import software.amazon.kinesis.processor.ShardRecordProcessor; - /** * A record processor that manages creating a child process that implements the multi language protocol and connecting * that child process's input and outputs to a {@link MultiLangProtocol} object and calling the appropriate methods on @@ -50,20 +49,20 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor { private Future stderrReadTask; - private MessageWriter messageWriter; - private MessageReader messageReader; - private DrainChildSTDERRTask readSTDERRTask; + private final MessageWriter messageWriter; + private final MessageReader messageReader; + private final DrainChildSTDERRTask readSTDERRTask; - private ProcessBuilder processBuilder; + private final ProcessBuilder processBuilder; private Process process; - private ExecutorService executorService; + private final ExecutorService executorService; private ProcessState state; - private ObjectMapper objectMapper; + private final ObjectMapper objectMapper; private MultiLangProtocol protocol; - private MultiLangDaemonConfiguration configuration; + private final MultiLangDaemonConfiguration configuration; @Override public void initialize(InitializationInput initializationInput) { @@ -213,7 +212,6 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor { this.readSTDERRTask = readSTDERRTask; this.configuration = configuration; - this.state = ProcessState.ACTIVE; } @@ -303,8 +301,6 @@ public class MultiLangShardRecordProcessor implements ShardRecordProcessor { /** * We provide a package level method for unit testing this call to exit. - * - * @param val exit value */ void exit() { System.exit(EXIT_VALUE); diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java new file mode 100644 index 00000000..ea3db8c3 --- /dev/null +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java @@ -0,0 +1,145 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.multilang; + +import java.util.HashMap; +import java.util.Map; + +import com.amazonaws.regions.Regions; +import com.google.common.base.CaseFormat; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * Key-Value pairs which may be nested in, and extracted from, a property value + * in a Java properties file. For example, given the line in a property file of + * {@code my_key = my_value|foo=bar} and a delimiter split on {@code |} (pipe), + * the value {@code my_value|foo=bar} would have a nested key of {@code foo} + * and its corresponding value is {@code bar}. + *

+ * The order of nested properties does not matter, and these properties are optional. + * Customers may choose to provide, in any order, zero-or-more nested properties. + *

+ * Duplicate keys are not supported, and may result in a last-write-wins outcome. + */ +@Slf4j +public enum NestedPropertyKey { + + /** + * Specify the service endpoint where requests will be submitted. + * This property's value must be in the following format: + *
+     *     ENDPOINT ::= SERVICE_ENDPOINT "^" SIGNING_REGION
+     *     SERVICE_ENDPOINT ::= URL
+     *     SIGNING_REGION ::= AWS_REGION
+     * 
+ * + * It would be redundant to provide both this and {@link #ENDPOINT_REGION}. + * + * @see #ENDPOINT_REGION + * @see AWS Service endpoints + * @see Available Regions + */ + ENDPOINT { + void visit(final NestedPropertyProcessor processor, final String endpoint) { + final String[] tokens = endpoint.split("\\^"); + if (tokens.length != 2) { + throw new IllegalArgumentException("Invalid " + name() + ": " + endpoint); + } + processor.acceptEndpoint(tokens[0], tokens[1]); + } + }, + + /** + * Specify the region where service requests will be submitted. This + * region will determine both the service endpoint and signing region. + *

+ * It would be redundant to provide both this and {@link #ENDPOINT}. + * + * @see #ENDPOINT + * @see Available Regions + */ + ENDPOINT_REGION { + void visit(final NestedPropertyProcessor processor, final String region) { + processor.acceptEndpointRegion(Regions.fromName(region)); + } + }, + + /** + * External ids may be used when delegating access in a multi-tenant + * environment, or to third parties. + * + * @see + * How to use an external ID when granting access to your AWS resources to a third party + */ + EXTERNAL_ID { + void visit(final NestedPropertyProcessor processor, final String externalId) { + processor.acceptExternalId(externalId); + } + }, + + ; + + /** + * Nested key within the property value. For example, a nested key-value + * of {@code foo=bar} has a nested key of {@code foo}. + */ + @Getter(AccessLevel.PACKAGE) + private final String nestedKey; + + NestedPropertyKey() { + // convert the enum from UPPER_SNAKE_CASE to lowerCamelCase + nestedKey = CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name()); + } + + abstract void visit(NestedPropertyProcessor processor, String value); + + /** + * Parses any number of parameters. Each nested property will prompt a + * visit to the {@code processor}. + * + * @param processor processor to be invoked for every nested property + * @param params parameters to check for a nested property key + */ + public static void parse(final NestedPropertyProcessor processor, final String... params) { + // Construct a disposable cache to keep this O(n). Since parsing is + // usually one-and-done, it's wasteful to maintain this cache in perpetuity. + final Map cachedKeys = new HashMap<>(); + for (final NestedPropertyKey npk : values()) { + cachedKeys.put(npk.getNestedKey(), npk); + } + + for (final String param : params) { + if (param != null) { + final String[] tokens = param.split("="); + if (tokens.length == 2) { + final NestedPropertyKey npk = cachedKeys.get(tokens[0]); + if (npk != null) { + npk.visit(processor, tokens[1]); + } else { + log.warn("Unsupported nested key: {}", param); + } + } else if (tokens.length > 2) { + log.warn("Malformed nested key: {}", param); + } else { + log.info("Parameter is not a nested key: {}", param); + } + } + } + } + +} diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java new file mode 100644 index 00000000..d3dd7a6f --- /dev/null +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java @@ -0,0 +1,54 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.multilang; + +import com.amazonaws.regions.Regions; + +/** + * Defines methods to process {@link NestedPropertyKey}s. + */ +public interface NestedPropertyProcessor { + + /** + * Set the service endpoint where requests are sent. + * + * @param serviceEndpoint the service endpoint either with or without the protocol + * (e.g., https://sns.us-west-1.amazonaws.com, sns.us-west-1.amazonaws.com) + * @param signingRegion the region to use for SigV4 signing of requests (e.g. us-west-1) + * + * @see #acceptEndpointRegion(Regions) + * @see + * AwsClientBuilder.EndpointConfiguration + */ + void acceptEndpoint(String serviceEndpoint, String signingRegion); + + /** + * Set the service endpoint where requests are sent. + * + * @param region Region to be used by the client. This will be used to determine both the service endpoint + * (e.g., https://sns.us-west-1.amazonaws.com) and signing region (e.g., us-west-1) for requests. + * + * @see #acceptEndpoint(String, String) + */ + void acceptEndpointRegion(Regions region); + + /** + * Set the external id, an optional field to designate who can assume an IAM role. + * + * @param externalId external id used in the service call used to retrieve session credentials + */ + void acceptExternalId(String externalId); + +} diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java new file mode 100644 index 00000000..3b196b94 --- /dev/null +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java @@ -0,0 +1,89 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.multilang.auth; + +import java.util.Arrays; + +import com.amazonaws.auth.AWSSessionCredentials; +import com.amazonaws.auth.AWSSessionCredentialsProvider; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.Builder; +import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.securitytoken.AWSSecurityTokenService; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient; + +import software.amazon.kinesis.multilang.NestedPropertyKey; +import software.amazon.kinesis.multilang.NestedPropertyProcessor; + +/** + * An {@link AWSSessionCredentialsProvider} that is backed by STSAssumeRole. + */ +public class KclSTSAssumeRoleSessionCredentialsProvider + implements AWSSessionCredentialsProvider, NestedPropertyProcessor { + + private final Builder builder; + + private final STSAssumeRoleSessionCredentialsProvider provider; + + /** + * + * @param params vararg parameters which must include roleArn at index=0, + * and roleSessionName at index=1 + */ + public KclSTSAssumeRoleSessionCredentialsProvider(final String[] params) { + this(params[0], params[1], Arrays.copyOfRange(params, 2, params.length)); + } + + public KclSTSAssumeRoleSessionCredentialsProvider(final String roleArn, final String roleSessionName, + final String... params) { + builder = new Builder(roleArn, roleSessionName); + NestedPropertyKey.parse(this, params); + provider = builder.build(); + } + + @Override + public AWSSessionCredentials getCredentials() { + return provider.getCredentials(); + } + + @Override + public void refresh() { + // do nothing + } + + @Override + public void acceptEndpoint(final String serviceEndpoint, final String signingRegion) { + final EndpointConfiguration endpoint = new EndpointConfiguration(serviceEndpoint, signingRegion); + final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder() + .withEndpointConfiguration(endpoint) + .build(); + builder.withStsClient(stsClient); + } + + @Override + public void acceptEndpointRegion(final Regions region) { + final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder() + .withRegion(region) + .build(); + builder.withStsClient(stsClient); + } + + @Override + public void acceptExternalId(final String externalId) { + builder.withExternalId(externalId); + } + +} \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java index 97fa975e..f11ac0ec 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java @@ -14,10 +14,13 @@ */ package software.amazon.kinesis.multilang.config; -import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProviderChain; @@ -28,7 +31,6 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecoder { - private static final String AUTH_PREFIX = "com.amazonaws.auth."; private static final String LIST_DELIMITER = ","; private static final String ARG_DELIMITER = "|"; @@ -63,35 +65,59 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode */ @Override public List> getSupportedTypes() { - return Arrays.asList(AWSCredentialsProvider.class); + return Collections.singletonList(AWSCredentialsProvider.class); } - /* + /** * Convert string list to a list of valid credentials providers. */ private static List getValidCredentialsProviders(List providerNames) { List credentialsProviders = new ArrayList<>(); + for (String providerName : providerNames) { - if (providerName.contains(ARG_DELIMITER)) { - String[] nameAndArgs = providerName.split("\\" + ARG_DELIMITER); - Class[] argTypes = new Class[nameAndArgs.length - 1]; - Arrays.fill(argTypes, String.class); - try { - Class className = Class.forName(nameAndArgs[0]); - Constructor c = className.getConstructor(argTypes); - credentialsProviders.add((AWSCredentialsProvider) c - .newInstance(Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length))); - } catch (Exception e) { - log.debug("Can't find any credentials provider matching {}.", providerName); + final String[] nameAndArgs = providerName.split("\\" + ARG_DELIMITER); + final Class clazz; + try { + final Class c = Class.forName(nameAndArgs[0]); + if (!AWSCredentialsProvider.class.isAssignableFrom(c)) { + continue; } - } else { - try { - Class className = Class.forName(providerName); - credentialsProviders.add((AWSCredentialsProvider) className.newInstance()); - } catch (Exception e) { - log.debug("Can't find any credentials provider matching {}.", providerName); + clazz = (Class) c; + } catch (ClassNotFoundException cnfe) { + // Providers are a product of prefixed Strings to cover multiple + // namespaces (e.g., "Foo" -> { "some.auth.Foo", "kcl.auth.Foo" }). + // It's expected that many class names will not resolve. + continue; + } + log.info("Attempting to construct {}", clazz); + + AWSCredentialsProvider provider = null; + if (nameAndArgs.length > 1) { + final String[] varargs = Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length); + + // attempt to invoke an explicit N-arg constructor of FooClass(String, String, ...) + provider = constructProvider(providerName, () -> { + Class[] argTypes = new Class[nameAndArgs.length - 1]; + Arrays.fill(argTypes, String.class); + return clazz.getConstructor(argTypes).newInstance(varargs); + }); + + if (provider == null) { + // attempt to invoke a public varargs/array constructor of FooClass(String[]) + provider = constructProvider(providerName, () -> + clazz.getConstructor(String[].class).newInstance((Object) varargs) + ); } } + + if (provider == null) { + // regardless of parameters, fallback to invoke a public no-arg constructor + provider = constructProvider(providerName, clazz::newInstance); + } + + if (provider != null) { + credentialsProviders.add(provider); + } } return credentialsProviders; } @@ -99,7 +125,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode private static List getProviderNames(String property) { // assume list delimiter is "," String[] elements = property.split(LIST_DELIMITER); - List result = new ArrayList(); + List result = new ArrayList<>(); for (int i = 0; i < elements.length; i++) { String string = elements[i].trim(); if (!string.isEmpty()) { @@ -110,20 +136,48 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode return result; } - private static List getPossibleFullClassNames(String s) { - /* - * We take care of three cases : - * - * 1. Customer provides a short name of common providers in com.amazonaws.auth package i.e. any classes - * implementing the AWSCredentialsProvider interface: - * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html - * - * 2. Customer provides a full name of common providers e.g. com.amazonaws.auth.ClasspathFileCredentialsProvider - * - * 3. Customer provides a custom credentials provider with full name of provider - */ + private static List getPossibleFullClassNames(final String provider) { + return Stream.of( + // Customer provides a short name of common providers in com.amazonaws.auth package + // (e.g., any classes implementing the AWSCredentialsProvider interface) + // @see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html + "com.amazonaws.auth.", - return Arrays.asList(s, AUTH_PREFIX + s); + // Customer provides a short name of a provider offered by this multi-lang package + "software.amazon.kinesis.multilang.auth.", + + // Customer provides a fully-qualified provider name, or a custom credentials provider + // (e.g., com.amazonaws.auth.ClasspathFileCredentialsProvider, org.mycompany.FooProvider) + "" + ).map(prefix -> prefix + provider).collect(Collectors.toList()); + } + + @FunctionalInterface + private interface CredentialsProviderConstructor { + T construct() throws IllegalAccessException, InstantiationException, + InvocationTargetException, NoSuchMethodException; + } + + /** + * Attempts to construct an {@link AWSCredentialsProvider}. + * + * @param providerName Raw, unmodified provider name. Should there be an + * Exeception during construction, this parameter will be logged. + * @param constructor supplier-like function that will perform the construction + * @return the constructed provider, if successful; otherwise, null + * + * @param type of the CredentialsProvider to construct + */ + private static T constructProvider( + final String providerName, final CredentialsProviderConstructor constructor) { + try { + return constructor.construct(); + } catch (NoSuchMethodException ignored) { + // ignore + } catch (IllegalAccessException | InstantiationException | InvocationTargetException | RuntimeException e) { + log.warn("Failed to construct {}", providerName, e); + } + return null; } } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/BuilderDynaBean.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/BuilderDynaBean.java index 2035695c..5baa47f4 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/BuilderDynaBean.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/BuilderDynaBean.java @@ -57,10 +57,10 @@ public class BuilderDynaBean implements DynaBean { } public BuilderDynaBean(Class destinedClass, ConvertUtilsBean convertUtilsBean, - Function emtpyPropertyHandler, List classPrefixSearchList) { + Function emptyPropertyHandler, List classPrefixSearchList) { this.convertUtilsBean = convertUtilsBean; this.classPrefixSearchList = classPrefixSearchList; - this.emptyPropertyHandler = emtpyPropertyHandler; + this.emptyPropertyHandler = emptyPropertyHandler; initialize(destinedClass); } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java index 5e2ddb1d..49856aa6 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java @@ -62,6 +62,7 @@ public class KinesisClientLibConfigurator { public MultiLangDaemonConfiguration getConfiguration(Properties properties) { properties.entrySet().forEach(e -> { try { + log.info("Processing (key={}, value={})", e.getKey(), e.getValue()); utilsBean.setProperty(configuration, (String) e.getKey(), e.getValue()); } catch (IllegalAccessException | InvocationTargetException ex) { throw new RuntimeException(ex); diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java index c5740a2f..aa46f431 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java @@ -36,7 +36,7 @@ import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration; @RunWith(MockitoJUnitRunner.class) public class MultiLangDaemonConfigTest { - private static final String FILENAME = "some.properties"; + private static final String FILENAME = "multilang.properties"; private static final String EXE = "TestExe.exe"; private static final String APPLICATION_NAME = MultiLangDaemonConfigTest.class.getSimpleName(); private static final String STREAM_NAME = "fakeStream"; @@ -52,7 +52,7 @@ public class MultiLangDaemonConfigTest { @Mock private AwsCredentials creds; - private KinesisClientLibConfigurator configurator; + private final KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator(); private MultiLangDaemonConfig deamonConfig; /** @@ -62,7 +62,6 @@ public class MultiLangDaemonConfigTest { * @throws IOException */ public void setup(String streamName, String streamArn) throws IOException { - String properties = String.format("executableName = %s\n" + "applicationName = %s\n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" @@ -85,8 +84,6 @@ public class MultiLangDaemonConfigTest { when(credentialsProvider.resolveCredentials()).thenReturn(creds); when(creds.accessKeyId()).thenReturn("cool-user"); - configurator = new KinesisClientLibConfigurator(); - deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); } @@ -201,4 +198,14 @@ public class MultiLangDaemonConfigTest { } } + /** + * Test the loading of a "real" properties file. This test should catch + * any issues which might arise if there is a discrepancy between reality + * and mocking. + */ + @Test + public void testActualPropertiesFile() throws Exception { + new MultiLangDaemonConfig(FILENAME); + } + } \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java new file mode 100644 index 00000000..3f61db7a --- /dev/null +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java @@ -0,0 +1,112 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.multilang; + +import com.amazonaws.regions.Regions; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static software.amazon.kinesis.multilang.NestedPropertyKey.ENDPOINT; +import static software.amazon.kinesis.multilang.NestedPropertyKey.ENDPOINT_REGION; +import static software.amazon.kinesis.multilang.NestedPropertyKey.EXTERNAL_ID; +import static software.amazon.kinesis.multilang.NestedPropertyKey.parse; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class NestedPropertyKeyTest { + + @Mock + private NestedPropertyProcessor mockProcessor; + + @Test + public void testExternalId() { + final String expectedId = "eid"; + + parse(mockProcessor, createKey(EXTERNAL_ID, expectedId)); + verify(mockProcessor).acceptExternalId(expectedId); + } + + @Test + public void testEndpoint() { + final String expectedEndpoint = "https://sts.us-east-1.amazonaws.com"; + final String expectedRegion = "us-east-1"; + final String param = createKey(ENDPOINT, expectedEndpoint + "^" + expectedRegion); + + parse(mockProcessor, param); + verify(mockProcessor).acceptEndpoint(expectedEndpoint, expectedRegion); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidEndpoint() { + parse(mockProcessor, createKey(ENDPOINT, "value-sans-caret-delimiter")); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidEndpointDoubleCaret() { + parse(mockProcessor, createKey(ENDPOINT, "https://sts.us-east-1.amazonaws.com^us-east-1^borkbork")); + } + + @Test + public void testEndpointRegion() { + final Regions expectedRegion = Regions.GovCloud; + + parse(mockProcessor, createKey(ENDPOINT_REGION, expectedRegion.getName())); + verify(mockProcessor).acceptEndpointRegion(expectedRegion); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidEndpointRegion() { + parse(mockProcessor, createKey(ENDPOINT_REGION, "snuffleupagus")); + } + + /** + * Test that the literal nested key (i.e., {@code key=} in {@code some_val|key=nested_val}) + * does not change. Any change to an existing literal key is not backwards-compatible. + */ + @Test + public void testKeysExplicitly() { + // Adding a new enum will deliberately cause this assert to fail, and + // therefore raise awareness for this explicit test. Add-and-remove may + // keep the number unchanged yet will also break (by removing an enum). + assertEquals(3, NestedPropertyKey.values().length); + + assertEquals("endpoint", ENDPOINT.getNestedKey()); + assertEquals("endpointRegion", ENDPOINT_REGION.getNestedKey()); + assertEquals("externalId", EXTERNAL_ID.getNestedKey()); + } + + @Test + public void testNonmatchingParameters() { + final String[] params = new String[] { + null, + "", + "hello world", // no nested key + "foo=bar", // nested key, but is not a recognized key + createKey(EXTERNAL_ID, "eid") + "=extra", // valid key made invalid by second '=' + }; + parse(mockProcessor, params); + verifyZeroInteractions(mockProcessor); + } + + private static String createKey(final NestedPropertyKey key, final String value) { + return key.getNestedKey() + "=" + value; + } + +} \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java new file mode 100644 index 00000000..1c9e6bca --- /dev/null +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.multilang.auth; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class KclSTSAssumeRoleSessionCredentialsProviderTest { + + private static final String ARN = "arn"; + private static final String SESSION_NAME = "sessionName"; + + /** + * Test that the constructor doesn't throw an out-of-bounds exception if + * there are no parameters beyond the required ARN and session name. + */ + @Test + public void testConstructorWithoutOptionalParams() { + new KclSTSAssumeRoleSessionCredentialsProvider(new String[] { ARN, SESSION_NAME }); + } + + @Test + public void testAcceptEndpoint() { + // discovered exception during e2e testing; therefore, this test is + // to simply verify the constructed STS client doesn't go *boom* + final KclSTSAssumeRoleSessionCredentialsProvider provider = + new KclSTSAssumeRoleSessionCredentialsProvider(ARN, SESSION_NAME); + provider.acceptEndpoint("endpoint", "us-east-1"); + } + + @Test + public void testVarArgs() { + for (final String[] varargs : Arrays.asList( + new String[] { ARN, SESSION_NAME, "externalId=eid", "foo"}, + new String[] { ARN, SESSION_NAME, "foo", "externalId=eid"} + )) { + final VarArgsSpy provider = new VarArgsSpy(varargs); + assertEquals("eid", provider.externalId); + } + } + + private static class VarArgsSpy extends KclSTSAssumeRoleSessionCredentialsProvider { + + private String externalId; + + public VarArgsSpy(String[] args) { + super(args); + } + + @Override + public void acceptExternalId(final String externalId) { + this.externalId = externalId; + super.acceptExternalId(externalId); + } + } +} \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java index ced63f24..80e67d26 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java @@ -16,6 +16,8 @@ package software.amazon.kinesis.multilang.config; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import java.util.Arrays; @@ -26,6 +28,7 @@ import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Test; +import software.amazon.kinesis.multilang.auth.KclSTSAssumeRoleSessionCredentialsProvider; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; @@ -38,7 +41,7 @@ public class AWSCredentialsProviderPropertyValueDecoderTest { private final String credentialName1 = AlwaysSucceedCredentialsProvider.class.getName(); private final String credentialName2 = ConstructorCredentialsProvider.class.getName(); - private AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder(); + private final AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder(); @ToString private static class AWSCredentialsMatcher extends TypeSafeDiagnosingMatcher { @@ -53,10 +56,6 @@ public class AWSCredentialsProviderPropertyValueDecoderTest { this.classMatcher = instanceOf(AWSCredentialsProviderChain.class); } - private AWSCredentialsMatcher(AWSCredentials expected) { - this(expected.getAWSAccessKeyId(), expected.getAWSSecretKey()); - } - @Override protected boolean matchesSafely(AWSCredentialsProvider item, Description mismatchDescription) { AWSCredentials actual = item.getCredentials(); @@ -114,6 +113,33 @@ public class AWSCredentialsProviderPropertyValueDecoderTest { assertThat(provider, hasCredentials("arg1", "arg2")); } + /** + * Test that providers in the multi-lang auth package can be resolved and instantiated. + */ + @Test + public void testKclAuthProvider() { + for (final String className : Arrays.asList( + KclSTSAssumeRoleSessionCredentialsProvider.class.getName(), // fully-qualified name + KclSTSAssumeRoleSessionCredentialsProvider.class.getSimpleName() // name-only; needs prefix + )) { + final AWSCredentialsProvider provider = decoder.decodeValue(className + "|arn|sessionName"); + assertNotNull(className, provider); + } + } + + /** + * Test that a provider can be instantiated by its varargs constructor. + */ + @Test + public void testVarArgAuthProvider() { + final String[] args = new String[] { "arg1", "arg2", "arg3" }; + final String className = VarArgCredentialsProvider.class.getName(); + final String encodedValue = className + "|" + String.join("|", args); + + final AWSCredentialsProvider provider = decoder.decodeValue(encodedValue); + assertEquals(Arrays.toString(args), provider.getCredentials().getAWSAccessKeyId()); + } + /** * This credentials provider will always succeed */ @@ -138,9 +164,9 @@ public class AWSCredentialsProviderPropertyValueDecoderTest { private String arg1; private String arg2; + @SuppressWarnings("unused") public ConstructorCredentialsProvider(String arg1) { - this.arg1 = arg1; - this.arg2 = "blank"; + this(arg1, "blank"); } public ConstructorCredentialsProvider(String arg1, String arg2) { @@ -158,4 +184,25 @@ public class AWSCredentialsProviderPropertyValueDecoderTest { } } + + private static class VarArgCredentialsProvider implements AWSCredentialsProvider { + + private final String[] args; + + public VarArgCredentialsProvider(final String[] args) { + this.args = args; + } + + @Override + public AWSCredentials getCredentials() { + // KISS solution to surface the constructor args + final String flattenedArgs = Arrays.toString(args); + return new BasicAWSCredentials(flattenedArgs, flattenedArgs); + } + + @Override + public void refresh() { + + } + } } diff --git a/amazon-kinesis-client-multilang/src/test/resources/multilang.properties b/amazon-kinesis-client-multilang/src/test/resources/multilang.properties new file mode 100644 index 00000000..34cb0c1a --- /dev/null +++ b/amazon-kinesis-client-multilang/src/test/resources/multilang.properties @@ -0,0 +1,93 @@ +# The script that abides by the multi-language protocol. This script will +# be executed by the MultiLangDaemon, which will communicate with this script +# over STDIN and STDOUT according to the multi-language protocol. +executableName = sample_kclpy_app.py + +# The Stream arn: arn:aws:kinesis:::stream/ +# Important: streamArn takes precedence over streamName if both are set +streamArn = arn:aws:kinesis:us-east-5:000000000000:stream/kclpysample + +# The name of an Amazon Kinesis stream to process. +# Important: streamArn takes precedence over streamName if both are set +streamName = kclpysample + +# Used by the KCL as the name of this application. Will be used as the name +# of an Amazon DynamoDB table which will store the lease and checkpoint +# information for workers with this application name +applicationName = MultiLangTest + +# Users can change the credentials provider the KCL will use to retrieve credentials. +# The DefaultAWSCredentialsProviderChain checks several other providers, which is +# described here: +# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html +AWSCredentialsProvider = DefaultAWSCredentialsProviderChain + +# Appended to the user agent of the KCL. Does not impact the functionality of the +# KCL in any other way. +processingLanguage = python/3.8 + +# Valid options at TRIM_HORIZON or LATEST. +# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax +initialPositionInStream = TRIM_HORIZON + +# To specify an initial timestamp from which to start processing records, please specify timestamp value for 'initiatPositionInStreamExtended', +# and uncomment below line with right timestamp value. +# See more from 'Timestamp' under http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax +#initialPositionInStreamExtended = 1636609142 + +# The following properties are also available for configuring the KCL Worker that is created +# by the MultiLangDaemon. + +# The KCL defaults to us-east-1 +regionName = us-east-1 + +# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval +# will be regarded as having problems and it's shards will be assigned to other workers. +# For applications that have a large number of shards, this msy be set to a higher number to reduce +# the number of DynamoDB IOPS required for tracking leases +failoverTimeMillis = 10000 + +# A worker id that uniquely identifies this worker among all workers using the same applicationName +# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself. +workerId = "workerId" + +# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. +shardSyncIntervalMillis = 60000 + +# Max records to fetch from Kinesis in a single GetRecords call. +maxRecords = 10000 + +# Idle time between record reads in milliseconds. +idleTimeBetweenReadsInMillis = 1000 + +# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while) +callProcessRecordsEvenForEmptyRecordList = false + +# Interval in milliseconds between polling to check for parent shard completion. +# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on +# completion of parent shards). +parentShardPollIntervalMillis = 10000 + +# Cleanup leases upon shards completion (don't wait until they expire in Kinesis). +# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try +# to delete the ones we don't need any longer. +cleanupLeasesUponShardCompletion = true + +# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). +taskBackoffTimeMillis = 500 + +# Buffer metrics for at most this long before publishing to CloudWatch. +metricsBufferTimeMillis = 10000 + +# Buffer at most this many metrics before publishing to CloudWatch. +metricsMaxQueueSize = 10000 + +# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls +# to RecordProcessorCheckpointer#checkpoint(String) by default. +validateSequenceNumberBeforeCheckpointing = true + +# The maximum number of active threads for the MultiLangDaemon to permit. +# If a value is provided then a FixedThreadPool is used with the maximum +# active threads set to the provided value. If a non-positive integer or no +# value is provided a CachedThreadPool is used. +maxActiveThreads = -1