Compare commits
4 commits
master
...
multilang-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ba15dfb98e | ||
|
|
6a715efb89 | ||
|
|
7ba4efb1d1 | ||
|
|
3d71e0868e |
4 changed files with 180 additions and 36 deletions
|
|
@ -19,12 +19,14 @@ import java.io.InputStream;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import com.amazonaws.arn.Arn;
|
||||||
import org.apache.commons.beanutils.BeanUtilsBean;
|
import org.apache.commons.beanutils.BeanUtilsBean;
|
||||||
import org.apache.commons.beanutils.ConvertUtilsBean;
|
import org.apache.commons.beanutils.ConvertUtilsBean;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
|
import software.amazon.awssdk.regions.Region;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following
|
* KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following
|
||||||
|
|
@ -62,14 +64,34 @@ public class KinesisClientLibConfigurator {
|
||||||
public MultiLangDaemonConfiguration getConfiguration(Properties properties) {
|
public MultiLangDaemonConfiguration getConfiguration(Properties properties) {
|
||||||
properties.entrySet().forEach(e -> {
|
properties.entrySet().forEach(e -> {
|
||||||
try {
|
try {
|
||||||
utilsBean.setProperty(configuration, (String) e.getKey(), e.getValue());
|
Object value = e.getValue();
|
||||||
|
if(value instanceof String){
|
||||||
|
value = ((String) value).trim();
|
||||||
|
}
|
||||||
|
utilsBean.setProperty(configuration, (String) e.getKey(), value);
|
||||||
} catch (IllegalAccessException | InvocationTargetException ex) {
|
} catch (IllegalAccessException | InvocationTargetException ex) {
|
||||||
throw new RuntimeException(ex);
|
throw new RuntimeException(ex);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Validate.notBlank(configuration.getApplicationName(), "Application name is required");
|
Validate.notBlank(configuration.getApplicationName(), "Application name is required");
|
||||||
Validate.notBlank(configuration.getStreamName(), "Stream name is required");
|
|
||||||
|
try {
|
||||||
|
Validate.notBlank(configuration.getStreamArn(), "");
|
||||||
|
Arn streamArnObj = Arn.fromString(configuration.getStreamArn());
|
||||||
|
|
||||||
|
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
|
||||||
|
String streamNameFromArn = streamArnObj.getResource().getResource();
|
||||||
|
configuration.setStreamName(streamNameFromArn);
|
||||||
|
|
||||||
|
//Parse out the region from the Arn and set (and/or override existing value for region)
|
||||||
|
String regionName = streamArnObj.getRegion();
|
||||||
|
Region regionObj = Region.of(regionName);
|
||||||
|
configuration.setRegionName(regionObj);
|
||||||
|
}catch (Exception e) {
|
||||||
|
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)");
|
||||||
|
|
||||||
|
}
|
||||||
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
|
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
|
||||||
return configuration;
|
return configuration;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -73,6 +73,8 @@ public class MultiLangDaemonConfiguration {
|
||||||
private String applicationName;
|
private String applicationName;
|
||||||
|
|
||||||
private String streamName;
|
private String streamName;
|
||||||
|
private String streamArn;
|
||||||
|
|
||||||
|
|
||||||
@ConfigurationSettable(configurationClass = ConfigsBuilder.class)
|
@ConfigurationSettable(configurationClass = ConfigsBuilder.class)
|
||||||
private String tableName;
|
private String tableName;
|
||||||
|
|
|
||||||
|
|
@ -14,17 +14,13 @@
|
||||||
*/
|
*/
|
||||||
package software.amazon.kinesis.multilang;
|
package software.amazon.kinesis.multilang;
|
||||||
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.*;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
import org.apache.commons.beanutils.BeanUtilsBean;
|
|
||||||
import org.apache.commons.beanutils.ConvertUtilsBean;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
|
|
@ -35,11 +31,22 @@ import junit.framework.Assert;
|
||||||
import software.amazon.awssdk.auth.credentials.AwsCredentials;
|
import software.amazon.awssdk.auth.credentials.AwsCredentials;
|
||||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
||||||
import software.amazon.kinesis.multilang.config.KinesisClientLibConfigurator;
|
import software.amazon.kinesis.multilang.config.KinesisClientLibConfigurator;
|
||||||
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
|
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class MultiLangDaemonConfigTest {
|
public class MultiLangDaemonConfigTest {
|
||||||
private static String FILENAME = "some.properties";
|
private static String FILENAME = "some.properties";
|
||||||
|
private static String TestExe = "TestExe.exe";
|
||||||
|
private static String TestApplicationName = "TestApp";
|
||||||
|
private static String TestStreamName =ßß "fakeStream";
|
||||||
|
private static String TestStreamNameInArn = "FAKE_STREAM_NAME";
|
||||||
|
private static String TestRegion = "us-east-1";
|
||||||
|
private static String TestRegionInArn = "us-east-2";
|
||||||
|
|
||||||
|
private static String getTestStreamArn(){
|
||||||
|
return String.format("arn:aws:kinesis:%s:ACCOUNT_ID:stream/%s", TestRegionInArn, TestStreamNameInArn);
|
||||||
|
}
|
||||||
|
@Mock
|
||||||
|
ClassLoader classLoader;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private AwsCredentialsProvider credentialsProvider;
|
private AwsCredentialsProvider credentialsProvider;
|
||||||
|
|
@ -48,39 +55,131 @@ public class MultiLangDaemonConfigTest {
|
||||||
@Mock
|
@Mock
|
||||||
private KinesisClientLibConfigurator configurator;
|
private KinesisClientLibConfigurator configurator;
|
||||||
|
|
||||||
@Before
|
public void setup(String streamName, String streamArn) {
|
||||||
public void setup() {
|
|
||||||
ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean();
|
|
||||||
BeanUtilsBean utilsBean = new BeanUtilsBean(convertUtilsBean);
|
|
||||||
MultiLangDaemonConfiguration multiLangDaemonConfiguration = new MultiLangDaemonConfiguration(utilsBean,
|
|
||||||
convertUtilsBean);
|
|
||||||
multiLangDaemonConfiguration.setApplicationName("cool-app");
|
|
||||||
multiLangDaemonConfiguration.setStreamName("cool-stream");
|
|
||||||
multiLangDaemonConfiguration.setWorkerIdentifier("cool-worker");
|
|
||||||
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
|
|
||||||
when(creds.accessKeyId()).thenReturn("cool-user");
|
|
||||||
when(configurator.getConfiguration(any(Properties.class))).thenReturn(multiLangDaemonConfiguration);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
String PROPERTIES = String.format("executableName = %s \n"
|
||||||
public void constructorTest() throws IOException {
|
+ "applicationName = %s \n"
|
||||||
String PROPERTIES = "executableName = randomEXE \n" + "applicationName = testApp \n"
|
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
|
||||||
+ "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
|
+ "processingLanguage = malbolge\n"
|
||||||
+ "processingLanguage = malbolge";
|
+ "regionName = %s \n",
|
||||||
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
|
TestExe,
|
||||||
|
TestApplicationName,
|
||||||
|
TestRegion);
|
||||||
|
|
||||||
|
if(streamName != null){
|
||||||
|
PROPERTIES += String.format("streamName = %s \n", streamName);
|
||||||
|
}
|
||||||
|
if(streamArn != null){
|
||||||
|
PROPERTIES += String.format("streamArn = %s \n", streamArn);
|
||||||
|
}
|
||||||
|
classLoader = Mockito.mock(ClassLoader.class);
|
||||||
|
|
||||||
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())).when(classLoader)
|
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())).when(classLoader)
|
||||||
.getResourceAsStream(FILENAME);
|
.getResourceAsStream(FILENAME);
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
|
||||||
|
when(creds.accessKeyId()).thenReturn("cool-user");
|
||||||
|
configurator = new KinesisClientLibConfigurator();
|
||||||
|
|
||||||
assertNotNull(deamonConfig.getExecutorService());
|
|
||||||
assertNotNull(deamonConfig.getMultiLangDaemonConfiguration());
|
|
||||||
assertNotNull(deamonConfig.getRecordProcessorFactory());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void propertyValidation() {
|
public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws IOException {
|
||||||
|
setup("", "");
|
||||||
|
|
||||||
|
assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorFailsBecauseStreamNameAndArnAreNull() {
|
||||||
|
setup(null, null);
|
||||||
|
|
||||||
|
assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorFailsBecauseStreamNameIsNullAndArnIsEmpty() {
|
||||||
|
setup(null, "");
|
||||||
|
|
||||||
|
assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorFailsBecauseStreamNameIsEmptyAndArnIsNull() {
|
||||||
|
setup("", null);
|
||||||
|
|
||||||
|
assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorUsingStreamName() throws IOException {
|
||||||
|
setup(TestStreamName, null);
|
||||||
|
|
||||||
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
|
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorUsingStreamNameAndStreamArnIsEmpty() throws IOException {
|
||||||
|
setup(TestStreamName, "");
|
||||||
|
|
||||||
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
|
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorUsingStreamArn() throws IOException {
|
||||||
|
setup(null, getTestStreamArn());
|
||||||
|
|
||||||
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
|
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorUsingStreamNameAsEmptyAndStreamArn() throws IOException {
|
||||||
|
setup("", getTestStreamArn());
|
||||||
|
|
||||||
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
|
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorUsingStreamArnOverStreamName() throws IOException {
|
||||||
|
setup(TestStreamName, getTestStreamArn());
|
||||||
|
|
||||||
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
|
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify the daemonConfig properties are what we expect them to be.
|
||||||
|
* @param deamonConfig
|
||||||
|
* @param expectedStreamName
|
||||||
|
*/
|
||||||
|
private void AssertConfigurationsMatch(MultiLangDaemonConfig deamonConfig,
|
||||||
|
String expectedExe,
|
||||||
|
String expectedApplicationName,
|
||||||
|
String expectedStreamName,
|
||||||
|
String expectedRegionName){
|
||||||
|
assertNotNull(deamonConfig.getExecutorService());
|
||||||
|
assertNotNull(deamonConfig.getMultiLangDaemonConfiguration());
|
||||||
|
assertNotNull(deamonConfig.getRecordProcessorFactory());
|
||||||
|
|
||||||
|
assertEquals(expectedExe, deamonConfig.getRecordProcessorFactory().getCommandArray()[0]);
|
||||||
|
assertEquals(expectedApplicationName, deamonConfig.getMultiLangDaemonConfiguration().getApplicationName());
|
||||||
|
assertEquals(expectedStreamName, deamonConfig.getMultiLangDaemonConfiguration().getStreamName());
|
||||||
|
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getDynamoDbClient().get("region").toString());
|
||||||
|
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getCloudWatchClient().get("region").toString());
|
||||||
|
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getKinesisClient().get("region").toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPropertyValidation() {
|
||||||
String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n"
|
String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n"
|
||||||
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge";
|
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge";
|
||||||
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
|
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
|
||||||
|
|
|
||||||
|
|
@ -306,12 +306,33 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWithMissingStreamName() {
|
public void testWithMissingStreamNameAndMissingStreamArn() {
|
||||||
thrown.expect(NullPointerException.class);
|
thrown.expect(NullPointerException.class);
|
||||||
thrown.expectMessage("Stream name is required");
|
thrown.expectMessage("Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)");
|
||||||
|
|
||||||
String test = StringUtils.join(new String[] { "applicationName = b",
|
String test = StringUtils.join(new String[] {
|
||||||
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n');
|
"applicationName = b",
|
||||||
|
"AWSCredentialsProvider = " + credentialName1,
|
||||||
|
"workerId = 123",
|
||||||
|
"failoverTimeMillis = 100" },
|
||||||
|
'\n');
|
||||||
|
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||||
|
|
||||||
|
configurator.getConfiguration(input);
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testWithEmptyStreamNameAndMissingStreamArn() {
|
||||||
|
thrown.expect(IllegalArgumentException.class);
|
||||||
|
thrown.expectMessage("Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)");
|
||||||
|
|
||||||
|
String test = StringUtils.join(new String[] {
|
||||||
|
"applicationName = b",
|
||||||
|
"AWSCredentialsProvider = " + credentialName1,
|
||||||
|
"workerId = 123",
|
||||||
|
"failoverTimeMillis = 100",
|
||||||
|
"streamName = ",
|
||||||
|
"streamArn = "},
|
||||||
|
'\n');
|
||||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||||
|
|
||||||
configurator.getConfiguration(input);
|
configurator.getConfiguration(input);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue