From 3d71e0868eb912bc788764028402d8492405ae59 Mon Sep 17 00:00:00 2001 From: Ryan Pelaez Date: Fri, 9 Jun 2023 16:28:38 -0700 Subject: [PATCH] Updated multilang daemon to support 'StreamArn' as a property to parse Stream name from' --- .../config/KinesisClientLibConfigurator.java | 18 ++- .../config/MultiLangDaemonConfiguration.java | 2 + .../multilang/MultiLangDaemonConfigTest.java | 150 ++++++++++++++---- .../KinesisClientLibConfiguratorTest.java | 29 +++- 4 files changed, 163 insertions(+), 36 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java index f3facdc0..33c51a91 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java @@ -19,6 +19,7 @@ import java.io.InputStream; import java.lang.reflect.InvocationTargetException; import java.util.Properties; +import com.amazonaws.arn.Arn; import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.commons.beanutils.ConvertUtilsBean; @@ -62,14 +63,27 @@ public class KinesisClientLibConfigurator { public MultiLangDaemonConfiguration getConfiguration(Properties properties) { properties.entrySet().forEach(e -> { try { - utilsBean.setProperty(configuration, (String) e.getKey(), e.getValue()); + Object value = e.getValue(); + if(value instanceof String){ + value = ((String) value).trim(); + } + utilsBean.setProperty(configuration, (String) e.getKey(), value); } catch (IllegalAccessException | InvocationTargetException ex) { throw new RuntimeException(ex); } }); Validate.notBlank(configuration.getApplicationName(), "Application name is required"); - Validate.notBlank(configuration.getStreamName(), "Stream name is required"); + try { + Arn streamArnObj = Arn.fromString(configuration.getStreamArn()); + String streamNameFromArn = streamArnObj.getResource().getResource(); + Validate.notBlank(streamNameFromArn, ""); + + //Stream Arn takes precedence. Override existing configuration with the stream name found in the Arn + configuration.setStreamName(streamNameFromArn); + }catch (Exception e) { + Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)"); + } Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided"); return configuration; } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index da280ddf..9a96dbc9 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -73,6 +73,8 @@ public class MultiLangDaemonConfiguration { private String applicationName; private String streamName; + private String streamArn; + @ConfigurationSettable(configurationClass = ConfigsBuilder.class) private String tableName; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java index b86a64ad..d73a808c 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java @@ -14,17 +14,13 @@ */ package software.amazon.kinesis.multilang; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.Properties; -import org.apache.commons.beanutils.BeanUtilsBean; -import org.apache.commons.beanutils.ConvertUtilsBean; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -35,11 +31,19 @@ import junit.framework.Assert; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.kinesis.multilang.config.KinesisClientLibConfigurator; -import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration; @RunWith(MockitoJUnitRunner.class) public class MultiLangDaemonConfigTest { private static String FILENAME = "some.properties"; + private static String TestExe = "TestExe.exe"; + private static String TestApplicationName = "TestApp"; + private static String TestStreamName = "fakeStream"; + + private static String TestStreamNameInArn = "FAKE_STREAM_NAME"; + private static String TestStreamArn = "arn:aws:kinesis:us-east-1:ACCOUNT_ID:stream/FAKE_STREAM_NAME"; + + @Mock + ClassLoader classLoader; @Mock private AwsCredentialsProvider credentialsProvider; @@ -48,39 +52,125 @@ public class MultiLangDaemonConfigTest { @Mock private KinesisClientLibConfigurator configurator; - @Before - public void setup() { - ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean(); - BeanUtilsBean utilsBean = new BeanUtilsBean(convertUtilsBean); - MultiLangDaemonConfiguration multiLangDaemonConfiguration = new MultiLangDaemonConfiguration(utilsBean, - convertUtilsBean); - multiLangDaemonConfiguration.setApplicationName("cool-app"); - multiLangDaemonConfiguration.setStreamName("cool-stream"); - multiLangDaemonConfiguration.setWorkerIdentifier("cool-worker"); - when(credentialsProvider.resolveCredentials()).thenReturn(creds); - when(creds.accessKeyId()).thenReturn("cool-user"); - when(configurator.getConfiguration(any(Properties.class))).thenReturn(multiLangDaemonConfiguration); - } + public void setup(String streamName, String streamArn) { - @Test - public void constructorTest() throws IOException { - String PROPERTIES = "executableName = randomEXE \n" + "applicationName = testApp \n" - + "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" - + "processingLanguage = malbolge"; - ClassLoader classLoader = Mockito.mock(ClassLoader.class); + String PROPERTIES = String.format("executableName = %s \n" + + "applicationName = %s \n" + + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + + "processingLanguage = malbolge\n", + TestExe, + TestApplicationName); + + if(streamName != null){ + PROPERTIES += String.format("streamName = %s \n", streamName); + } + if(streamArn != null){ + PROPERTIES += String.format("streamArn = %s \n", streamArn); + } + classLoader = Mockito.mock(ClassLoader.class); Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())).when(classLoader) .getResourceAsStream(FILENAME); - MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + when(credentialsProvider.resolveCredentials()).thenReturn(creds); + when(creds.accessKeyId()).thenReturn("cool-user"); + configurator = new KinesisClientLibConfigurator(); - assertNotNull(deamonConfig.getExecutorService()); - assertNotNull(deamonConfig.getMultiLangDaemonConfiguration()); - assertNotNull(deamonConfig.getRecordProcessorFactory()); } @Test - public void propertyValidation() { + public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws IOException { + setup("", ""); + + assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorFailsBecauseStreamNameAndArnAreNull() { + setup(null, null); + + assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorFailsBecauseStreamNameIsNullAndArnIsEmpty() { + setup(null, ""); + + assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorFailsBecauseStreamNameIsEmptyAndArnIsNull() { + setup("", null); + + assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator)); + } + + @Test + public void testConstructorUsingStreamName() throws IOException { + setup(TestStreamName, null); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName); + } + + @Test + public void testConstructorUsingStreamNameAndStreamArnIsEmpty() throws IOException { + setup(TestStreamName, ""); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName); + } + + @Test + public void testConstructorUsingStreamArn() throws IOException { + setup(null, TestStreamArn); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn); + } + + @Test + public void testConstructorUsingStreamNameAsEmptyAndStreamArn() throws IOException { + setup("", TestStreamArn); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn); + } + + @Test + public void testConstructorUsingStreamArnOverStreamName() throws IOException { + setup(TestStreamName, TestStreamArn); + + MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + + AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn); + } + + /** + * Verify the daemonConfig properties are what we expect them to be. + * @param deamonConfig + * @param expectedStreamName + */ + private void AssertConfigurationsMatch(MultiLangDaemonConfig deamonConfig, + String expectedExe, + String expectedApplicationName, + String expectedStreamName){ + assertNotNull(deamonConfig.getExecutorService()); + assertNotNull(deamonConfig.getMultiLangDaemonConfiguration()); + assertNotNull(deamonConfig.getRecordProcessorFactory()); + + assertEquals(expectedExe, deamonConfig.getRecordProcessorFactory().getCommandArray()[0]); + assertEquals(expectedApplicationName, deamonConfig.getMultiLangDaemonConfiguration().getApplicationName()); + assertEquals(expectedStreamName, deamonConfig.getMultiLangDaemonConfiguration().getStreamName()); + } + + @Test + public void testPropertyValidation() { String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge"; ClassLoader classLoader = Mockito.mock(ClassLoader.class); diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index 031fc427..5a484bfc 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -306,12 +306,33 @@ public class KinesisClientLibConfiguratorTest { } @Test - public void testWithMissingStreamName() { + public void testWithMissingStreamNameAndMissingStreamArn() { thrown.expect(NullPointerException.class); - thrown.expectMessage("Stream name is required"); + thrown.expectMessage("Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)"); - String test = StringUtils.join(new String[] { "applicationName = b", - "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n'); + String test = StringUtils.join(new String[] { + "applicationName = b", + "AWSCredentialsProvider = " + credentialName1, + "workerId = 123", + "failoverTimeMillis = 100" }, + '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + configurator.getConfiguration(input); + } + @Test + public void testWithEmptyStreamNameAndMissingStreamArn() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)"); + + String test = StringUtils.join(new String[] { + "applicationName = b", + "AWSCredentialsProvider = " + credentialName1, + "workerId = 123", + "failoverTimeMillis = 100", + "streamName = ", + "streamArn = "}, + '\n'); InputStream input = new ByteArrayInputStream(test.getBytes()); configurator.getConfiguration(input);