From f9ab11326e66c3f724e14ad9bf640b785e74ab4a Mon Sep 17 00:00:00 2001 From: Yu Zeng Date: Thu, 8 Dec 2022 09:49:01 -0800 Subject: [PATCH] Update multilang interfaces for StreamARN changes --- .gitignore | 1 + .../KinesisClientLibConfiguration.java | 1 + .../config/KinesisClientLibConfigurator.java | 15 +++++++++++++-- .../config/MultiLangDaemonConfiguration.java | 9 +++++++-- .../KinesisClientLibConfiguratorTest.java | 18 +++++++++++++++--- .../amazon/kinesis/common/StreamARNUtil.java | 6 +++--- .../kinesis/common/StreamIdentifier.java | 10 +++++----- 7 files changed, 45 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index 973f5095..bb09bf36 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ target/ AwsCredentials.properties .idea +.DS_Store *.iml diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java index 54797050..f259fdc4 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java @@ -210,6 +210,7 @@ public class KinesisClientLibConfiguration { private String applicationName; private String tableName; private String streamName; + private String streamARN; private String kinesisEndpoint; private String dynamoDBEndpoint; private InitialPositionInStream initialPositionInStream; diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java index f3facdc0..7d2e6da0 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java @@ -25,10 +25,11 @@ import org.apache.commons.beanutils.ConvertUtilsBean; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import software.amazon.kinesis.common.StreamARNUtil; /** * KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following - * three properties must be provided. 1) "applicationName" 2) "streamName" 3) "AWSCredentialsProvider" + * three properties must be provided. 1) "applicationName" 2) "streamName" or "streamARN" 3) "AWSCredentialsProvider" * KinesisClientLibConfigurator will help to automatically assign the value of "workerId" if this property is not * provided. In the specified properties file, any properties, which matches the variable name in * KinesisClientLibConfiguration and has a corresponding "with{variableName}" setter method, will be read in, and its @@ -69,8 +70,18 @@ public class KinesisClientLibConfigurator { }); Validate.notBlank(configuration.getApplicationName(), "Application name is required"); - Validate.notBlank(configuration.getStreamName(), "Stream name is required"); + Validate.isTrue(StringUtils.isNotBlank(configuration.getStreamName()) || StringUtils.isNotBlank(configuration.getStreamARN()), + "Either Stream name or stream arn needs to be provided"); Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided"); + + // Extract streamName from streamARN + if (StringUtils.isBlank(configuration.getStreamName()) && StringUtils.isNotBlank(configuration.getStreamARN())) { + configuration.setStreamName(StreamARNUtil.getStreamName(configuration.getStreamARN())); + } + + log.debug("StreamName is set to " + configuration.getStreamName()); + log.debug("StreamARN is set to " + configuration.getStreamARN()); + return configuration; } diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index da280ddf..9cb027df 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -23,12 +23,12 @@ import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.function.Function; import org.apache.commons.beanutils.BeanUtilsBean; -import org.apache.commons.beanutils.ConvertUtils; import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.beanutils.Converter; import org.apache.commons.beanutils.converters.ArrayConverter; @@ -74,6 +74,8 @@ public class MultiLangDaemonConfiguration { private String streamName; + private String streamARN; + @ConfigurationSettable(configurationClass = ConfigsBuilder.class) private String tableName; @@ -371,7 +373,10 @@ public class MultiLangDaemonConfiguration { DynamoDbAsyncClient dynamoDbAsyncClient = dynamoDbClient.build(DynamoDbAsyncClient.class); CloudWatchAsyncClient cloudWatchAsyncClient = cloudWatchClient.build(CloudWatchAsyncClient.class); - ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisAsyncClient, + // We've verified that streamName and StreamARN cannot both be missing + // Note that StreamARN takes precedence if both are present + String streamNameOrARN = Optional.ofNullable(streamARN).orElse(streamName); + ConfigsBuilder configsBuilder = new ConfigsBuilder(streamNameOrARN, applicationName, kinesisAsyncClient, dynamoDbAsyncClient, cloudWatchAsyncClient, workerIdentifier, shardRecordProcessorFactory); Map, Object> configObjects = new HashMap<>(); diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index 031fc427..614ccc06 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -306,9 +306,9 @@ public class KinesisClientLibConfiguratorTest { } @Test - public void testWithMissingStreamName() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("Stream name is required"); + public void testWithStreamNameAndARNBothMissing() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Either Stream name or stream arn needs to be provided"); String test = StringUtils.join(new String[] { "applicationName = b", "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n'); @@ -317,6 +317,18 @@ public class KinesisClientLibConfiguratorTest { configurator.getConfiguration(input); } + @Test + public void testWithStreamArnAndStreamNameBothPresent() { + String test = StringUtils.join(new String[] { "applicationName = b", + "streamName = streamName", "streamARN = streamARN", + "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + MultiLangDaemonConfiguration config = configurator.getConfiguration(input); + assertTrue(StringUtils.isNotBlank(config.getStreamARN())); + assertTrue(StringUtils.isNotBlank(config.getStreamName())); + } + @Test public void testWithMissingApplicationName() { thrown.expect(NullPointerException.class); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java index cb864cee..0abbad24 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamARNUtil.java @@ -14,9 +14,9 @@ import java.util.regex.Pattern; @Slf4j public class StreamARNUtil { public static Pattern STREAM_ARN_PATTERN = Pattern.compile( - "^arn:aws.*:kinesis:.*:\\d{12}:stream\\/(\\S+)$"); + "arn:aws.*:kinesis:.*:\\d{12}:stream\\/(\\S+)"); public static Pattern CONSUMER_ARN_PATTERN = Pattern.compile( - "^(arn:aws.*:kinesis:.*:\\d{12}:.*stream\\/[a-zA-Z0-9_.-]+)\\/consumer\\/[a-zA-Z0-9_.-]+:[0-9]+"); + "(arn:aws.*:kinesis:.*:\\d{12}:.*stream\\/[a-zA-Z0-9_.-]+)\\/consumer\\/[a-zA-Z0-9_.-]+:[0-9]+"); public static String getStreamName(String streamNameOrARN) { final Matcher matcher = STREAM_ARN_PATTERN.matcher(streamNameOrARN); @@ -76,7 +76,7 @@ public class StreamARNUtil { "Successfully set streamARN to " + streamIdentifier.streamARN().get() : "Not able to set streamARN via DescribeStreamSummary call"); } else { - log.debug("StreamARN " + optionalStreamARN.get() + " is passed during initialization."); + log.debug("StreamARN " + optionalStreamARN.get() + " is already passed during initialization."); } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java index 06d97145..a4823264 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java @@ -102,12 +102,12 @@ public class StreamIdentifier { } /** - * Create a single stream instance for StreamIdentifier from stream name. - * @param streamName + * Create a single stream instance for StreamIdentifier from stream name or stream arn. + * @param streamNameOrARN * @return StreamIdentifier */ - public static StreamIdentifier singleStreamInstance(String streamName) { - Validate.notEmpty(streamName, "StreamName should not be empty"); - return new StreamIdentifier(streamName); + public static StreamIdentifier singleStreamInstance(String streamNameOrARN) { + Validate.notEmpty(streamNameOrARN, "StreamName should not be empty"); + return new StreamIdentifier(streamNameOrARN); } }