Updated multilang daemon to do more validation on stream Arn and added unit tests
This commit is contained in:
parent
1ec4e1de6b
commit
bb51206d5d
4 changed files with 72 additions and 29 deletions
|
|
@ -57,18 +57,14 @@ public class KinesisClientLibConfigurator {
|
||||||
* Program will fail immediately, if customer provide: 1) invalid variable value. Program will log it as warning and
|
* Program will fail immediately, if customer provide: 1) invalid variable value. Program will log it as warning and
|
||||||
* continue, if customer provide: 1) variable with unsupported variable type. 2) a variable with name which does not
|
* continue, if customer provide: 1) variable with unsupported variable type. 2) a variable with name which does not
|
||||||
* match any of the variables in KinesisClientLibConfigration.
|
* match any of the variables in KinesisClientLibConfigration.
|
||||||
*
|
*
|
||||||
* @param properties a Properties object containing the configuration information
|
* @param properties a Properties object containing the configuration information
|
||||||
* @return KinesisClientLibConfiguration
|
* @return KinesisClientLibConfiguration
|
||||||
*/
|
*/
|
||||||
public MultiLangDaemonConfiguration getConfiguration(Properties properties) {
|
public MultiLangDaemonConfiguration getConfiguration(Properties properties) {
|
||||||
properties.entrySet().forEach(e -> {
|
properties.entrySet().forEach(e -> {
|
||||||
try {
|
try {
|
||||||
Object value = e.getValue();
|
utilsBean.setProperty(configuration, (String) e.getKey(), e.getValue());
|
||||||
if(value instanceof String){
|
|
||||||
value = ((String) value).trim();
|
|
||||||
}
|
|
||||||
utilsBean.setProperty(configuration, (String) e.getKey(), value);
|
|
||||||
} catch (IllegalAccessException | InvocationTargetException ex) {
|
} catch (IllegalAccessException | InvocationTargetException ex) {
|
||||||
throw new RuntimeException(ex);
|
throw new RuntimeException(ex);
|
||||||
}
|
}
|
||||||
|
|
@ -76,21 +72,28 @@ public class KinesisClientLibConfigurator {
|
||||||
|
|
||||||
Validate.notBlank(configuration.getApplicationName(), "Application name is required");
|
Validate.notBlank(configuration.getApplicationName(), "Application name is required");
|
||||||
|
|
||||||
try {
|
if (configuration.getStreamArn() != null && !configuration.getStreamArn().trim().isEmpty()) {
|
||||||
Validate.notBlank(configuration.getStreamArn(), "");
|
|
||||||
Arn streamArnObj = Arn.fromString(configuration.getStreamArn());
|
Arn streamArnObj = Arn.fromString(configuration.getStreamArn());
|
||||||
|
if(!streamArnObj.getResource().getResourceType().equalsIgnoreCase("stream")){
|
||||||
|
throw new IllegalArgumentException(String.format("StreamArn has unsupported resource type of '%s'. Expected: stream",
|
||||||
|
streamArnObj.getResource().getResourceType()));
|
||||||
|
}
|
||||||
|
if(!streamArnObj.getService().equalsIgnoreCase("kinesis")){
|
||||||
|
throw new IllegalArgumentException(String.format("StreamArn has unsupported service type of '%s'. Expected: kinesis",
|
||||||
|
streamArnObj.getResource().getResourceType()));
|
||||||
|
}
|
||||||
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
|
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
|
||||||
String streamNameFromArn = streamArnObj.getResource().getResource();
|
String streamNameFromArn = streamArnObj.getResource().getResource();
|
||||||
configuration.setStreamName(streamNameFromArn);
|
configuration.setStreamName(streamNameFromArn);
|
||||||
|
|
||||||
//Parse out the region from the Arn and set (and/or override existing value for region)
|
//Parse out the region from the Arn and set (and/or override existing value for region)
|
||||||
String regionName = streamArnObj.getRegion();
|
Region regionObj = Region.of(streamArnObj.getRegion());
|
||||||
Region regionObj = Region.of(regionName);
|
if(Region.regions().stream().filter(x -> x.id().equalsIgnoreCase(regionObj.id())).count() == 0){
|
||||||
|
throw new IllegalArgumentException(String.format("%s is not a valid region", regionObj.id()));
|
||||||
|
}
|
||||||
configuration.setRegionName(regionObj);
|
configuration.setRegionName(regionObj);
|
||||||
}catch (Exception e) {
|
} else {
|
||||||
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)");
|
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
||||||
|
|
||||||
}
|
}
|
||||||
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
|
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
|
||||||
return configuration;
|
return configuration;
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,6 @@ import java.util.UUID;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.commons.beanutils.BeanUtilsBean;
|
import org.apache.commons.beanutils.BeanUtilsBean;
|
||||||
import org.apache.commons.beanutils.ConvertUtils;
|
|
||||||
import org.apache.commons.beanutils.ConvertUtilsBean;
|
import org.apache.commons.beanutils.ConvertUtilsBean;
|
||||||
import org.apache.commons.beanutils.Converter;
|
import org.apache.commons.beanutils.Converter;
|
||||||
import org.apache.commons.beanutils.converters.ArrayConverter;
|
import org.apache.commons.beanutils.converters.ArrayConverter;
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@ import org.mockito.runners.MockitoJUnitRunner;
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
import software.amazon.awssdk.auth.credentials.AwsCredentials;
|
import software.amazon.awssdk.auth.credentials.AwsCredentials;
|
||||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
|
||||||
import software.amazon.kinesis.multilang.config.KinesisClientLibConfigurator;
|
import software.amazon.kinesis.multilang.config.KinesisClientLibConfigurator;
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
|
@ -46,6 +47,7 @@ public class MultiLangDaemonConfigTest {
|
||||||
private static String getTestStreamArn(){
|
private static String getTestStreamArn(){
|
||||||
return String.format("arn:aws:kinesis:%s:ACCOUNT_ID:stream/%s", TestRegionInArn, TestStreamNameInArn);
|
return String.format("arn:aws:kinesis:%s:ACCOUNT_ID:stream/%s", TestRegionInArn, TestStreamNameInArn);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
ClassLoader classLoader;
|
ClassLoader classLoader;
|
||||||
|
|
||||||
|
|
@ -58,20 +60,20 @@ public class MultiLangDaemonConfigTest {
|
||||||
|
|
||||||
public void setup(String streamName, String streamArn) {
|
public void setup(String streamName, String streamArn) {
|
||||||
|
|
||||||
String PROPERTIES = String.format("executableName = %s \n"
|
String PROPERTIES = String.format("executableName = %s\n"
|
||||||
+ "applicationName = %s \n"
|
+ "applicationName = %s\n"
|
||||||
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
|
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
|
||||||
+ "processingLanguage = malbolge\n"
|
+ "processingLanguage = malbolge\n"
|
||||||
+ "regionName = %s \n",
|
+ "regionName = %s\n",
|
||||||
TestExe,
|
TestExe,
|
||||||
TestApplicationName,
|
TestApplicationName,
|
||||||
TestRegion);
|
TestRegion);
|
||||||
|
|
||||||
if(streamName != null){
|
if(streamName != null){
|
||||||
PROPERTIES += String.format("streamName = %s \n", streamName);
|
PROPERTIES += String.format("streamName = %s\n", streamName);
|
||||||
}
|
}
|
||||||
if(streamArn != null){
|
if(streamArn != null){
|
||||||
PROPERTIES += String.format("streamArn = %s \n", streamArn);
|
PROPERTIES += String.format("streamArn = %s\n", streamArn);
|
||||||
}
|
}
|
||||||
classLoader = Mockito.mock(ClassLoader.class);
|
classLoader = Mockito.mock(ClassLoader.class);
|
||||||
|
|
||||||
|
|
@ -84,6 +86,34 @@ public class MultiLangDaemonConfigTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorFailsBecauseStreamArnIsInvalid() throws IOException {
|
||||||
|
setup("", "this_is_not_a_valid_arn");
|
||||||
|
|
||||||
|
assertThrows(Exception.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorFailsBecauseStreamArnHasInvalidRegion() throws IOException {
|
||||||
|
setup("", "arn:aws:kinesis:us-east-1000:ACCOUNT_ID:stream/streamName");
|
||||||
|
|
||||||
|
assertThrows(IllegalArgumentException.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorFailsBecauseStreamArnHasInvalidResourceType() throws IOException {
|
||||||
|
setup("", "arn:aws:kinesis:us-EAST-1:ACCOUNT_ID:dynamodb/streamName");
|
||||||
|
|
||||||
|
assertThrows(IllegalArgumentException.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorFailsBecauseStreamArnHasInvalidService() throws IOException {
|
||||||
|
setup("", "arn:aws:kinesisFakeService:us-east-1:ACCOUNT_ID:stream/streamName");
|
||||||
|
|
||||||
|
assertThrows(IllegalArgumentException.class, () -> new MultiLangDaemonConfig(FILENAME, classLoader, configurator));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws IOException {
|
public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws IOException {
|
||||||
setup("", "");
|
setup("", "");
|
||||||
|
|
@ -118,7 +148,7 @@ public class MultiLangDaemonConfigTest {
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion);
|
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -127,7 +157,16 @@ public class MultiLangDaemonConfigTest {
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion);
|
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConstructorUsingStreamNameAndStreamArnIsWhitespace() throws IOException {
|
||||||
|
setup(TestStreamName, " ");
|
||||||
|
|
||||||
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
|
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -136,7 +175,7 @@ public class MultiLangDaemonConfigTest {
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn);
|
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -145,7 +184,7 @@ public class MultiLangDaemonConfigTest {
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn);
|
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -154,7 +193,7 @@ public class MultiLangDaemonConfigTest {
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
||||||
|
|
||||||
AssertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn);
|
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -162,11 +201,12 @@ public class MultiLangDaemonConfigTest {
|
||||||
* @param deamonConfig
|
* @param deamonConfig
|
||||||
* @param expectedStreamName
|
* @param expectedStreamName
|
||||||
*/
|
*/
|
||||||
private void AssertConfigurationsMatch(MultiLangDaemonConfig deamonConfig,
|
private void assertConfigurationsMatch(MultiLangDaemonConfig deamonConfig,
|
||||||
String expectedExe,
|
String expectedExe,
|
||||||
String expectedApplicationName,
|
String expectedApplicationName,
|
||||||
String expectedStreamName,
|
String expectedStreamName,
|
||||||
String expectedRegionName){
|
String expectedRegionName,
|
||||||
|
String expectedStreamArn){
|
||||||
assertNotNull(deamonConfig.getExecutorService());
|
assertNotNull(deamonConfig.getExecutorService());
|
||||||
assertNotNull(deamonConfig.getMultiLangDaemonConfiguration());
|
assertNotNull(deamonConfig.getMultiLangDaemonConfiguration());
|
||||||
assertNotNull(deamonConfig.getRecordProcessorFactory());
|
assertNotNull(deamonConfig.getRecordProcessorFactory());
|
||||||
|
|
@ -177,6 +217,7 @@ public class MultiLangDaemonConfigTest {
|
||||||
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getDynamoDbClient().get("region").toString());
|
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getDynamoDbClient().get("region").toString());
|
||||||
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getCloudWatchClient().get("region").toString());
|
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getCloudWatchClient().get("region").toString());
|
||||||
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getKinesisClient().get("region").toString());
|
assertEquals(expectedRegionName, deamonConfig.getMultiLangDaemonConfiguration().getKinesisClient().get("region").toString());
|
||||||
|
assertEquals(expectedStreamArn, deamonConfig.getMultiLangDaemonConfiguration().getStreamArn());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -308,7 +308,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
@Test
|
@Test
|
||||||
public void testWithMissingStreamNameAndMissingStreamArn() {
|
public void testWithMissingStreamNameAndMissingStreamArn() {
|
||||||
thrown.expect(NullPointerException.class);
|
thrown.expect(NullPointerException.class);
|
||||||
thrown.expectMessage("Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)");
|
thrown.expectMessage("Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
||||||
|
|
||||||
String test = StringUtils.join(new String[] {
|
String test = StringUtils.join(new String[] {
|
||||||
"applicationName = b",
|
"applicationName = b",
|
||||||
|
|
@ -323,7 +323,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
@Test
|
@Test
|
||||||
public void testWithEmptyStreamNameAndMissingStreamArn() {
|
public void testWithEmptyStreamNameAndMissingStreamArn() {
|
||||||
thrown.expect(IllegalArgumentException.class);
|
thrown.expect(IllegalArgumentException.class);
|
||||||
thrown.expectMessage("Stream name or Stream Arn is required. (Stream Arn takes precedence if both are passed in)");
|
thrown.expectMessage("Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
||||||
|
|
||||||
String test = StringUtils.join(new String[] {
|
String test = StringUtils.join(new String[] {
|
||||||
"applicationName = b",
|
"applicationName = b",
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue