Update multilang interfaces for StreamARN changes

This commit is contained in:
Yu Zeng 2022-12-08 09:49:01 -08:00
parent 6267e66c71
commit f9ab11326e
7 changed files with 45 additions and 15 deletions

1
.gitignore vendored
View file

@ -1,5 +1,6 @@
target/ target/
AwsCredentials.properties AwsCredentials.properties
.idea .idea
.DS_Store
*.iml *.iml

View file

@ -210,6 +210,7 @@ public class KinesisClientLibConfiguration {
private String applicationName; private String applicationName;
private String tableName; private String tableName;
private String streamName; private String streamName;
private String streamARN;
private String kinesisEndpoint; private String kinesisEndpoint;
private String dynamoDBEndpoint; private String dynamoDBEndpoint;
private InitialPositionInStream initialPositionInStream; private InitialPositionInStream initialPositionInStream;

View file

@ -25,10 +25,11 @@ import org.apache.commons.beanutils.ConvertUtilsBean;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import software.amazon.kinesis.common.StreamARNUtil;
/** /**
* KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following * KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following
* three properties must be provided. 1) "applicationName" 2) "streamName" 3) "AWSCredentialsProvider" * three properties must be provided. 1) "applicationName" 2) "streamName" or "streamARN" 3) "AWSCredentialsProvider"
* KinesisClientLibConfigurator will help to automatically assign the value of "workerId" if this property is not * KinesisClientLibConfigurator will help to automatically assign the value of "workerId" if this property is not
* provided. In the specified properties file, any properties, which matches the variable name in * provided. In the specified properties file, any properties, which matches the variable name in
* KinesisClientLibConfiguration and has a corresponding "with{variableName}" setter method, will be read in, and its * KinesisClientLibConfiguration and has a corresponding "with{variableName}" setter method, will be read in, and its
@ -69,8 +70,18 @@ public class KinesisClientLibConfigurator {
}); });
Validate.notBlank(configuration.getApplicationName(), "Application name is required"); Validate.notBlank(configuration.getApplicationName(), "Application name is required");
Validate.notBlank(configuration.getStreamName(), "Stream name is required"); Validate.isTrue(StringUtils.isNotBlank(configuration.getStreamName()) || StringUtils.isNotBlank(configuration.getStreamARN()),
"Either Stream name or stream arn needs to be provided");
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided"); Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
// Extract streamName from streamARN
if (StringUtils.isBlank(configuration.getStreamName()) && StringUtils.isNotBlank(configuration.getStreamARN())) {
configuration.setStreamName(StreamARNUtil.getStreamName(configuration.getStreamARN()));
}
log.debug("StreamName is set to " + configuration.getStreamName());
log.debug("StreamARN is set to " + configuration.getStreamARN());
return configuration; return configuration;
} }

View file

@ -23,12 +23,12 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtils;
import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.beanutils.ConvertUtilsBean;
import org.apache.commons.beanutils.Converter; import org.apache.commons.beanutils.Converter;
import org.apache.commons.beanutils.converters.ArrayConverter; import org.apache.commons.beanutils.converters.ArrayConverter;
@ -74,6 +74,8 @@ public class MultiLangDaemonConfiguration {
private String streamName; private String streamName;
private String streamARN;
@ConfigurationSettable(configurationClass = ConfigsBuilder.class) @ConfigurationSettable(configurationClass = ConfigsBuilder.class)
private String tableName; private String tableName;
@ -371,7 +373,10 @@ public class MultiLangDaemonConfiguration {
DynamoDbAsyncClient dynamoDbAsyncClient = dynamoDbClient.build(DynamoDbAsyncClient.class); DynamoDbAsyncClient dynamoDbAsyncClient = dynamoDbClient.build(DynamoDbAsyncClient.class);
CloudWatchAsyncClient cloudWatchAsyncClient = cloudWatchClient.build(CloudWatchAsyncClient.class); CloudWatchAsyncClient cloudWatchAsyncClient = cloudWatchClient.build(CloudWatchAsyncClient.class);
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisAsyncClient, // We've verified that streamName and StreamARN cannot both be missing
// Note that StreamARN takes precedence if both are present
String streamNameOrARN = Optional.ofNullable(streamARN).orElse(streamName);
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamNameOrARN, applicationName, kinesisAsyncClient,
dynamoDbAsyncClient, cloudWatchAsyncClient, workerIdentifier, shardRecordProcessorFactory); dynamoDbAsyncClient, cloudWatchAsyncClient, workerIdentifier, shardRecordProcessorFactory);
Map<Class<?>, Object> configObjects = new HashMap<>(); Map<Class<?>, Object> configObjects = new HashMap<>();

View file

@ -306,9 +306,9 @@ public class KinesisClientLibConfiguratorTest {
} }
@Test @Test
public void testWithMissingStreamName() { public void testWithStreamNameAndARNBothMissing() {
thrown.expect(NullPointerException.class); thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Stream name is required"); thrown.expectMessage("Either Stream name or stream arn needs to be provided");
String test = StringUtils.join(new String[] { "applicationName = b", String test = StringUtils.join(new String[] { "applicationName = b",
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n'); "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n');
@ -317,6 +317,18 @@ public class KinesisClientLibConfiguratorTest {
configurator.getConfiguration(input); configurator.getConfiguration(input);
} }
@Test
public void testWithStreamArnAndStreamNameBothPresent() {
String test = StringUtils.join(new String[] { "applicationName = b",
"streamName = streamName", "streamARN = streamARN",
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
assertTrue(StringUtils.isNotBlank(config.getStreamARN()));
assertTrue(StringUtils.isNotBlank(config.getStreamName()));
}
@Test @Test
public void testWithMissingApplicationName() { public void testWithMissingApplicationName() {
thrown.expect(NullPointerException.class); thrown.expect(NullPointerException.class);

View file

@ -14,9 +14,9 @@ import java.util.regex.Pattern;
@Slf4j @Slf4j
public class StreamARNUtil { public class StreamARNUtil {
public static Pattern STREAM_ARN_PATTERN = Pattern.compile( public static Pattern STREAM_ARN_PATTERN = Pattern.compile(
"^arn:aws.*:kinesis:.*:\\d{12}:stream\\/(\\S+)$"); "arn:aws.*:kinesis:.*:\\d{12}:stream\\/(\\S+)");
public static Pattern CONSUMER_ARN_PATTERN = Pattern.compile( public static Pattern CONSUMER_ARN_PATTERN = Pattern.compile(
"^(arn:aws.*:kinesis:.*:\\d{12}:.*stream\\/[a-zA-Z0-9_.-]+)\\/consumer\\/[a-zA-Z0-9_.-]+:[0-9]+"); "(arn:aws.*:kinesis:.*:\\d{12}:.*stream\\/[a-zA-Z0-9_.-]+)\\/consumer\\/[a-zA-Z0-9_.-]+:[0-9]+");
public static String getStreamName(String streamNameOrARN) { public static String getStreamName(String streamNameOrARN) {
final Matcher matcher = STREAM_ARN_PATTERN.matcher(streamNameOrARN); final Matcher matcher = STREAM_ARN_PATTERN.matcher(streamNameOrARN);
@ -76,7 +76,7 @@ public class StreamARNUtil {
"Successfully set streamARN to " + streamIdentifier.streamARN().get() : "Successfully set streamARN to " + streamIdentifier.streamARN().get() :
"Not able to set streamARN via DescribeStreamSummary call"); "Not able to set streamARN via DescribeStreamSummary call");
} else { } else {
log.debug("StreamARN " + optionalStreamARN.get() + " is passed during initialization."); log.debug("StreamARN " + optionalStreamARN.get() + " is already passed during initialization.");
} }
} }
} }

View file

@ -102,12 +102,12 @@ public class StreamIdentifier {
} }
/** /**
* Create a single stream instance for StreamIdentifier from stream name. * Create a single stream instance for StreamIdentifier from stream name or stream arn.
* @param streamName * @param streamNameOrARN
* @return StreamIdentifier * @return StreamIdentifier
*/ */
public static StreamIdentifier singleStreamInstance(String streamName) { public static StreamIdentifier singleStreamInstance(String streamNameOrARN) {
Validate.notEmpty(streamName, "StreamName should not be empty"); Validate.notEmpty(streamNameOrARN, "StreamName should not be empty");
return new StreamIdentifier(streamName); return new StreamIdentifier(streamNameOrARN);
} }
} }