From d2fe79112e50d0d03047aba1c47bffb33074a39d Mon Sep 17 00:00:00 2001 From: Ryan Pelaez Date: Wed, 14 Jun 2023 16:46:50 -0700 Subject: [PATCH] Updated multilang to support streamArn --- .../config/KinesisClientLibConfigurator.java | 31 ++- .../config/MultiLangDaemonConfiguration.java | 7 +- .../multilang/MultiLangDaemonConfigTest.java | 203 +++++++++++++++--- .../KinesisClientLibConfiguratorTest.java | 43 +++- 4 files changed, 236 insertions(+), 48 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java index f3facdc0..48ec7adf 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java @@ -19,12 +19,14 @@ import java.io.InputStream; import java.lang.reflect.InvocationTargetException; import java.util.Properties; +import com.amazonaws.arn.Arn; import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.commons.beanutils.ConvertUtilsBean; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.regions.Region; /** * KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following @@ -55,7 +57,7 @@ public class KinesisClientLibConfigurator { * Program will fail immediately, if customer provide: 1) invalid variable value. Program will log it as warning and * continue, if customer provide: 1) variable with unsupported variable type. 2) a variable with name which does not * match any of the variables in KinesisClientLibConfigration. - * + * * @param properties a Properties object containing the configuration information * @return KinesisClientLibConfiguration */ @@ -69,7 +71,30 @@ public class KinesisClientLibConfigurator { }); Validate.notBlank(configuration.getApplicationName(), "Application name is required"); - Validate.notBlank(configuration.getStreamName(), "Stream name is required"); + + if (configuration.getStreamArn() != null && !configuration.getStreamArn().trim().isEmpty()) { + Arn streamArnObj = Arn.fromString(configuration.getStreamArn()); + if(!streamArnObj.getResource().getResourceType().equalsIgnoreCase("stream")){ + throw new IllegalArgumentException(String.format("StreamArn has unsupported resource type of '%s'. Expected: stream", + streamArnObj.getResource().getResourceType())); + } + if(!streamArnObj.getService().equalsIgnoreCase("kinesis")){ + throw new IllegalArgumentException(String.format("StreamArn has unsupported service type of '%s'. Expected: kinesis", + streamArnObj.getResource().getResourceType())); + } + //Parse out the stream Name from the Arn (and/or override existing value for Stream Name) + String streamNameFromArn = streamArnObj.getResource().getResource(); + configuration.setStreamName(streamNameFromArn); + + //Parse out the region from the Arn and set (and/or override existing value for region) + Region regionObj = Region.of(streamArnObj.getRegion()); + if(Region.regions().stream().filter(x -> x.id().equalsIgnoreCase(regionObj.id())).count() == 0){ + throw new IllegalArgumentException(String.format("%s is not a valid region", regionObj.id())); + } + configuration.setRegionName(regionObj); + } else { + Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in."); + } Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided"); return configuration; } @@ -97,4 +122,4 @@ public class KinesisClientLibConfigurator { } -} +} \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index da280ddf..d5f10477 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -28,7 +28,6 @@ import java.util.UUID; import java.util.function.Function; import org.apache.commons.beanutils.BeanUtilsBean; -import org.apache.commons.beanutils.ConvertUtils; import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.beanutils.Converter; import org.apache.commons.beanutils.converters.ArrayConverter; @@ -73,6 +72,8 @@ public class MultiLangDaemonConfiguration { private String applicationName; private String streamName; + private String streamArn; + @ConfigurationSettable(configurationClass = ConfigsBuilder.class) private String tableName; @@ -300,7 +301,7 @@ public class MultiLangDaemonConfiguration { } private void updateCredentials(BuilderDynaBean toUpdate, AwsCredentialsProvider primary, - AwsCredentialsProvider secondary) { + AwsCredentialsProvider secondary) { if (toUpdate.hasValue("credentialsProvider")) { return; @@ -403,4 +404,4 @@ public class MultiLangDaemonConfiguration { return resolvedConfiguration(shardRecordProcessorFactory).build(); } -} +} \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java index b86a64ad..2e921580 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java @@ -14,17 +14,14 @@ */ package software.amazon.kinesis.multilang; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.Properties; -import org.apache.commons.beanutils.BeanUtilsBean; -import org.apache.commons.beanutils.ConvertUtilsBean; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -34,12 +31,25 @@ import org.mockito.runners.MockitoJUnitRunner; import junit.framework.Assert; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; import software.amazon.kinesis.multilang.config.KinesisClientLibConfigurator; -import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration; @RunWith(MockitoJUnitRunner.class) public class MultiLangDaemonConfigTest { private static String FILENAME = "some.properties"; + private static String TestExe = "TestExe.exe"; + private static String TestApplicationName = "TestApp"; + private static String TestStreamName = "fakeStream"; + private static String TestStreamNameInArn = "FAKE_STREAM_NAME"; + private static String TestRegion = "us-east-1"; + private static String TestRegionInArn = "us-east-2"; + + private static String getTestStreamArn(){ + return String.format("arn:aws:kinesis:%s:ACCOUNT_ID:stream/%s", TestRegionInArn, TestStreamNameInArn); + } + + @Mock + ClassLoader classLoader; @Mock private AwsCredentialsProvider credentialsProvider; @@ -48,39 +58,170 @@ public class MultiLangDaemonConfigTest { @Mock private KinesisClientLibConfigurator configurator; - @Before - public void setup() { - ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean(); - BeanUtilsBean utilsBean = new BeanUtilsBean(convertUtilsBean); - MultiLangDaemonConfiguration multiLangDaemonConfiguration = new MultiLangDaemonConfiguration(utilsBean, - convertUtilsBean); - multiLangDaemonConfiguration.setApplicationName("cool-app"); - multiLangDaemonConfiguration.setStreamName("cool-stream"); - multiLangDaemonConfiguration.setWorkerIdentifier("cool-worker"); - when(credentialsProvider.resolveCredentials()).thenReturn(creds); - when(creds.accessKeyId()).thenReturn("cool-user"); - when(configurator.getConfiguration(any(Properties.class))).thenReturn(multiLangDaemonConfiguration); - } + public void setup(String streamName, String streamArn) { - @Test - public void constructorTest() throws IOException { - String PROPERTIES = "executableName = randomEXE \n" + "applicationName = testApp \n" - + "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" - + "processingLanguage = malbolge"; - ClassLoader classLoader = Mockito.mock(ClassLoader.class); + String PROPERTIES = String.format("executableName = %s\n" + + "applicationName = %s\n" + + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + + "processingLanguage = malbolge\n" + + "regionName = %s\n", + TestExe, + TestApplicationName, + TestRegion); + + if(streamName != null){ + PROPERTIES += String.format("streamName = %s\n", streamName); + } + if(streamArn != null){ + PROPERTIES += String.format("streamArn = %s\n", streamArn); + } + classLoader = Mockito.mock(ClassLoader.class); Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())).when(classLoader) .getResourceAsStream(FILENAME); - MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + when(credentialsProvider.resolveCredentials()).thenReturn(creds); + when(creds.accessKeyId()).thenReturn("cool-user"); + configurator = new KinesisClientLibConfigurator(); - assertNotNull(deamonConfig.getExecutorService()); - assertNotNull(deamonConfig.getMultiLangDaemonConfiguration()); - assertNotNull(deamonConfig.getRecordProcessorFactory()); } @Test - public void propertyValidation() { + public void testConstructorFailsBecauseStreamArnIsInvalid() throws IOException { + setup("", "this_is_not_a_valid_arn"); + + assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorFailsBecauseStreamArnHasInvalidRegion() throws IOException { + setup("", "arn:aws:kinesis:us-east-1000:ACCOUNT_ID:stream/streamName"); + + assertThrows(IllegalArgumentException.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorFailsBecauseStreamArnHasInvalidResourceType() throws IOException { + setup("", "arn:aws:kinesis:us-EAST-1:ACCOUNT_ID:dynamodb/streamName"); + + assertThrows(IllegalArgumentException.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorFailsBecauseStreamArnHasInvalidService() throws IOException { + setup("", "arn:aws:kinesisFakeService:us-east-1:ACCOUNT_ID:stream/streamName"); + + assertThrows(IllegalArgumentException.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws IOException { + setup("", ""); + + assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorFailsBecauseStreamNameAndArnAreNull() { + setup(null, null); + + assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorFailsBecauseStreamNameIsNullAndArnIsEmpty() { + setup(null, ""); + + assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorFailsBecauseStreamNameIsEmptyAndArnIsNull() { + setup("", null); + + assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorUsingStreamName() throws IOException { + setup(TestStreamName, null); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, null); + } + + @Test + public void testConstructorUsingStreamNameAndStreamArnIsEmpty() throws IOException { + setup(TestStreamName, ""); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, ""); + } + + @Test + public void testConstructorUsingStreamNameAndStreamArnIsWhitespace() throws IOException { + setup(TestStreamName, " "); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, ""); + } + + @Test + public void testConstructorUsingStreamArn() throws IOException { + setup(null, getTestStreamArn()); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn()); + } + + @Test + public void testConstructorUsingStreamNameAsEmptyAndStreamArn() throws IOException { + setup("", getTestStreamArn()); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn()); + } + + @Test + public void testConstructorUsingStreamArnOverStreamName() throws IOException { + setup(TestStreamName, getTestStreamArn()); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn()); + } + + /** + * Verify the daemonConfig properties are what we expect them to be. + * @param deamonConfig + * @param expectedStreamName + */ + private void assertConfigurationsMatch(MultiLangDaemonConfig deamonConfig, + String expectedExe, + String expectedApplicationName, + String expectedStreamName, + String expectedRegionName, + String expectedStreamArn){ + assertNotNull(deamonConfig.getExecutorService()); + assertNotNull(deamonConfig.getMultiLangDaemonConfiguration()); + assertNotNull(deamonConfig.getRecordProcessorFactory()); + + assertEquals(expectedExe, deamonConfig.getRecordProcessorFactory().getCommandArray()[0]); + assertEquals(expectedApplicationName, deamonConfig.getMultiLangDaemonConfiguration().getApplicationName()); + assertEquals(expectedStreamName, deamonConfig.getMultiLangDaemonConfiguration().getStreamName()); + assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getDynamoDbClient().get("region").toString()); + assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getCloudWatchClient().get("region").toString()); + assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getKinesisClient().get("region").toString()); + assertEquals(expectedStreamArn, deamonConfig.getMultiLangDaemonConfiguration().getStreamArn()); + } + + @Test + public void testPropertyValidation() { String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge"; ClassLoader classLoader = Mockito.mock(ClassLoader.class); @@ -99,4 +240,4 @@ public class MultiLangDaemonConfigTest { } } -} +} \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index 031fc427..a68dd8d8 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -107,8 +107,8 @@ public class KinesisClientLibConfiguratorTest { // they must specify the time with initialPositionInStreamExtended. try { getConfiguration(StringUtils.join(new String[] { "applicationName = app", - "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, - "initialPositionInStream = AT_TIMESTAMP"}, '\n')); + "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "initialPositionInStream = AT_TIMESTAMP"}, '\n')); fail("Should have thrown when initialPositionInStream is set to AT_TIMESTAMP"); } catch (Exception e) { Throwable rootCause = ExceptionUtils.getRootCause(e); @@ -122,8 +122,8 @@ public class KinesisClientLibConfiguratorTest { // value is provided, the constructor should throw an IllegalArgumentException exception. try { getConfiguration(StringUtils.join(new String[] { "applicationName = app", - "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, - "initialPositionInStreamExtended = null"}, '\n')); + "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "initialPositionInStreamExtended = null"}, '\n')); fail("Should have thrown when initialPositionInStreamExtended is set to null"); } catch (Exception e) { Throwable rootCause = ExceptionUtils.getRootCause(e); @@ -163,8 +163,8 @@ public class KinesisClientLibConfiguratorTest { @Test public void testWithBooleanVariables() { MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a", - "applicationName = b", "AWSCredentialsProvider = ABCD, " + credentialName1, "workerId = 0", - "cleanupLeasesUponShardCompletion = false", "validateSequenceNumberBeforeCheckpointing = true" }, + "applicationName = b", "AWSCredentialsProvider = ABCD, " + credentialName1, "workerId = 0", + "cleanupLeasesUponShardCompletion = false", "validateSequenceNumberBeforeCheckpointing = true" }, '\n')); assertEquals(config.getApplicationName(), "b"); @@ -306,12 +306,33 @@ public class KinesisClientLibConfiguratorTest { } @Test - public void testWithMissingStreamName() { + public void testWithMissingStreamNameAndMissingStreamArn() { thrown.expect(NullPointerException.class); - thrown.expectMessage("Stream name is required"); + thrown.expectMessage("Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in."); - String test = StringUtils.join(new String[] { "applicationName = b", - "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n'); + String test = StringUtils.join(new String[] { + "applicationName = b", + "AWSCredentialsProvider = " + credentialName1, + "workerId = 123", + "failoverTimeMillis = 100" }, + '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + configurator.getConfiguration(input); + } + @Test + public void testWithEmptyStreamNameAndMissingStreamArn() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in."); + + String test = StringUtils.join(new String[] { + "applicationName = b", + "AWSCredentialsProvider = " + credentialName1, + "workerId = 123", + "failoverTimeMillis = 100", + "streamName = ", + "streamArn = "}, + '\n'); InputStream input = new ByteArrayInputStream(test.getBytes()); configurator.getConfiguration(input); @@ -493,4 +514,4 @@ public class KinesisClientLibConfiguratorTest { MultiLangDaemonConfiguration config = configurator.getConfiguration(input); return config; } -} +} \ No newline at end of file