diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java
index cca83062..ea3db8c3 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyKey.java
@@ -17,6 +17,7 @@ package software.amazon.kinesis.multilang;
import java.util.HashMap;
import java.util.Map;
+import com.amazonaws.regions.Regions;
import com.google.common.base.CaseFormat;
import lombok.AccessLevel;
@@ -47,6 +48,9 @@ public enum NestedPropertyKey {
* SIGNING_REGION ::= AWS_REGION
*
*
+ * It would be redundant to provide both this and {@link #ENDPOINT_REGION}.
+ *
+ * @see #ENDPOINT_REGION
* @see AWS Service endpoints
* @see Available Regions
*/
@@ -60,6 +64,21 @@ public enum NestedPropertyKey {
}
},
+ /**
+ * Specify the region where service requests will be submitted. This
+ * region will determine both the service endpoint and signing region.
+ *
+ * It would be redundant to provide both this and {@link #ENDPOINT}.
+ *
+ * @see #ENDPOINT
+ * @see Available Regions
+ */
+ ENDPOINT_REGION {
+ void visit(final NestedPropertyProcessor processor, final String region) {
+ processor.acceptEndpointRegion(Regions.fromName(region));
+ }
+ },
+
/**
* External ids may be used when delegating access in a multi-tenant
* environment, or to third parties.
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java
index ee623174..d3dd7a6f 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/NestedPropertyProcessor.java
@@ -14,11 +14,36 @@
*/
package software.amazon.kinesis.multilang;
+import com.amazonaws.regions.Regions;
+
/**
* Defines methods to process {@link NestedPropertyKey}s.
*/
public interface NestedPropertyProcessor {
+ /**
+ * Set the service endpoint where requests are sent.
+ *
+ * @param serviceEndpoint the service endpoint either with or without the protocol
+ * (e.g., https://sns.us-west-1.amazonaws.com, sns.us-west-1.amazonaws.com)
+ * @param signingRegion the region to use for SigV4 signing of requests (e.g. us-west-1)
+ *
+ * @see #acceptEndpointRegion(Regions)
+ * @see
+ * AwsClientBuilder.EndpointConfiguration
+ */
+ void acceptEndpoint(String serviceEndpoint, String signingRegion);
+
+ /**
+ * Set the service endpoint where requests are sent.
+ *
+ * @param region Region to be used by the client. This will be used to determine both the service endpoint
+ * (e.g., https://sns.us-west-1.amazonaws.com) and signing region (e.g., us-west-1) for requests.
+ *
+ * @see #acceptEndpoint(String, String)
+ */
+ void acceptEndpointRegion(Regions region);
+
/**
* Set the external id, an optional field to designate who can assume an IAM role.
*
@@ -26,16 +51,4 @@ public interface NestedPropertyProcessor {
*/
void acceptExternalId(String externalId);
- /**
- * Set the service endpoint where requests are sent.
- *
- * @param serviceEndpoint the service endpoint either with or without the protocol
- * (e.g. https://sns.us-west-1.amazonaws.com or sns.us-west-1.amazonaws.com)
- * @param signingRegion the region to use for SigV4 signing of requests (e.g. us-west-1)
- *
- * @see
- * AwsClientBuilder.EndpointConfiguration
- */
- void acceptEndpoint(String serviceEndpoint, String signingRegion);
-
}
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java
index c95855c0..3b196b94 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProvider.java
@@ -64,20 +64,26 @@ public class KclSTSAssumeRoleSessionCredentialsProvider
// do nothing
}
+ @Override
+ public void acceptEndpoint(final String serviceEndpoint, final String signingRegion) {
+ final EndpointConfiguration endpoint = new EndpointConfiguration(serviceEndpoint, signingRegion);
+ final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder()
+ .withEndpointConfiguration(endpoint)
+ .build();
+ builder.withStsClient(stsClient);
+ }
+
+ @Override
+ public void acceptEndpointRegion(final Regions region) {
+ final AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClient.builder()
+ .withRegion(region)
+ .build();
+ builder.withStsClient(stsClient);
+ }
+
@Override
public void acceptExternalId(final String externalId) {
builder.withExternalId(externalId);
}
- @Override
- public void acceptEndpoint(final String serviceEndpoint, final String signingRegion) {
- final EndpointConfiguration endpoint = new EndpointConfiguration(serviceEndpoint, signingRegion);
- final AWSSecurityTokenService stsClient =
- AWSSecurityTokenServiceClient.builder()
- .withEndpointConfiguration(endpoint)
- .withRegion(Regions.fromName(signingRegion))
- .build();
- builder.withStsClient(stsClient);
- }
-
}
\ No newline at end of file
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java
index 5f124bfc..f11ac0ec 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoder.java
@@ -14,7 +14,7 @@
*/
package software.amazon.kinesis.multilang.config;
-import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -84,41 +84,35 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
}
clazz = (Class extends AWSCredentialsProvider>) c;
} catch (ClassNotFoundException cnfe) {
+ // Providers are a product of prefixed Strings to cover multiple
+ // namespaces (e.g., "Foo" -> { "some.auth.Foo", "kcl.auth.Foo" }).
+ // It's expected that many class names will not resolve.
continue;
}
+ log.info("Attempting to construct {}", clazz);
AWSCredentialsProvider provider = null;
if (nameAndArgs.length > 1) {
final String[] varargs = Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length);
// attempt to invoke an explicit N-arg constructor of FooClass(String, String, ...)
- try {
+ provider = constructProvider(providerName, () -> {
Class>[] argTypes = new Class>[nameAndArgs.length - 1];
Arrays.fill(argTypes, String.class);
- Constructor extends AWSCredentialsProvider> c = clazz.getConstructor(argTypes);
- provider = c.newInstance(varargs);
- } catch (Exception e) {
- log.debug("Can't find any credentials provider matching {}.", providerName);
- }
+ return clazz.getConstructor(argTypes).newInstance(varargs);
+ });
if (provider == null) {
// attempt to invoke a public varargs/array constructor of FooClass(String[])
- try {
- Constructor extends AWSCredentialsProvider> c = clazz.getConstructor(String[].class);
- provider = c.newInstance((Object) varargs);
- } catch (Exception e) {
- log.debug("Can't find any credentials provider matching {}.", providerName);
- }
+ provider = constructProvider(providerName, () ->
+ clazz.getConstructor(String[].class).newInstance((Object) varargs)
+ );
}
}
if (provider == null) {
// regardless of parameters, fallback to invoke a public no-arg constructor
- try {
- provider = clazz.newInstance();
- } catch (Exception e) {
- log.debug("Can't find any credentials provider matching {}.", providerName);
- }
+ provider = constructProvider(providerName, clazz::newInstance);
}
if (provider != null) {
@@ -158,4 +152,32 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
).map(prefix -> prefix + provider).collect(Collectors.toList());
}
+ @FunctionalInterface
+ private interface CredentialsProviderConstructor {
+ T construct() throws IllegalAccessException, InstantiationException,
+ InvocationTargetException, NoSuchMethodException;
+ }
+
+ /**
+ * Attempts to construct an {@link AWSCredentialsProvider}.
+ *
+ * @param providerName Raw, unmodified provider name. Should there be an
+ * Exeception during construction, this parameter will be logged.
+ * @param constructor supplier-like function that will perform the construction
+ * @return the constructed provider, if successful; otherwise, null
+ *
+ * @param type of the CredentialsProvider to construct
+ */
+ private static T constructProvider(
+ final String providerName, final CredentialsProviderConstructor constructor) {
+ try {
+ return constructor.construct();
+ } catch (NoSuchMethodException ignored) {
+ // ignore
+ } catch (IllegalAccessException | InstantiationException | InvocationTargetException | RuntimeException e) {
+ log.warn("Failed to construct {}", providerName, e);
+ }
+ return null;
+ }
+
}
diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java
index 5e2ddb1d..49856aa6 100644
--- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java
+++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java
@@ -62,6 +62,7 @@ public class KinesisClientLibConfigurator {
public MultiLangDaemonConfiguration getConfiguration(Properties properties) {
properties.entrySet().forEach(e -> {
try {
+ log.info("Processing (key={}, value={})", e.getKey(), e.getValue());
utilsBean.setProperty(configuration, (String) e.getKey(), e.getValue());
} catch (IllegalAccessException | InvocationTargetException ex) {
throw new RuntimeException(ex);
diff --git a/amazon-kinesis-client-multilang/src/main/resources/multilang.properties b/amazon-kinesis-client-multilang/src/main/resources/multilang.properties
new file mode 100644
index 00000000..46c9e2da
--- /dev/null
+++ b/amazon-kinesis-client-multilang/src/main/resources/multilang.properties
@@ -0,0 +1,93 @@
+# The script that abides by the multi-language protocol. This script will
+# be executed by the MultiLangDaemon, which will communicate with this script
+# over STDIN and STDOUT according to the multi-language protocol.
+executableName = sample_kclpy_app.py
+
+# The Stream arn: arn:aws:kinesis:::stream/
+# Important: streamArn takes precedence over streamName if both are set
+streamArn = arn:aws:kinesis:us-east-5:000000000000:stream/kclpysample
+
+# The name of an Amazon Kinesis stream to process.
+# Important: streamArn takes precedence over streamName if both are set
+streamName = kclpysample
+
+# Used by the KCL as the name of this application. Will be used as the name
+# of an Amazon DynamoDB table which will store the lease and checkpoint
+# information for workers with this application name
+applicationName = MultiLangTest
+
+# Users can change the credentials provider the KCL will use to retrieve credentials.
+# The DefaultAWSCredentialsProviderChain checks several other providers, which is
+# described here:
+# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
+AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
+
+# Appended to the user agent of the KCL. Does not impact the functionality of the
+# KCL in any other way.
+processingLanguage = python/3.8
+
+# Valid options at TRIM_HORIZON or LATEST.
+# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
+initialPositionInStream = TRIM_HORIZON
+
+# To specify an initial timestamp from which to start processing records, please specify timestamp value for 'initiatPositionInStreamExtended',
+# and uncomment below line with right timestamp value.
+# See more from 'Timestamp' under http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
+#initialPositionInStreamExtended = 1636609142
+
+# The following properties are also available for configuring the KCL Worker that is created
+# by the MultiLangDaemon.
+
+# The KCL defaults to us-east-1
+regionName = us-east-1
+
+# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
+# will be regarded as having problems and it's shards will be assigned to other workers.
+# For applications that have a large number of shards, this msy be set to a higher number to reduce
+# the number of DynamoDB IOPS required for tracking leases
+#failoverTimeMillis = 10000
+
+# A worker id that uniquely identifies this worker among all workers using the same applicationName
+# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
+#workerId =
+
+# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
+#shardSyncIntervalMillis = 60000
+
+# Max records to fetch from Kinesis in a single GetRecords call.
+#maxRecords = 10000
+
+# Idle time between record reads in milliseconds.
+#idleTimeBetweenReadsInMillis = 1000
+
+# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
+#callProcessRecordsEvenForEmptyRecordList = false
+
+# Interval in milliseconds between polling to check for parent shard completion.
+# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
+# completion of parent shards).
+#parentShardPollIntervalMillis = 10000
+
+# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
+# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
+# to delete the ones we don't need any longer.
+#cleanupLeasesUponShardCompletion = true
+
+# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
+#taskBackoffTimeMillis = 500
+
+# Buffer metrics for at most this long before publishing to CloudWatch.
+#metricsBufferTimeMillis = 10000
+
+# Buffer at most this many metrics before publishing to CloudWatch.
+#metricsMaxQueueSize = 10000
+
+# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
+# to RecordProcessorCheckpointer#checkpoint(String) by default.
+#validateSequenceNumberBeforeCheckpointing = true
+
+# The maximum number of active threads for the MultiLangDaemon to permit.
+# If a value is provided then a FixedThreadPool is used with the maximum
+# active threads set to the provided value. If a non-positive integer or no
+# value is provided a CachedThreadPool is used.
+#maxActiveThreads = 0
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java
index ae7c9939..93eb2743 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java
@@ -36,7 +36,7 @@ import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
@RunWith(MockitoJUnitRunner.class)
public class MultiLangDaemonConfigTest {
- private static final String FILENAME = "some.properties";
+ private static final String FILENAME = "multilang.properties";
private static final String EXE = "TestExe.exe";
private static final String APPLICATION_NAME = MultiLangDaemonConfigTest.class.getSimpleName();
private static final String STREAM_NAME = "fakeStream";
@@ -52,7 +52,7 @@ public class MultiLangDaemonConfigTest {
@Mock
private AwsCredentials creds;
- private KinesisClientLibConfigurator configurator;
+ private final KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();
private MultiLangDaemonConfig deamonConfig;
/**
@@ -84,8 +84,6 @@ public class MultiLangDaemonConfigTest {
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
when(creds.accessKeyId()).thenReturn("cool-user");
- configurator = new KinesisClientLibConfigurator();
-
deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
}
@@ -200,4 +198,9 @@ public class MultiLangDaemonConfigTest {
}
}
+ @Test
+ public void testActualPropertiesFile() throws Exception {
+ new MultiLangDaemonConfig(FILENAME);
+ }
+
}
\ No newline at end of file
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java
index 43bdb53e..3f61db7a 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/NestedPropertyKeyTest.java
@@ -14,10 +14,13 @@
*/
package software.amazon.kinesis.multilang;
+import com.amazonaws.regions.Regions;
+
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static software.amazon.kinesis.multilang.NestedPropertyKey.ENDPOINT;
+import static software.amazon.kinesis.multilang.NestedPropertyKey.ENDPOINT_REGION;
import static software.amazon.kinesis.multilang.NestedPropertyKey.EXTERNAL_ID;
import static software.amazon.kinesis.multilang.NestedPropertyKey.parse;
@@ -42,7 +45,7 @@ public class NestedPropertyKeyTest {
@Test
public void testEndpoint() {
- final String expectedEndpoint = "https://sts.us-east-1.amazon.com";
+ final String expectedEndpoint = "https://sts.us-east-1.amazonaws.com";
final String expectedRegion = "us-east-1";
final String param = createKey(ENDPOINT, expectedEndpoint + "^" + expectedRegion);
@@ -55,6 +58,24 @@ public class NestedPropertyKeyTest {
parse(mockProcessor, createKey(ENDPOINT, "value-sans-caret-delimiter"));
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidEndpointDoubleCaret() {
+ parse(mockProcessor, createKey(ENDPOINT, "https://sts.us-east-1.amazonaws.com^us-east-1^borkbork"));
+ }
+
+ @Test
+ public void testEndpointRegion() {
+ final Regions expectedRegion = Regions.GovCloud;
+
+ parse(mockProcessor, createKey(ENDPOINT_REGION, expectedRegion.getName()));
+ verify(mockProcessor).acceptEndpointRegion(expectedRegion);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidEndpointRegion() {
+ parse(mockProcessor, createKey(ENDPOINT_REGION, "snuffleupagus"));
+ }
+
/**
* Test that the literal nested key (i.e., {@code key=} in {@code some_val|key=nested_val})
* does not change. Any change to an existing literal key is not backwards-compatible.
@@ -64,9 +85,10 @@ public class NestedPropertyKeyTest {
// Adding a new enum will deliberately cause this assert to fail, and
// therefore raise awareness for this explicit test. Add-and-remove may
// keep the number unchanged yet will also break (by removing an enum).
- assertEquals(2, NestedPropertyKey.values().length);
+ assertEquals(3, NestedPropertyKey.values().length);
assertEquals("endpoint", ENDPOINT.getNestedKey());
+ assertEquals("endpointRegion", ENDPOINT_REGION.getNestedKey());
assertEquals("externalId", EXTERNAL_ID.getNestedKey());
}
diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java
index 20e4bc8a..1c9e6bca 100644
--- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java
+++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/auth/KclSTSAssumeRoleSessionCredentialsProviderTest.java
@@ -22,11 +22,32 @@ import org.junit.Test;
public class KclSTSAssumeRoleSessionCredentialsProviderTest {
+ private static final String ARN = "arn";
+ private static final String SESSION_NAME = "sessionName";
+
+ /**
+ * Test that the constructor doesn't throw an out-of-bounds exception if
+ * there are no parameters beyond the required ARN and session name.
+ */
+ @Test
+ public void testConstructorWithoutOptionalParams() {
+ new KclSTSAssumeRoleSessionCredentialsProvider(new String[] { ARN, SESSION_NAME });
+ }
+
+ @Test
+ public void testAcceptEndpoint() {
+ // discovered exception during e2e testing; therefore, this test is
+ // to simply verify the constructed STS client doesn't go *boom*
+ final KclSTSAssumeRoleSessionCredentialsProvider provider =
+ new KclSTSAssumeRoleSessionCredentialsProvider(ARN, SESSION_NAME);
+ provider.acceptEndpoint("endpoint", "us-east-1");
+ }
+
@Test
public void testVarArgs() {
for (final String[] varargs : Arrays.asList(
- new String[] { "arn", "session", "externalId=eid", "foo"},
- new String[] { "arn", "session", "foo", "externalId=eid"}
+ new String[] { ARN, SESSION_NAME, "externalId=eid", "foo"},
+ new String[] { ARN, SESSION_NAME, "foo", "externalId=eid"}
)) {
final VarArgsSpy provider = new VarArgsSpy(varargs);
assertEquals("eid", provider.externalId);