From 6809f29726825b9608158dc2bb88529b2feeb8f5 Mon Sep 17 00:00:00 2001 From: stair Date: Fri, 4 Aug 2023 14:58:03 -0400 Subject: [PATCH] Multiple multi-lang edits to introduce logging and additional tests. + added `ENDPOINT_REGION` nested key for a simpler Cx experience + deduplicated, and improved, logic w.r.t. CredentialsProvider construction to NOT swallow Exceptions --- .../kinesis/multilang/NestedPropertyKey.java | 19 ++++ .../multilang/NestedPropertyProcessor.java | 37 +++++--- ...SAssumeRoleSessionCredentialsProvider.java | 28 +++--- ...edentialsProviderPropertyValueDecoder.java | 58 ++++++++---- .../config/KinesisClientLibConfigurator.java | 1 + .../src/main/resources/multilang.properties | 93 +++++++++++++++++++ .../multilang/MultiLangDaemonConfigTest.java | 11 ++- .../multilang/NestedPropertyKeyTest.java | 26 +++++- ...umeRoleSessionCredentialsProviderTest.java | 25 ++++- 9 files changed, 249 insertions(+), 49 deletions(-) create mode 100644 amazon-kinesis-client-multilang/src/main/resources/multilang.properties diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java index cca83062..ea3db8c3 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.multilang; import java.util.HashMap; import java.util.Map; +import com.amazonaws.regions.Regions; import com.google.common.base.CaseFormat; import lombok.AccessLevel; @@ -47,6 +48,9 @@ public enum NestedPropertyKey { * SIGNING_REGION ::= AWS_REGION * * + * It would be redundant to provide both this and {@link #ENDPOINT_REGION}. + * + * @see #ENDPOINT_REGION * @see AWS Service endpoints * @see Available Regions */ @@ -60,6 +64,21 @@ public enum NestedPropertyKey { } }, + /** + * Specify the region where service requests will be submitted. This + * region will determine both the service endpoint and signing region. + *

+ * It would be redundant to provide both this and {@link #ENDPOINT}. + * + * @see #ENDPOINT + * @see Available Regions + */ + ENDPOINT_REGION { + void visit(final NestedPropertyProcessor processor, final String region) { + processor.acceptEndpointRegion(Regions.fromName(region)); + } + }, + /** * External ids may be used when delegating access in a multi-tenant * environment, or to third parties. diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java index ee623174..d3dd7a6f 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java @@ -14,11 +14,36 @@ */ package software.amazon.kinesis.multilang; +import com.amazonaws.regions.Regions; + /** * Defines methods to process {@link NestedPropertyKey}s. */ public interface NestedPropertyProcessor { + /** + * Set the service endpoint where requests are sent. + * + * @param serviceEndpoint the service endpoint either with or without the protocol + * (e.g., https://sns.us-west-1.amazonaws.com, sns.us-west-1.amazonaws.com) + * @param signingRegion the region to use for SigV4 signing of requests (e.g. us-west-1) + * + * @see #acceptEndpointRegion(Regions) + * @see + * AwsClientBuilder.EndpointConfiguration + */ + void acceptEndpoint(String serviceEndpoint, String signingRegion); + + /** + * Set the service endpoint where requests are sent. + * + * @param region Region to be used by the client. This will be used to determine both the service endpoint + * (e.g., https://sns.us-west-1.amazonaws.com) and signing region (e.g., us-west-1) for requests. + * + * @see #acceptEndpoint(String, String) + */ + void acceptEndpointRegion(Regions region); + /** * Set the external id, an optional field to designate who can assume an IAM role. * @@ -26,16 +51,4 @@ public interface NestedPropertyProcessor { */ void acceptExternalId(String externalId); - /** - * Set the service endpoint where requests are sent. - * - * @param serviceEndpoint the service endpoint either with or without the protocol - * (e.g. https://sns.us-west-1.amazonaws.com or sns.us-west-1.amazonaws.com) - * @param signingRegion the region to use for SigV4 signing of requests (e.g. us-west-1) - * - * @see - * AwsClientBuilder.EndpointConfiguration - */ - void acceptEndpoint(String serviceEndpoint, String signingRegion); - } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java index c95855c0..3b196b94 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java @@ -64,20 +64,26 @@ public class KclSTSAssumeRoleSessionCredentialsProvider // do nothing } + @Override + public void acceptEndpoint(final String serviceEndpoint, final String signingRegion) { + final EndpointConfiguration endpoint = new EndpointConfiguration(serviceEndpoint, signingRegion); + final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder() + .withEndpointConfiguration(endpoint) + .build(); + builder.withStsClient(stsClient); + } + + @Override + public void acceptEndpointRegion(final Regions region) { + final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder() + .withRegion(region) + .build(); + builder.withStsClient(stsClient); + } + @Override public void acceptExternalId(final String externalId) { builder.withExternalId(externalId); } - @Override - public void acceptEndpoint(final String serviceEndpoint, final String signingRegion) { - final EndpointConfiguration endpoint = new EndpointConfiguration(serviceEndpoint, signingRegion); - final AWSSecurityTokenService stsClient = - AWSSecurityTokenServiceClient.builder() - .withEndpointConfiguration(endpoint) - .withRegion(Regions.fromName(signingRegion)) - .build(); - builder.withStsClient(stsClient); - } - } \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java index 5f124bfc..f11ac0ec 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java @@ -14,7 +14,7 @@ */ package software.amazon.kinesis.multilang.config; -import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -84,41 +84,35 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode } clazz = (Class) c; } catch (ClassNotFoundException cnfe) { + // Providers are a product of prefixed Strings to cover multiple + // namespaces (e.g., "Foo" -> { "some.auth.Foo", "kcl.auth.Foo" }). + // It's expected that many class names will not resolve. continue; } + log.info("Attempting to construct {}", clazz); AWSCredentialsProvider provider = null; if (nameAndArgs.length > 1) { final String[] varargs = Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length); // attempt to invoke an explicit N-arg constructor of FooClass(String, String, ...) - try { + provider = constructProvider(providerName, () -> { Class[] argTypes = new Class[nameAndArgs.length - 1]; Arrays.fill(argTypes, String.class); - Constructor c = clazz.getConstructor(argTypes); - provider = c.newInstance(varargs); - } catch (Exception e) { - log.debug("Can't find any credentials provider matching {}.", providerName); - } + return clazz.getConstructor(argTypes).newInstance(varargs); + }); if (provider == null) { // attempt to invoke a public varargs/array constructor of FooClass(String[]) - try { - Constructor c = clazz.getConstructor(String[].class); - provider = c.newInstance((Object) varargs); - } catch (Exception e) { - log.debug("Can't find any credentials provider matching {}.", providerName); - } + provider = constructProvider(providerName, () -> + clazz.getConstructor(String[].class).newInstance((Object) varargs) + ); } } if (provider == null) { // regardless of parameters, fallback to invoke a public no-arg constructor - try { - provider = clazz.newInstance(); - } catch (Exception e) { - log.debug("Can't find any credentials provider matching {}.", providerName); - } + provider = constructProvider(providerName, clazz::newInstance); } if (provider != null) { @@ -158,4 +152,32 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode ).map(prefix -> prefix + provider).collect(Collectors.toList()); } + @FunctionalInterface + private interface CredentialsProviderConstructor { + T construct() throws IllegalAccessException, InstantiationException, + InvocationTargetException, NoSuchMethodException; + } + + /** + * Attempts to construct an {@link AWSCredentialsProvider}. + * + * @param providerName Raw, unmodified provider name. Should there be an + * Exeception during construction, this parameter will be logged. + * @param constructor supplier-like function that will perform the construction + * @return the constructed provider, if successful; otherwise, null + * + * @param type of the CredentialsProvider to construct + */ + private static T constructProvider( + final String providerName, final CredentialsProviderConstructor constructor) { + try { + return constructor.construct(); + } catch (NoSuchMethodException ignored) { + // ignore + } catch (IllegalAccessException | InstantiationException | InvocationTargetException | RuntimeException e) { + log.warn("Failed to construct {}", providerName, e); + } + return null; + } + } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java index 5e2ddb1d..49856aa6 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java @@ -62,6 +62,7 @@ public class KinesisClientLibConfigurator { public MultiLangDaemonConfiguration getConfiguration(Properties properties) { properties.entrySet().forEach(e -> { try { + log.info("Processing (key={}, value={})", e.getKey(), e.getValue()); utilsBean.setProperty(configuration, (String) e.getKey(), e.getValue()); } catch (IllegalAccessException | InvocationTargetException ex) { throw new RuntimeException(ex); diff --git a/amazon-kinesis-client-multilang/src/main/resources/multilang.properties b/amazon-kinesis-client-multilang/src/main/resources/multilang.properties new file mode 100644 index 00000000..46c9e2da --- /dev/null +++ b/amazon-kinesis-client-multilang/src/main/resources/multilang.properties @@ -0,0 +1,93 @@ +# The script that abides by the multi-language protocol. This script will +# be executed by the MultiLangDaemon, which will communicate with this script +# over STDIN and STDOUT according to the multi-language protocol. +executableName = sample_kclpy_app.py + +# The Stream arn: arn:aws:kinesis:::stream/ +# Important: streamArn takes precedence over streamName if both are set +streamArn = arn:aws:kinesis:us-east-5:000000000000:stream/kclpysample + +# The name of an Amazon Kinesis stream to process. +# Important: streamArn takes precedence over streamName if both are set +streamName = kclpysample + +# Used by the KCL as the name of this application. Will be used as the name +# of an Amazon DynamoDB table which will store the lease and checkpoint +# information for workers with this application name +applicationName = MultiLangTest + +# Users can change the credentials provider the KCL will use to retrieve credentials. +# The DefaultAWSCredentialsProviderChain checks several other providers, which is +# described here: +# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html +AWSCredentialsProvider = DefaultAWSCredentialsProviderChain + +# Appended to the user agent of the KCL. Does not impact the functionality of the +# KCL in any other way. +processingLanguage = python/3.8 + +# Valid options at TRIM_HORIZON or LATEST. +# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax +initialPositionInStream = TRIM_HORIZON + +# To specify an initial timestamp from which to start processing records, please specify timestamp value for 'initiatPositionInStreamExtended', +# and uncomment below line with right timestamp value. +# See more from 'Timestamp' under http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax +#initialPositionInStreamExtended = 1636609142 + +# The following properties are also available for configuring the KCL Worker that is created +# by the MultiLangDaemon. + +# The KCL defaults to us-east-1 +regionName = us-east-1 + +# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval +# will be regarded as having problems and it's shards will be assigned to other workers. +# For applications that have a large number of shards, this msy be set to a higher number to reduce +# the number of DynamoDB IOPS required for tracking leases +#failoverTimeMillis = 10000 + +# A worker id that uniquely identifies this worker among all workers using the same applicationName +# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself. +#workerId = + +# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. +#shardSyncIntervalMillis = 60000 + +# Max records to fetch from Kinesis in a single GetRecords call. +#maxRecords = 10000 + +# Idle time between record reads in milliseconds. +#idleTimeBetweenReadsInMillis = 1000 + +# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while) +#callProcessRecordsEvenForEmptyRecordList = false + +# Interval in milliseconds between polling to check for parent shard completion. +# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on +# completion of parent shards). +#parentShardPollIntervalMillis = 10000 + +# Cleanup leases upon shards completion (don't wait until they expire in Kinesis). +# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try +# to delete the ones we don't need any longer. +#cleanupLeasesUponShardCompletion = true + +# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). +#taskBackoffTimeMillis = 500 + +# Buffer metrics for at most this long before publishing to CloudWatch. +#metricsBufferTimeMillis = 10000 + +# Buffer at most this many metrics before publishing to CloudWatch. +#metricsMaxQueueSize = 10000 + +# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls +# to RecordProcessorCheckpointer#checkpoint(String) by default. +#validateSequenceNumberBeforeCheckpointing = true + +# The maximum number of active threads for the MultiLangDaemon to permit. +# If a value is provided then a FixedThreadPool is used with the maximum +# active threads set to the provided value. If a non-positive integer or no +# value is provided a CachedThreadPool is used. +#maxActiveThreads = 0 diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java index ae7c9939..93eb2743 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java @@ -36,7 +36,7 @@ import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration; @RunWith(MockitoJUnitRunner.class) public class MultiLangDaemonConfigTest { - private static final String FILENAME = "some.properties"; + private static final String FILENAME = "multilang.properties"; private static final String EXE = "TestExe.exe"; private static final String APPLICATION_NAME = MultiLangDaemonConfigTest.class.getSimpleName(); private static final String STREAM_NAME = "fakeStream"; @@ -52,7 +52,7 @@ public class MultiLangDaemonConfigTest { @Mock private AwsCredentials creds; - private KinesisClientLibConfigurator configurator; + private final KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator(); private MultiLangDaemonConfig deamonConfig; /** @@ -84,8 +84,6 @@ public class MultiLangDaemonConfigTest { when(credentialsProvider.resolveCredentials()).thenReturn(creds); when(creds.accessKeyId()).thenReturn("cool-user"); - configurator = new KinesisClientLibConfigurator(); - deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); } @@ -200,4 +198,9 @@ public class MultiLangDaemonConfigTest { } } + @Test + public void testActualPropertiesFile() throws Exception { + new MultiLangDaemonConfig(FILENAME); + } + } \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java index 43bdb53e..3f61db7a 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java @@ -14,10 +14,13 @@ */ package software.amazon.kinesis.multilang; +import com.amazonaws.regions.Regions; + import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static software.amazon.kinesis.multilang.NestedPropertyKey.ENDPOINT; +import static software.amazon.kinesis.multilang.NestedPropertyKey.ENDPOINT_REGION; import static software.amazon.kinesis.multilang.NestedPropertyKey.EXTERNAL_ID; import static software.amazon.kinesis.multilang.NestedPropertyKey.parse; @@ -42,7 +45,7 @@ public class NestedPropertyKeyTest { @Test public void testEndpoint() { - final String expectedEndpoint = "https://sts.us-east-1.amazon.com"; + final String expectedEndpoint = "https://sts.us-east-1.amazonaws.com"; final String expectedRegion = "us-east-1"; final String param = createKey(ENDPOINT, expectedEndpoint + "^" + expectedRegion); @@ -55,6 +58,24 @@ public class NestedPropertyKeyTest { parse(mockProcessor, createKey(ENDPOINT, "value-sans-caret-delimiter")); } + @Test(expected = IllegalArgumentException.class) + public void testInvalidEndpointDoubleCaret() { + parse(mockProcessor, createKey(ENDPOINT, "https://sts.us-east-1.amazonaws.com^us-east-1^borkbork")); + } + + @Test + public void testEndpointRegion() { + final Regions expectedRegion = Regions.GovCloud; + + parse(mockProcessor, createKey(ENDPOINT_REGION, expectedRegion.getName())); + verify(mockProcessor).acceptEndpointRegion(expectedRegion); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidEndpointRegion() { + parse(mockProcessor, createKey(ENDPOINT_REGION, "snuffleupagus")); + } + /** * Test that the literal nested key (i.e., {@code key=} in {@code some_val|key=nested_val}) * does not change. Any change to an existing literal key is not backwards-compatible. @@ -64,9 +85,10 @@ public class NestedPropertyKeyTest { // Adding a new enum will deliberately cause this assert to fail, and // therefore raise awareness for this explicit test. Add-and-remove may // keep the number unchanged yet will also break (by removing an enum). - assertEquals(2, NestedPropertyKey.values().length); + assertEquals(3, NestedPropertyKey.values().length); assertEquals("endpoint", ENDPOINT.getNestedKey()); + assertEquals("endpointRegion", ENDPOINT_REGION.getNestedKey()); assertEquals("externalId", EXTERNAL_ID.getNestedKey()); } diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java index 20e4bc8a..1c9e6bca 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java @@ -22,11 +22,32 @@ import org.junit.Test; public class KclSTSAssumeRoleSessionCredentialsProviderTest { + private static final String ARN = "arn"; + private static final String SESSION_NAME = "sessionName"; + + /** + * Test that the constructor doesn't throw an out-of-bounds exception if + * there are no parameters beyond the required ARN and session name. + */ + @Test + public void testConstructorWithoutOptionalParams() { + new KclSTSAssumeRoleSessionCredentialsProvider(new String[] { ARN, SESSION_NAME }); + } + + @Test + public void testAcceptEndpoint() { + // discovered exception during e2e testing; therefore, this test is + // to simply verify the constructed STS client doesn't go *boom* + final KclSTSAssumeRoleSessionCredentialsProvider provider = + new KclSTSAssumeRoleSessionCredentialsProvider(ARN, SESSION_NAME); + provider.acceptEndpoint("endpoint", "us-east-1"); + } + @Test public void testVarArgs() { for (final String[] varargs : Arrays.asList( - new String[] { "arn", "session", "externalId=eid", "foo"}, - new String[] { "arn", "session", "foo", "externalId=eid"} + new String[] { ARN, SESSION_NAME, "externalId=eid", "foo"}, + new String[] { ARN, SESSION_NAME, "foo", "externalId=eid"} )) { final VarArgsSpy provider = new VarArgsSpy(varargs); assertEquals("eid", provider.externalId);