Compare commits

...

6 commits

4 changed files with 225 additions and 37 deletions

View file

@ -19,12 +19,14 @@ import java.io.InputStream;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.Properties; import java.util.Properties;
import com.amazonaws.arn.Arn;
import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.beanutils.ConvertUtilsBean;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.regions.Region;
/** /**
* KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following * KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following
@ -69,7 +71,30 @@ public class KinesisClientLibConfigurator {
}); });
Validate.notBlank(configuration.getApplicationName(), "Application name is required"); Validate.notBlank(configuration.getApplicationName(), "Application name is required");
Validate.notBlank(configuration.getStreamName(), "Stream name is required");
if (configuration.getStreamArn() != null && !configuration.getStreamArn().trim().isEmpty()) {
Arn streamArnObj = Arn.fromString(configuration.getStreamArn());
if(!streamArnObj.getResource().getResourceType().equalsIgnoreCase("stream")){
throw new IllegalArgumentException(String.format("StreamArn has unsupported resource type of '%s'. Expected: stream",
streamArnObj.getResource().getResourceType()));
}
if(!streamArnObj.getService().equalsIgnoreCase("kinesis")){
throw new IllegalArgumentException(String.format("StreamArn has unsupported service type of '%s'. Expected: kinesis",
streamArnObj.getResource().getResourceType()));
}
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
String streamNameFromArn = streamArnObj.getResource().getResource();
configuration.setStreamName(streamNameFromArn);
//Parse out the region from the Arn and set (and/or override existing value for region)
Region regionObj = Region.of(streamArnObj.getRegion());
if(Region.regions().stream().filter(x -> x.id().equalsIgnoreCase(regionObj.id())).count() == 0){
throw new IllegalArgumentException(String.format("%s is not a valid region", regionObj.id()));
}
configuration.setRegionName(regionObj);
} else {
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
}
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided"); Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
return configuration; return configuration;
} }

View file

@ -28,7 +28,6 @@ import java.util.UUID;
import java.util.function.Function; import java.util.function.Function;
import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtils;
import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.beanutils.ConvertUtilsBean;
import org.apache.commons.beanutils.Converter; import org.apache.commons.beanutils.Converter;
import org.apache.commons.beanutils.converters.ArrayConverter; import org.apache.commons.beanutils.converters.ArrayConverter;
@ -73,6 +72,8 @@ public class MultiLangDaemonConfiguration {
private String applicationName; private String applicationName;
private String streamName; private String streamName;
private String streamArn;
@ConfigurationSettable(configurationClass = ConfigsBuilder.class) @ConfigurationSettable(configurationClass = ConfigsBuilder.class)
private String tableName; private String tableName;

View file

@ -14,17 +14,14 @@
*/ */
package software.amazon.kinesis.multilang; package software.amazon.kinesis.multilang;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Properties;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -34,12 +31,25 @@ import org.mockito.runners.MockitoJUnitRunner;
import junit.framework.Assert; import junit.framework.Assert;
import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.kinesis.multilang.config.KinesisClientLibConfigurator; import software.amazon.kinesis.multilang.config.KinesisClientLibConfigurator;
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class MultiLangDaemonConfigTest { public class MultiLangDaemonConfigTest {
private static String FILENAME = "some.properties"; private static String FILENAME = "some.properties";
private static String TestExe = "TestExe.exe";
private static String TestApplicationName = "TestApp";
private static String TestStreamName = "fakeStream";
private static String TestStreamNameInArn = "FAKE_STREAM_NAME";
private static String TestRegion = "us-east-1";
private static String TestRegionInArn = "us-east-2";
private static String getTestStreamArn(){
return String.format("arn:aws:kinesis:%s:ACCOUNT_ID:stream/%s", TestRegionInArn, TestStreamNameInArn);
}
@Mock
ClassLoader classLoader;
@Mock @Mock
private AwsCredentialsProvider credentialsProvider; private AwsCredentialsProvider credentialsProvider;
@ -48,39 +58,170 @@ public class MultiLangDaemonConfigTest {
@Mock @Mock
private KinesisClientLibConfigurator configurator; private KinesisClientLibConfigurator configurator;
@Before public void setup(String streamName, String streamArn) {
public void setup() {
ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean();
BeanUtilsBean utilsBean = new BeanUtilsBean(convertUtilsBean);
MultiLangDaemonConfiguration multiLangDaemonConfiguration = new MultiLangDaemonConfiguration(utilsBean,
convertUtilsBean);
multiLangDaemonConfiguration.setApplicationName("cool-app");
multiLangDaemonConfiguration.setStreamName("cool-stream");
multiLangDaemonConfiguration.setWorkerIdentifier("cool-worker");
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
when(creds.accessKeyId()).thenReturn("cool-user");
when(configurator.getConfiguration(any(Properties.class))).thenReturn(multiLangDaemonConfiguration);
}
@Test String PROPERTIES = String.format("executableName = %s\n"
public void constructorTest() throws IOException { + "applicationName = %s\n"
String PROPERTIES = "executableName = randomEXE \n" + "applicationName = testApp \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
+ "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge\n"
+ "processingLanguage = malbolge"; + "regionName = %s\n",
ClassLoader classLoader = Mockito.mock(ClassLoader.class); TestExe,
TestApplicationName,
TestRegion);
if(streamName != null){
PROPERTIES += String.format("streamName = %s\n", streamName);
}
if(streamArn != null){
PROPERTIES += String.format("streamArn = %s\n", streamArn);
}
classLoader = Mockito.mock(ClassLoader.class);
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())).when(classLoader) Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())).when(classLoader)
.getResourceAsStream(FILENAME); .getResourceAsStream(FILENAME);
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); when(credentialsProvider.resolveCredentials()).thenReturn(creds);
when(creds.accessKeyId()).thenReturn("cool-user");
configurator = new KinesisClientLibConfigurator();
assertNotNull(deamonConfig.getExecutorService());
assertNotNull(deamonConfig.getMultiLangDaemonConfiguration());
assertNotNull(deamonConfig.getRecordProcessorFactory());
} }
@Test @Test
public void propertyValidation() { public void testConstructorFailsBecauseStreamArnIsInvalid() throws IOException {
setup("", "this_is_not_a_valid_arn");
assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
}
@Test
public void testConstructorFailsBecauseStreamArnHasInvalidRegion() throws IOException {
setup("", "arn:aws:kinesis:us-east-1000:ACCOUNT_ID:stream/streamName");
assertThrows(IllegalArgumentException.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
}
@Test
public void testConstructorFailsBecauseStreamArnHasInvalidResourceType() throws IOException {
setup("", "arn:aws:kinesis:us-EAST-1:ACCOUNT_ID:dynamodb/streamName");
assertThrows(IllegalArgumentException.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
}
@Test
public void testConstructorFailsBecauseStreamArnHasInvalidService() throws IOException {
setup("", "arn:aws:kinesisFakeService:us-east-1:ACCOUNT_ID:stream/streamName");
assertThrows(IllegalArgumentException.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
}
@Test
public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws IOException {
setup("", "");
assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
}
@Test
public void testConstructorFailsBecauseStreamNameAndArnAreNull() {
setup(null, null);
assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
}
@Test
public void testConstructorFailsBecauseStreamNameIsNullAndArnIsEmpty() {
setup(null, "");
assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
}
@Test
public void testConstructorFailsBecauseStreamNameIsEmptyAndArnIsNull() {
setup("", null);
assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
}
@Test
public void testConstructorUsingStreamName() throws IOException {
setup(TestStreamName, null);
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, null);
}
@Test
public void testConstructorUsingStreamNameAndStreamArnIsEmpty() throws IOException {
setup(TestStreamName, "");
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, "");
}
@Test
public void testConstructorUsingStreamNameAndStreamArnIsWhitespace() throws IOException {
setup(TestStreamName, " ");
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, "");
}
@Test
public void testConstructorUsingStreamArn() throws IOException {
setup(null, getTestStreamArn());
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn());
}
@Test
public void testConstructorUsingStreamNameAsEmptyAndStreamArn() throws IOException {
setup("", getTestStreamArn());
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn());
}
@Test
public void testConstructorUsingStreamArnOverStreamName() throws IOException {
setup(TestStreamName, getTestStreamArn());
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn());
}
/**
* Verify the daemonConfig properties are what we expect them to be.
* @param deamonConfig
* @param expectedStreamName
*/
private void assertConfigurationsMatch(MultiLangDaemonConfig deamonConfig,
String expectedExe,
String expectedApplicationName,
String expectedStreamName,
String expectedRegionName,
String expectedStreamArn){
assertNotNull(deamonConfig.getExecutorService());
assertNotNull(deamonConfig.getMultiLangDaemonConfiguration());
assertNotNull(deamonConfig.getRecordProcessorFactory());
assertEquals(expectedExe, deamonConfig.getRecordProcessorFactory().getCommandArray()[0]);
assertEquals(expectedApplicationName, deamonConfig.getMultiLangDaemonConfiguration().getApplicationName());
assertEquals(expectedStreamName, deamonConfig.getMultiLangDaemonConfiguration().getStreamName());
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getDynamoDbClient().get("region").toString());
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getCloudWatchClient().get("region").toString());
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getKinesisClient().get("region").toString());
assertEquals(expectedStreamArn, deamonConfig.getMultiLangDaemonConfiguration().getStreamArn());
}
@Test
public void testPropertyValidation() {
String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n" String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n"
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge"; + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge";
ClassLoader classLoader = Mockito.mock(ClassLoader.class); ClassLoader classLoader = Mockito.mock(ClassLoader.class);

View file

@ -306,12 +306,33 @@ public class KinesisClientLibConfiguratorTest {
} }
@Test @Test
public void testWithMissingStreamName() { public void testWithMissingStreamNameAndMissingStreamArn() {
thrown.expect(NullPointerException.class); thrown.expect(NullPointerException.class);
thrown.expectMessage("Stream name is required"); thrown.expectMessage("Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
String test = StringUtils.join(new String[] { "applicationName = b", String test = StringUtils.join(new String[] {
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n'); "applicationName = b",
"AWSCredentialsProvider = " + credentialName1,
"workerId = 123",
"failoverTimeMillis = 100" },
'\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
configurator.getConfiguration(input);
}
@Test
public void testWithEmptyStreamNameAndMissingStreamArn() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
String test = StringUtils.join(new String[] {
"applicationName = b",
"AWSCredentialsProvider = " + credentialName1,
"workerId = 123",
"failoverTimeMillis = 100",
"streamName = ",
"streamArn = "},
'\n');
InputStream input = new ByteArrayInputStream(test.getBytes()); InputStream input = new ByteArrayInputStream(test.getBytes());
configurator.getConfiguration(input); configurator.getConfiguration(input);