Updated multilang daemon to parse RegionName from StreamArn if passed in properties

This commit is contained in:
Ryan Pelaez 2023-06-12 16:51:38 -07:00
parent 7ba4efb1d1
commit 6a715efb89
3 changed files with 32 additions and 15 deletions

View file

@ -26,6 +26,7 @@ import org.apache.commons.beanutils.ConvertUtilsBean;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.regions.Region;
/** /**
* KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following * KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following
@ -76,13 +77,20 @@ public class KinesisClientLibConfigurator {
Validate.notBlank(configuration.getApplicationName(), "Application name is required"); Validate.notBlank(configuration.getApplicationName(), "Application name is required");
try { try {
Validate.notBlank(configuration.getStreamName(), ""); Validate.notBlank(configuration.getStreamArn(), "");
}catch (Exception e) {
Validate.notBlank(configuration.getStreamArn(), "Stream name or Stream Arn is required. (Stream Name takes precedence if both are passed in)");
Arn streamArnObj = Arn.fromString(configuration.getStreamArn()); Arn streamArnObj = Arn.fromString(configuration.getStreamArn());
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
String streamNameFromArn = streamArnObj.getResource().getResource(); String streamNameFromArn = streamArnObj.getResource().getResource();
configuration.setStreamName(streamNameFromArn); configuration.setStreamName(streamNameFromArn);
//Parse out the region from the Arn and set (and/or override existing value for region)
String regionName = streamArnObj.getRegion();
Region regionObj = Region.of(regionName);
configuration.setRegionName(regionObj);
}catch (Exception e) {
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)");
} }
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided"); Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
return configuration; return configuration;

View file

@ -40,7 +40,10 @@ public class MultiLangDaemonConfigTest {
private static String TestStreamName = "fakeStream"; private static String TestStreamName = "fakeStream";
private static String TestStreamNameInArn = "FAKE_STREAM_NAME"; private static String TestStreamNameInArn = "FAKE_STREAM_NAME";
private static String TestStreamArn = "arn:aws:kinesis:us-east-1:ACCOUNT_ID:stream/FAKE_STREAM_NAME"; private static String TestRegion = "us-east-1";
private static String TestRegionInArn = "us-east-2";
private static String TestStreamArn = "arn:aws:kinesis:us-east-2:ACCOUNT_ID:stream/FAKE_STREAM_NAME";
@Mock @Mock
ClassLoader classLoader; ClassLoader classLoader;
@ -57,9 +60,11 @@ public class MultiLangDaemonConfigTest {
String PROPERTIES = String.format("executableName = %s \n" String PROPERTIES = String.format("executableName = %s \n"
+ "applicationName = %s \n" + "applicationName = %s \n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "processingLanguage = malbolge\n", + "processingLanguage = malbolge\n"
+ "regionName = %s \n",
TestExe, TestExe,
TestApplicationName); TestApplicationName,
TestRegion);
if(streamName != null){ if(streamName != null){
PROPERTIES += String.format("streamName = %s \n", streamName); PROPERTIES += String.format("streamName = %s \n", streamName);
@ -112,7 +117,7 @@ public class MultiLangDaemonConfigTest {
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName); AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion);
} }
@Test @Test
@ -121,7 +126,7 @@ public class MultiLangDaemonConfigTest {
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName); AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion);
} }
@Test @Test
@ -130,7 +135,7 @@ public class MultiLangDaemonConfigTest {
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn); AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn);
} }
@Test @Test
@ -139,16 +144,16 @@ public class MultiLangDaemonConfigTest {
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn); AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn);
} }
@Test @Test
public void testConstructorUsingStreamNameOverStreamArn() throws IOException { public void testConstructorUsingStreamArnOverStreamName() throws IOException {
setup(TestStreamName, TestStreamArn); setup(TestStreamName, TestStreamArn);
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName); AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn);
} }
/** /**
@ -159,7 +164,8 @@ public class MultiLangDaemonConfigTest {
private void AssertConfigurationsMatch(MultiLangDaemonConfig deamonConfig, private void AssertConfigurationsMatch(MultiLangDaemonConfig deamonConfig,
String expectedExe, String expectedExe,
String expectedApplicationName, String expectedApplicationName,
String expectedStreamName){ String expectedStreamName,
String expectedRegionName){
assertNotNull(deamonConfig.getExecutorService()); assertNotNull(deamonConfig.getExecutorService());
assertNotNull(deamonConfig.getMultiLangDaemonConfiguration()); assertNotNull(deamonConfig.getMultiLangDaemonConfiguration());
assertNotNull(deamonConfig.getRecordProcessorFactory()); assertNotNull(deamonConfig.getRecordProcessorFactory());
@ -167,6 +173,9 @@ public class MultiLangDaemonConfigTest {
assertEquals(expectedExe, deamonConfig.getRecordProcessorFactory().getCommandArray()[0]); assertEquals(expectedExe, deamonConfig.getRecordProcessorFactory().getCommandArray()[0]);
assertEquals(expectedApplicationName, deamonConfig.getMultiLangDaemonConfiguration().getApplicationName()); assertEquals(expectedApplicationName, deamonConfig.getMultiLangDaemonConfiguration().getApplicationName());
assertEquals(expectedStreamName, deamonConfig.getMultiLangDaemonConfiguration().getStreamName()); assertEquals(expectedStreamName, deamonConfig.getMultiLangDaemonConfiguration().getStreamName());
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getDynamoDbClient().get("region").toString());
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getCloudWatchClient().get("region").toString());
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getKinesisClient().get("region").toString());
} }
@Test @Test

View file

@ -308,7 +308,7 @@ public class KinesisClientLibConfiguratorTest {
@Test @Test
public void testWithMissingStreamNameAndMissingStreamArn() { public void testWithMissingStreamNameAndMissingStreamArn() {
thrown.expect(NullPointerException.class); thrown.expect(NullPointerException.class);
thrown.expectMessage("Stream name or Stream Arn is required. (Stream Name takes precedence if both are passed in)"); thrown.expectMessage("Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)");
String test = StringUtils.join(new String[] { String test = StringUtils.join(new String[] {
"applicationName = b", "applicationName = b",
@ -323,7 +323,7 @@ public class KinesisClientLibConfiguratorTest {
@Test @Test
public void testWithEmptyStreamNameAndMissingStreamArn() { public void testWithEmptyStreamNameAndMissingStreamArn() {
thrown.expect(IllegalArgumentException.class); thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Stream name or Stream Arn is required. (Stream Name takes precedence if both are passed in)"); thrown.expectMessage("Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)");
String test = StringUtils.join(new String[] { String test = StringUtils.join(new String[] {
"applicationName = b", "applicationName = b",