diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java index d4194a10..4615cb3e 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java @@ -26,6 +26,7 @@ import org.apache.commons.beanutils.ConvertUtilsBean; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.regions.Region; /** * KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following @@ -76,13 +77,20 @@ public class KinesisClientLibConfigurator { Validate.notBlank(configuration.getApplicationName(), "Application name is required"); try { - Validate.notBlank(configuration.getStreamName(), ""); - }catch (Exception e) { - Validate.notBlank(configuration.getStreamArn(), "Stream name or Stream Arn is required. (Stream Name takes precedence if both are passed in)"); + Validate.notBlank(configuration.getStreamArn(), ""); Arn streamArnObj = Arn.fromString(configuration.getStreamArn()); + + //Parse out the stream Name from the Arn (and/or override existing value for Stream Name) String streamNameFromArn = streamArnObj.getResource().getResource(); configuration.setStreamName(streamNameFromArn); + //Parse out the region from the Arn and set (and/or override existing value for region) + String regionName = streamArnObj.getRegion(); + Region regionObj = Region.of(regionName); + configuration.setRegionName(regionObj); + }catch (Exception e) { + Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)"); + } Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided"); return configuration; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java index c4662fa8..9b911d98 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java @@ -40,7 +40,10 @@ public class MultiLangDaemonConfigTest { private static String TestStreamName = "fakeStream"; private static String TestStreamNameInArn = "FAKE_STREAM_NAME"; - private static String TestStreamArn = "arn:aws:kinesis:us-east-1:ACCOUNT_ID:stream/FAKE_STREAM_NAME"; + private static String TestRegion = "us-east-1"; + + private static String TestRegionInArn = "us-east-2"; + private static String TestStreamArn = "arn:aws:kinesis:us-east-2:ACCOUNT_ID:stream/FAKE_STREAM_NAME"; @Mock ClassLoader classLoader; @@ -57,9 +60,11 @@ public class MultiLangDaemonConfigTest { String PROPERTIES = String.format("executableName = %s \n" + "applicationName = %s \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" - + "processingLanguage = malbolge\n", + + "processingLanguage = malbolge\n" + + "regionName = %s \n", TestExe, - TestApplicationName); + TestApplicationName, + TestRegion); if(streamName != null){ PROPERTIES += String.format("streamName = %s \n", streamName); @@ -112,7 +117,7 @@ public class MultiLangDaemonConfigTest { MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); - AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName); + AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion); } @Test @@ -121,7 +126,7 @@ public class MultiLangDaemonConfigTest { MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); - AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName); + AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion); } @Test @@ -130,7 +135,7 @@ public class MultiLangDaemonConfigTest { MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); - AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn); + AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn); } @Test @@ -139,16 +144,16 @@ public class MultiLangDaemonConfigTest { MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); - AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn); + AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn); } @Test - public void testConstructorUsingStreamNameOverStreamArn() throws IOException { + public void testConstructorUsingStreamArnOverStreamName() throws IOException { setup(TestStreamName, TestStreamArn); MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); - AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName); + AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn); } /** @@ -159,7 +164,8 @@ public class MultiLangDaemonConfigTest { private void AssertConfigurationsMatch(MultiLangDaemonConfig deamonConfig, String expectedExe, String expectedApplicationName, - String expectedStreamName){ + String expectedStreamName, + String expectedRegionName){ assertNotNull(deamonConfig.getExecutorService()); assertNotNull(deamonConfig.getMultiLangDaemonConfiguration()); assertNotNull(deamonConfig.getRecordProcessorFactory()); @@ -167,6 +173,9 @@ public class MultiLangDaemonConfigTest { assertEquals(expectedExe, deamonConfig.getRecordProcessorFactory().getCommandArray()[0]); assertEquals(expectedApplicationName, deamonConfig.getMultiLangDaemonConfiguration().getApplicationName()); assertEquals(expectedStreamName, deamonConfig.getMultiLangDaemonConfiguration().getStreamName()); + assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getDynamoDbClient().get("region").toString()); + assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getCloudWatchClient().get("region").toString()); + assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getKinesisClient().get("region").toString()); } @Test diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index 59e9ee64..5a484bfc 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -308,7 +308,7 @@ public class KinesisClientLibConfiguratorTest { @Test public void testWithMissingStreamNameAndMissingStreamArn() { thrown.expect(NullPointerException.class); - thrown.expectMessage("Stream name or Stream Arn is required. (Stream Name takes precedence if both are passed in)"); + thrown.expectMessage("Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)"); String test = StringUtils.join(new String[] { "applicationName = b", @@ -323,7 +323,7 @@ public class KinesisClientLibConfiguratorTest { @Test public void testWithEmptyStreamNameAndMissingStreamArn() { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Stream name or Stream Arn is required. (Stream Name takes precedence if both are passed in)"); + thrown.expectMessage("Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)"); String test = StringUtils.join(new String[] { "applicationName = b",