Update to Multilang Daemon to support StreamArn (#1143)

* Updated multilang to support streamArn

* Updated arn import to use software.amzon instead of com.amazonaws, also updated unit tests to be more explicit with the expected exceptions

* Updated exception wording for region validation in StreamArn to be more consistent with other error messages

* reverted spacing change

* Updated StreamArn in multilang to only replace streamName (not region as well). Also updated unit tests and added Region validation

* Updated region validation in multilang to be more readible

* Refactored multilang unit tests to be more simple

* Updated multilang daemon to validate streamArn based on pattern rather than individual section

* removed region validation as this was not a requirement for stringArn support in multilangdaemon

* removed spacing and removed unit test assertion on exception message

* removed unnecessary param from unit test

* removed unused imports from multilang unit tests

* simplified the assertion for multilang daemon unit tests

* Cleaned up unit test code following best practices for spacing/naming conventions and simplied kinesisClientLibConfiguration

* Updated region code in unit tests for multilang daemon

---------

Co-authored-by: Ryan Pelaez <rmpelaez@amazon.com>
This commit is contained in:
pelaezryan 2023-06-26 09:02:19 -07:00 committed by GitHub
parent eb6fd0cf32
commit dcd1c53fb1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 195 additions and 81 deletions

View file

@ -23,8 +23,10 @@ import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.StreamIdentifier;
/**
* KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following
@ -69,8 +71,19 @@ public class KinesisClientLibConfigurator {
});
Validate.notBlank(configuration.getApplicationName(), "Application name is required");
Validate.notBlank(configuration.getStreamName(), "Stream name is required");
if (configuration.getStreamArn() != null && !configuration.getStreamArn().trim().isEmpty()) {
final Arn streamArnObj = Arn.fromString(configuration.getStreamArn());
StreamIdentifier.validateArn(streamArnObj);
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
final String streamNameFromArn = streamArnObj.resource().resource();
configuration.setStreamName(streamNameFromArn);
}
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
return configuration;
}

View file

@ -28,7 +28,6 @@ import java.util.UUID;
import java.util.function.Function;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtils;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.apache.commons.beanutils.Converter;
import org.apache.commons.beanutils.converters.ArrayConverter;
@ -73,6 +72,8 @@ public class MultiLangDaemonConfiguration {
private String applicationName;
private String streamName;
private String streamArn;
@ConfigurationSettable(configurationClass = ConfigsBuilder.class)
private String tableName;

View file

@ -28,7 +28,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import software.amazon.kinesis.multilang.MessageReader;
import software.amazon.kinesis.multilang.messages.Message;
import software.amazon.kinesis.multilang.messages.StatusMessage;
import com.fasterxml.jackson.databind.ObjectMapper;

View file

@ -32,15 +32,12 @@ import org.mockito.Mockito;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.multilang.MessageWriter;
import software.amazon.kinesis.multilang.messages.LeaseLostMessage;
import software.amazon.kinesis.multilang.messages.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import static org.mockito.Mockito.verify;

View file

@ -14,17 +14,14 @@
*/
package software.amazon.kinesis.multilang;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Properties;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.junit.Before;
import software.amazon.awssdk.regions.Region;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@ -39,48 +36,154 @@ import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
@RunWith(MockitoJUnitRunner.class)
public class MultiLangDaemonConfigTest {
private static String FILENAME = "some.properties";
private static final String FILENAME = "some.properties";
private static final String EXE = "TestExe.exe";
private static final String APPLICATION_NAME = MultiLangDaemonConfigTest.class.getSimpleName();
private static final String STREAM_NAME = "fakeStream";
private static final String STREAM_NAME_IN_ARN = "FAKE_STREAM_NAME";
private static final Region REGION = Region.US_EAST_1;
private static final String STREAM_ARN = "arn:aws:kinesis:us-east-2:012345678987:stream/" + STREAM_NAME_IN_ARN;
@Mock
private ClassLoader classLoader;
@Mock
private AwsCredentialsProvider credentialsProvider;
@Mock
private AwsCredentials creds;
@Mock
private KinesisClientLibConfigurator configurator;
private MultiLangDaemonConfig deamonConfig;
@Before
public void setup() {
ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean();
BeanUtilsBean utilsBean = new BeanUtilsBean(convertUtilsBean);
MultiLangDaemonConfiguration multiLangDaemonConfiguration = new MultiLangDaemonConfiguration(utilsBean,
convertUtilsBean);
multiLangDaemonConfiguration.setApplicationName("cool-app");
multiLangDaemonConfiguration.setStreamName("cool-stream");
multiLangDaemonConfiguration.setWorkerIdentifier("cool-worker");
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
when(creds.accessKeyId()).thenReturn("cool-user");
when(configurator.getConfiguration(any(Properties.class))).thenReturn(multiLangDaemonConfiguration);
/**
* Instantiate a MultiLangDaemonConfig object
* @param streamName
* @param streamArn
* @throws IOException
*/
public void setup(String streamName, String streamArn) throws IOException {
String properties = String.format("executableName = %s\n"
+ "applicationName = %s\n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "processingLanguage = malbolge\n"
+ "regionName = %s\n",
EXE,
APPLICATION_NAME,
"us-east-1");
if (streamName != null) {
properties += String.format("streamName = %s\n", streamName);
}
if (streamArn != null) {
properties += String.format("streamArn = %s\n", streamArn);
}
classLoader = Mockito.mock(ClassLoader.class);
@Test
public void constructorTest() throws IOException {
String PROPERTIES = "executableName = randomEXE \n" + "applicationName = testApp \n"
+ "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())).when(classLoader)
Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader)
.getResourceAsStream(FILENAME);
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
when(creds.accessKeyId()).thenReturn("cool-user");
configurator = new KinesisClientLibConfigurator();
assertNotNull(deamonConfig.getExecutorService());
assertNotNull(deamonConfig.getMultiLangDaemonConfiguration());
assertNotNull(deamonConfig.getRecordProcessorFactory());
deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
}
@Test(expected = IllegalArgumentException.class)
public void testConstructorFailsBecauseStreamArnIsInvalid() throws Exception {
setup("", "this_is_not_a_valid_arn");
}
@Test(expected = IllegalArgumentException.class)
public void testConstructorFailsBecauseStreamArnIsInvalid2() throws Exception {
setup("", "arn:aws:kinesis:us-east-2:ACCOUNT_ID:BadFormatting:stream/" + STREAM_NAME_IN_ARN);
}
@Test(expected = IllegalArgumentException.class)
public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws Exception {
setup("", "");
}
@Test(expected = NullPointerException.class)
public void testConstructorFailsBecauseStreamNameAndArnAreNull() throws Exception {
setup(null, null);
}
@Test(expected = NullPointerException.class)
public void testConstructorFailsBecauseStreamNameIsNullAndArnIsEmpty() throws Exception {
setup(null, "");
}
@Test(expected = IllegalArgumentException.class)
public void testConstructorFailsBecauseStreamNameIsEmptyAndArnIsNull() throws Exception {
setup("", null);
}
@Test
public void propertyValidation() {
public void testConstructorUsingStreamName() throws IOException {
setup(STREAM_NAME, null);
assertConfigurationsMatch(STREAM_NAME, null);
}
@Test
public void testConstructorUsingStreamNameAndStreamArnIsEmpty() throws IOException {
setup(STREAM_NAME, "");
assertConfigurationsMatch(STREAM_NAME, "");
}
@Test
public void testConstructorUsingStreamNameAndStreamArnIsWhitespace() throws IOException {
setup(STREAM_NAME, " ");
assertConfigurationsMatch(STREAM_NAME, "");
}
@Test
public void testConstructorUsingStreamArn() throws IOException {
setup(null, STREAM_ARN);
assertConfigurationsMatch(STREAM_NAME_IN_ARN, STREAM_ARN);
}
@Test
public void testConstructorUsingStreamNameAsEmptyAndStreamArn() throws IOException {
setup("", STREAM_ARN);
assertConfigurationsMatch(STREAM_NAME_IN_ARN, STREAM_ARN);
}
@Test
public void testConstructorUsingStreamArnOverStreamName() throws IOException {
setup(STREAM_NAME, STREAM_ARN);
assertConfigurationsMatch(STREAM_NAME_IN_ARN, STREAM_ARN);
}
/**
* Verify the daemonConfig properties are what we expect them to be.
* @param deamonConfig
* @param expectedStreamName
*/
private void assertConfigurationsMatch(String expectedStreamName, String expectedStreamArn) {
final MultiLangDaemonConfiguration multiLangConfiguration = deamonConfig.getMultiLangDaemonConfiguration();
assertNotNull(deamonConfig.getExecutorService());
assertNotNull(multiLangConfiguration);
assertNotNull(deamonConfig.getRecordProcessorFactory());
assertEquals(EXE, deamonConfig.getRecordProcessorFactory().getCommandArray()[0]);
assertEquals(APPLICATION_NAME, multiLangConfiguration.getApplicationName());
assertEquals(expectedStreamName, multiLangConfiguration.getStreamName());
assertEquals(REGION, multiLangConfiguration.getDynamoDbClient().get("region"));
assertEquals(REGION, multiLangConfiguration.getCloudWatchClient().get("region"));
assertEquals(REGION, multiLangConfiguration.getKinesisClient().get("region"));
assertEquals(expectedStreamArn, multiLangConfiguration.getStreamArn());
}
@Test
public void testPropertyValidation() {
String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class);

View file

@ -17,7 +17,6 @@ package software.amazon.kinesis.multilang;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyObject;

View file

@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -61,10 +60,8 @@ import software.amazon.kinesis.multilang.messages.ShardEndedMessage;
import software.amazon.kinesis.multilang.messages.StatusMessage;
import com.google.common.util.concurrent.SettableFuture;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

View file

@ -27,8 +27,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import software.amazon.kinesis.multilang.DrainChildSTDERRTask;
import software.amazon.kinesis.multilang.LineReaderTask;
public class ReadSTDERRTaskTest {

View file

@ -14,12 +14,9 @@
*/
package software.amazon.kinesis.multilang;
import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration;
import org.junit.Assert;
import org.junit.Test;
import software.amazon.kinesis.multilang.MultiLangRecordProcessorFactory;
import software.amazon.kinesis.multilang.MultiLangShardRecordProcessor;
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import org.junit.runner.RunWith;

View file

@ -16,7 +16,6 @@ package software.amazon.kinesis.multilang.config;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.util.Arrays;
@ -32,11 +31,6 @@ import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
public class AWSCredentialsProviderPropertyValueDecoderTest {
private static final String TEST_ACCESS_KEY_ID = "123";

View file

@ -278,10 +278,8 @@ public class KinesisClientLibConfiguratorTest {
}
}
@Test
@Test(expected = IllegalArgumentException.class)
public void testWithMissingCredentialsProvider() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("A basic set of AWS credentials must be provided");
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b", "workerId = 123",
"failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n');
@ -305,22 +303,37 @@ public class KinesisClientLibConfiguratorTest {
assertFalse(config.getWorkerIdentifier().isEmpty());
}
@Test
public void testWithMissingStreamName() {
thrown.expect(NullPointerException.class);
thrown.expectMessage("Stream name is required");
String test = StringUtils.join(new String[] { "applicationName = b",
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n');
@Test(expected = NullPointerException.class)
public void testWithMissingStreamNameAndMissingStreamArn() {
String test = StringUtils.join(new String[] {
"applicationName = b",
"AWSCredentialsProvider = " + credentialName1,
"workerId = 123",
"failoverTimeMillis = 100" },
'\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
configurator.getConfiguration(input);
}
@Test
@Test(expected = IllegalArgumentException.class)
public void testWithEmptyStreamNameAndMissingStreamArn() {
String test = StringUtils.join(new String[] {
"applicationName = b",
"AWSCredentialsProvider = " + credentialName1,
"workerId = 123",
"failoverTimeMillis = 100",
"streamName = ",
"streamArn = "},
'\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
configurator.getConfiguration(input);
}
@Test(expected = NullPointerException.class)
public void testWithMissingApplicationName() {
thrown.expect(NullPointerException.class);
thrown.expectMessage("Application name is required");
String test = StringUtils.join(new String[] { "streamName = a", "AWSCredentialsProvider = " + credentialName1,
"workerId = 123", "failoverTimeMillis = 100" }, '\n');

View file

@ -26,13 +26,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.multilang.messages.CheckpointMessage;
import software.amazon.kinesis.multilang.messages.InitializeMessage;
import software.amazon.kinesis.multilang.messages.Message;
import software.amazon.kinesis.multilang.messages.ProcessRecordsMessage;
import software.amazon.kinesis.multilang.messages.ShutdownMessage;
import software.amazon.kinesis.multilang.messages.ShutdownRequestedMessage;
import software.amazon.kinesis.multilang.messages.StatusMessage;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
public class MessageTest {

View file

@ -167,12 +167,22 @@ public class StreamIdentifier {
.build();
}
private static void validateArn(Arn streamArn) {
/**
* Verify the streamArn follows the appropriate formatting.
* Throw an exception if it does not.
* @param streamArn
*/
public static void validateArn(Arn streamArn) {
if (!STREAM_ARN_PATTERN.matcher(streamArn.toString()).matches() || !streamArn.region().isPresent()) {
throw new IllegalArgumentException("Unable to create a StreamIdentifier from " + streamArn);
throw new IllegalArgumentException("Invalid streamArn " + streamArn);
}
}
/**
* Verify creationEpoch is greater than 0.
* Throw an exception if it is not.
* @param creationEpoch
*/
private static void validateCreationEpoch(long creationEpoch) {
if (creationEpoch <= 0) {
throw new IllegalArgumentException(