Updated StreamArn in multilang to only replace streamName (not region as well). Also updated unit tests and added Region validation
This commit is contained in:
parent
fb84a35e9e
commit
928a102ade
3 changed files with 105 additions and 102 deletions
|
|
@ -72,29 +72,35 @@ public class KinesisClientLibConfigurator {
|
||||||
Validate.notBlank(configuration.getApplicationName(), "Application name is required");
|
Validate.notBlank(configuration.getApplicationName(), "Application name is required");
|
||||||
|
|
||||||
if (configuration.getStreamArn() != null && !configuration.getStreamArn().trim().isEmpty()) {
|
if (configuration.getStreamArn() != null && !configuration.getStreamArn().trim().isEmpty()) {
|
||||||
Arn streamArnObj = Arn.fromString(configuration.getStreamArn());
|
final Arn streamArnObj = Arn.fromString(configuration.getStreamArn());
|
||||||
if(!streamArnObj.resource().resourceType().get().equalsIgnoreCase("stream")){
|
|
||||||
|
final String resourceType = streamArnObj.resource().resourceType().get();
|
||||||
|
if (!"stream".equalsIgnoreCase(resourceType)) {
|
||||||
throw new IllegalArgumentException(String.format("StreamArn has unsupported resource type of '%s'. Expected: stream",
|
throw new IllegalArgumentException(String.format("StreamArn has unsupported resource type of '%s'. Expected: stream",
|
||||||
streamArnObj.resource().resourceType().get()));
|
resourceType));
|
||||||
}
|
}
|
||||||
if(!streamArnObj.service().equalsIgnoreCase("kinesis")){
|
final String arnService = streamArnObj.service();
|
||||||
|
if (!"kinesis".equalsIgnoreCase(arnService)) {
|
||||||
throw new IllegalArgumentException(String.format("StreamArn has unsupported service type of '%s'. Expected: kinesis",
|
throw new IllegalArgumentException(String.format("StreamArn has unsupported service type of '%s'. Expected: kinesis",
|
||||||
streamArnObj.service()));
|
arnService));
|
||||||
}
|
}
|
||||||
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
|
//Parse out the stream Name from the Arn (and/or override existing value for Stream Name)
|
||||||
String streamNameFromArn = streamArnObj.resource().resource();
|
final String streamNameFromArn = streamArnObj.resource().resource();
|
||||||
configuration.setStreamName(streamNameFromArn);
|
configuration.setStreamName(streamNameFromArn);
|
||||||
|
|
||||||
//Parse out the region from the Arn and set (and/or override existing value for region)
|
|
||||||
Region regionObj = Region.of(streamArnObj.region().get());
|
|
||||||
if(Region.regions().stream().filter(x -> x.id().equalsIgnoreCase(regionObj.id())).count() == 0){
|
|
||||||
throw new IllegalArgumentException(String.format("StreamArn has unsupported region of '%s'.", regionObj.id()));
|
|
||||||
}
|
|
||||||
configuration.setRegionName(regionObj);
|
|
||||||
} else {
|
} else {
|
||||||
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
||||||
}
|
}
|
||||||
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
|
Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided");
|
||||||
|
|
||||||
|
//Verify Region is real
|
||||||
|
if(configuration.getKinesisClient().get("region") == null){
|
||||||
|
throw new NullPointerException("regionName must be passed in");
|
||||||
|
}
|
||||||
|
final String regionCode = configuration.getKinesisClient().get("region").toString();
|
||||||
|
if (Region.regions().stream().filter(x -> x.id().equalsIgnoreCase(regionCode)).count() == 0) {
|
||||||
|
throw new IllegalArgumentException(String.format("Unsupported region: %s", regionCode));
|
||||||
|
}
|
||||||
return configuration;
|
return configuration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -36,16 +36,16 @@ import software.amazon.kinesis.multilang.config.KinesisClientLibConfigurator;
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class MultiLangDaemonConfigTest {
|
public class MultiLangDaemonConfigTest {
|
||||||
private static String FILENAME = "some.properties";
|
private static final String TEST_FILENAME = "some.properties";
|
||||||
private static String TestExe = "TestExe.exe";
|
private static final String TEST_EXE = "TestExe.exe";
|
||||||
private static String TestApplicationName = "TestApp";
|
private static final String TEST_APPLICATION_NAME = "TestApp";
|
||||||
private static String TestStreamName = "fakeStream";
|
private static final String TEST_STREAM_NAME = "fakeStream";
|
||||||
private static String TestStreamNameInArn = "FAKE_STREAM_NAME";
|
private static final String TEST_STREAM_NAME_IN_ARN = "FAKE_STREAM_NAME";
|
||||||
private static String TestRegion = "us-east-1";
|
private static final String TEST_REGION = "us-east-1";
|
||||||
private static String TestRegionInArn = "us-east-2";
|
private static final String TEST_REGION_IN_ARN = "us-east-2";
|
||||||
|
|
||||||
private static String getTestStreamArn(){
|
private static String getTestStreamArn(){
|
||||||
return String.format("arn:aws:kinesis:%s:ACCOUNT_ID:stream/%s", TestRegionInArn, TestStreamNameInArn);
|
return String.format("arn:aws:kinesis:%s:ACCOUNT_ID:stream/%s", TEST_REGION_IN_ARN, TEST_STREAM_NAME_IN_ARN);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
|
|
@ -55,140 +55,146 @@ public class MultiLangDaemonConfigTest {
|
||||||
private AwsCredentialsProvider credentialsProvider;
|
private AwsCredentialsProvider credentialsProvider;
|
||||||
@Mock
|
@Mock
|
||||||
private AwsCredentials creds;
|
private AwsCredentials creds;
|
||||||
@Mock
|
|
||||||
private KinesisClientLibConfigurator configurator;
|
private KinesisClientLibConfigurator configurator;
|
||||||
|
private MultiLangDaemonConfig deamonConfig;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final ExpectedException thrown = ExpectedException.none();
|
public final ExpectedException thrown = ExpectedException.none();
|
||||||
|
|
||||||
public void setup(String streamName, String streamArn) {
|
public void setup(String streamName, String streamArn, String regionName) throws IOException {
|
||||||
|
|
||||||
String PROPERTIES = String.format("executableName = %s\n"
|
String properties = String.format("executableName = %s\n"
|
||||||
+ "applicationName = %s\n"
|
+ "applicationName = %s\n"
|
||||||
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
|
+ "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n"
|
||||||
+ "processingLanguage = malbolge\n"
|
+ "processingLanguage = malbolge\n"
|
||||||
+ "regionName = %s\n",
|
+ "regionName = %s\n",
|
||||||
TestExe,
|
TEST_EXE,
|
||||||
TestApplicationName,
|
TEST_APPLICATION_NAME,
|
||||||
TestRegion);
|
regionName);
|
||||||
|
|
||||||
if (streamName != null) {
|
if (streamName != null) {
|
||||||
PROPERTIES += String.format("streamName = %s\n", streamName);
|
properties += String.format("streamName = %s\n", streamName);
|
||||||
}
|
}
|
||||||
if (streamArn != null) {
|
if (streamArn != null) {
|
||||||
PROPERTIES += String.format("streamArn = %s\n", streamArn);
|
properties += String.format("streamArn = %s\n", streamArn);
|
||||||
}
|
}
|
||||||
classLoader = Mockito.mock(ClassLoader.class);
|
classLoader = Mockito.mock(ClassLoader.class);
|
||||||
|
|
||||||
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())).when(classLoader)
|
Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader)
|
||||||
.getResourceAsStream(FILENAME);
|
.getResourceAsStream(TEST_FILENAME);
|
||||||
|
|
||||||
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
|
when(credentialsProvider.resolveCredentials()).thenReturn(creds);
|
||||||
when(creds.accessKeyId()).thenReturn("cool-user");
|
when(creds.accessKeyId()).thenReturn("cool-user");
|
||||||
configurator = new KinesisClientLibConfigurator();
|
configurator = new KinesisClientLibConfigurator();
|
||||||
|
|
||||||
|
deamonConfig = new MultiLangDaemonConfig(TEST_FILENAME, classLoader, configurator);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorFailsBecauseStreamArnIsInvalid() throws Exception {
|
public void testConstructorFailsBecauseStreamArnIsInvalid() throws Exception {
|
||||||
setup("", "this_is_not_a_valid_arn");
|
thrown.expect(IllegalArgumentException.class);
|
||||||
assertConstructorThrowsException(IllegalArgumentException.class, "Malformed ARN - doesn't start with 'arn:");
|
thrown.expectMessage("Malformed ARN - doesn't start with 'arn:");
|
||||||
|
|
||||||
|
setup("", "this_is_not_a_valid_arn", TEST_REGION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorFailsBecauseStreamArnHasInvalidRegion() throws Exception {
|
public void testConstructorFailsBecauseStreamArnHasInvalidRegion() throws Exception {
|
||||||
setup("", "arn:aws:kinesis:us-east-1000:ACCOUNT_ID:stream/streamName");
|
thrown.expect(IllegalArgumentException.class);
|
||||||
assertConstructorThrowsException(IllegalArgumentException.class, "StreamArn has unsupported region of 'us-east-1000'.");
|
thrown.expectMessage("Unsupported region: us-east-1000");
|
||||||
|
|
||||||
|
setup("", "arn:aws:kinesis:us-east-1:ACCOUNT_ID:stream/streamName", "us-east-1000");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorFailsBecauseStreamArnHasInvalidResourceType() throws Exception {
|
public void testConstructorFailsBecauseStreamArnHasInvalidResourceType() throws Exception {
|
||||||
setup("", "arn:aws:kinesis:us-EAST-1:ACCOUNT_ID:dynamodb/streamName");
|
thrown.expect(IllegalArgumentException.class);
|
||||||
assertConstructorThrowsException(IllegalArgumentException.class, "StreamArn has unsupported resource type of 'dynamodb'. Expected: stream");
|
thrown.expectMessage("StreamArn has unsupported resource type of 'dynamodb'. Expected: stream");
|
||||||
|
|
||||||
|
setup("", "arn:aws:kinesis:us-EAST-1:ACCOUNT_ID:dynamodb/streamName", TEST_REGION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorFailsBecauseStreamArnHasInvalidService() throws Exception {
|
public void testConstructorFailsBecauseStreamArnHasInvalidService() throws Exception {
|
||||||
setup("", "arn:aws:kinesisFakeService:us-east-1:ACCOUNT_ID:stream/streamName");
|
thrown.expect(IllegalArgumentException.class);
|
||||||
assertConstructorThrowsException(IllegalArgumentException.class, "StreamArn has unsupported service type of 'kinesisFakeService'. Expected: kinesis");
|
thrown.expectMessage("StreamArn has unsupported service type of 'kinesisFakeService'. Expected: kinesis");
|
||||||
|
|
||||||
|
setup("", "arn:aws:kinesisFakeService:us-east-1:ACCOUNT_ID:stream/streamName", TEST_REGION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws Exception {
|
public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws Exception {
|
||||||
setup("", "");
|
thrown.expect(IllegalArgumentException.class);
|
||||||
assertConstructorThrowsException(IllegalArgumentException.class, "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
thrown.expectMessage("Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
||||||
|
|
||||||
|
setup("", "", TEST_REGION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorFailsBecauseStreamNameAndArnAreNull() throws Exception {
|
public void testConstructorFailsBecauseStreamNameAndArnAreNull() throws Exception {
|
||||||
setup(null, null);
|
thrown.expect(NullPointerException.class);
|
||||||
assertConstructorThrowsException(NullPointerException.class, "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
thrown.expectMessage("Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
||||||
|
|
||||||
|
setup(null, null, TEST_REGION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorFailsBecauseStreamNameIsNullAndArnIsEmpty() throws Exception {
|
public void testConstructorFailsBecauseStreamNameIsNullAndArnIsEmpty() throws Exception {
|
||||||
setup(null, "");
|
thrown.expect(NullPointerException.class);
|
||||||
assertConstructorThrowsException(NullPointerException.class, "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
thrown.expectMessage("Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
||||||
|
|
||||||
|
setup(null, "", TEST_REGION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorFailsBecauseStreamNameIsEmptyAndArnIsNull() throws Exception {
|
public void testConstructorFailsBecauseStreamNameIsEmptyAndArnIsNull() throws Exception {
|
||||||
setup("", null);
|
thrown.expect(IllegalArgumentException.class);
|
||||||
assertConstructorThrowsException(IllegalArgumentException.class, "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
thrown.expectMessage("Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in.");
|
||||||
|
|
||||||
|
setup("", null, TEST_REGION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorUsingStreamName() throws IOException {
|
public void testConstructorUsingStreamName() throws IOException {
|
||||||
setup(TestStreamName, null);
|
setup(TEST_STREAM_NAME, null, TEST_REGION);
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
assertConfigurationsMatch(deamonConfig, TEST_EXE, TEST_APPLICATION_NAME, TEST_STREAM_NAME, TEST_REGION, null);
|
||||||
|
|
||||||
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorUsingStreamNameAndStreamArnIsEmpty() throws IOException {
|
public void testConstructorUsingStreamNameAndStreamArnIsEmpty() throws IOException {
|
||||||
setup(TestStreamName, "");
|
setup(TEST_STREAM_NAME, "", TEST_REGION);
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
assertConfigurationsMatch(deamonConfig, TEST_EXE, TEST_APPLICATION_NAME, TEST_STREAM_NAME, TEST_REGION, "");
|
||||||
|
|
||||||
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, "");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorUsingStreamNameAndStreamArnIsWhitespace() throws IOException {
|
public void testConstructorUsingStreamNameAndStreamArnIsWhitespace() throws IOException {
|
||||||
setup(TestStreamName, " ");
|
setup(TEST_STREAM_NAME, " ", TEST_REGION);
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
assertConfigurationsMatch(deamonConfig, TEST_EXE, TEST_APPLICATION_NAME, TEST_STREAM_NAME, TEST_REGION, "");
|
||||||
|
|
||||||
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamName, TestRegion, "");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorUsingStreamArn() throws IOException {
|
public void testConstructorUsingStreamArn() throws IOException {
|
||||||
setup(null, getTestStreamArn());
|
setup(null, getTestStreamArn(), TEST_REGION);
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
assertConfigurationsMatch(deamonConfig, TEST_EXE, TEST_APPLICATION_NAME, TEST_STREAM_NAME_IN_ARN, TEST_REGION, getTestStreamArn());
|
||||||
|
|
||||||
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorUsingStreamNameAsEmptyAndStreamArn() throws IOException {
|
public void testConstructorUsingStreamNameAsEmptyAndStreamArn() throws IOException {
|
||||||
setup("", getTestStreamArn());
|
setup("", getTestStreamArn(), TEST_REGION);
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
assertConfigurationsMatch(deamonConfig, TEST_EXE, TEST_APPLICATION_NAME, TEST_STREAM_NAME_IN_ARN, TEST_REGION, getTestStreamArn());
|
||||||
|
|
||||||
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConstructorUsingStreamArnOverStreamName() throws IOException {
|
public void testConstructorUsingStreamArnOverStreamName() throws IOException {
|
||||||
setup(TestStreamName, getTestStreamArn());
|
setup(TEST_STREAM_NAME, getTestStreamArn(), TEST_REGION);
|
||||||
|
|
||||||
MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
assertConfigurationsMatch(deamonConfig, TEST_EXE, TEST_APPLICATION_NAME, TEST_STREAM_NAME_IN_ARN, TEST_REGION, getTestStreamArn());
|
||||||
|
|
||||||
assertConfigurationsMatch(deamonConfig, TestExe, TestApplicationName, TestStreamNameInArn, TestRegionInArn, getTestStreamArn());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -215,16 +221,6 @@ public class MultiLangDaemonConfigTest {
|
||||||
assertEquals(expectedStreamArn, deamonConfig.getMultiLangDaemonConfiguration().getStreamArn());
|
assertEquals(expectedStreamArn, deamonConfig.getMultiLangDaemonConfiguration().getStreamArn());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertConstructorThrowsException(Class<? extends Exception> exceptionClass, String exceptionMessage) throws Exception{
|
|
||||||
|
|
||||||
thrown.expect(exceptionClass);
|
|
||||||
if(exceptionMessage != null) {
|
|
||||||
thrown.expectMessage(exceptionMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPropertyValidation() {
|
public void testPropertyValidation() {
|
||||||
String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n"
|
String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n"
|
||||||
|
|
@ -232,11 +228,11 @@ public class MultiLangDaemonConfigTest {
|
||||||
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
|
ClassLoader classLoader = Mockito.mock(ClassLoader.class);
|
||||||
|
|
||||||
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES_NO_EXECUTABLE_NAME.getBytes())).when(classLoader)
|
Mockito.doReturn(new ByteArrayInputStream(PROPERTIES_NO_EXECUTABLE_NAME.getBytes())).when(classLoader)
|
||||||
.getResourceAsStream(FILENAME);
|
.getResourceAsStream(TEST_FILENAME);
|
||||||
|
|
||||||
MultiLangDaemonConfig config;
|
MultiLangDaemonConfig config;
|
||||||
try {
|
try {
|
||||||
config = new MultiLangDaemonConfig(FILENAME, classLoader, configurator);
|
config = new MultiLangDaemonConfig(TEST_FILENAME, classLoader, configurator);
|
||||||
Assert.fail("Construction of the config should have failed due to property validation failing.");
|
Assert.fail("Construction of the config should have failed due to property validation failing.");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
// Good
|
// Good
|
||||||
|
|
|
||||||
|
|
@ -69,7 +69,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
@Test
|
@Test
|
||||||
public void testWithBasicSetup() {
|
public void testWithBasicSetup() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
||||||
"applicationName = b", "AWSCredentialsProvider = " + credentialName1, "workerId = 123" }, '\n'));
|
"applicationName = b", "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "regionName = us-east-1" }, '\n'));
|
||||||
assertEquals(config.getApplicationName(), "b");
|
assertEquals(config.getApplicationName(), "b");
|
||||||
assertEquals(config.getStreamName(), "a");
|
assertEquals(config.getStreamName(), "a");
|
||||||
assertEquals(config.getWorkerIdentifier(), "123");
|
assertEquals(config.getWorkerIdentifier(), "123");
|
||||||
|
|
@ -81,7 +81,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
public void testWithLongVariables() {
|
public void testWithLongVariables() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "applicationName = app",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "applicationName = app",
|
||||||
"streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
|
"streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
|
||||||
"workerId = 123", "failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n'));
|
"workerId = 123", "failoverTimeMillis = 100", "shardSyncIntervalMillis = 500", "regionName = us-east-1" }, '\n'));
|
||||||
|
|
||||||
assertEquals(config.getApplicationName(), "app");
|
assertEquals(config.getApplicationName(), "app");
|
||||||
assertEquals(config.getStreamName(), "123");
|
assertEquals(config.getStreamName(), "123");
|
||||||
|
|
@ -95,7 +95,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
long epochTimeInSeconds = 1617406032;
|
long epochTimeInSeconds = 1617406032;
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "applicationName = app",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "applicationName = app",
|
||||||
"streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
|
"streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
|
||||||
"initialPositionInStreamExtended = " + epochTimeInSeconds}, '\n'));
|
"initialPositionInStreamExtended = " + epochTimeInSeconds, "regionName = us-east-1"}, '\n'));
|
||||||
|
|
||||||
assertEquals(config.getInitialPositionInStreamExtended().getTimestamp(), new Date(epochTimeInSeconds * 1000L));
|
assertEquals(config.getInitialPositionInStreamExtended().getTimestamp(), new Date(epochTimeInSeconds * 1000L));
|
||||||
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP);
|
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP);
|
||||||
|
|
@ -135,7 +135,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
public void testWithUnsupportedClientConfigurationVariables() {
|
public void testWithUnsupportedClientConfigurationVariables() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
|
||||||
new String[] { "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, "workerId = id",
|
new String[] { "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, "workerId = id",
|
||||||
"kinesisClientConfig = {}", "streamName = stream", "applicationName = b" },
|
"kinesisClientConfig = {}", "streamName = stream", "applicationName = b", "regionName = us-east-1" },
|
||||||
'\n'));
|
'\n'));
|
||||||
|
|
||||||
assertEquals(config.getApplicationName(), "b");
|
assertEquals(config.getApplicationName(), "b");
|
||||||
|
|
@ -149,7 +149,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = kinesis",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = kinesis",
|
||||||
"AWSCredentialsProvider = " + credentialName2 + ", " + credentialName1, "workerId = w123",
|
"AWSCredentialsProvider = " + credentialName2 + ", " + credentialName1, "workerId = w123",
|
||||||
"maxRecords = 10", "metricsMaxQueueSize = 20", "applicationName = kinesis",
|
"maxRecords = 10", "metricsMaxQueueSize = 20", "applicationName = kinesis",
|
||||||
"retryGetRecordsInSeconds = 2", "maxGetRecordsThreadPool = 1" }, '\n'));
|
"retryGetRecordsInSeconds = 2", "maxGetRecordsThreadPool = 1", "regionName = us-east-1" }, '\n'));
|
||||||
|
|
||||||
assertEquals(config.getApplicationName(), "kinesis");
|
assertEquals(config.getApplicationName(), "kinesis");
|
||||||
assertEquals(config.getStreamName(), "kinesis");
|
assertEquals(config.getStreamName(), "kinesis");
|
||||||
|
|
@ -164,7 +164,8 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
public void testWithBooleanVariables() {
|
public void testWithBooleanVariables() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
||||||
"applicationName = b", "AWSCredentialsProvider = ABCD, " + credentialName1, "workerId = 0",
|
"applicationName = b", "AWSCredentialsProvider = ABCD, " + credentialName1, "workerId = 0",
|
||||||
"cleanupLeasesUponShardCompletion = false", "validateSequenceNumberBeforeCheckpointing = true" },
|
"cleanupLeasesUponShardCompletion = false", "validateSequenceNumberBeforeCheckpointing = true",
|
||||||
|
"regionName = us-east-1"},
|
||||||
'\n'));
|
'\n'));
|
||||||
|
|
||||||
assertEquals(config.getApplicationName(), "b");
|
assertEquals(config.getApplicationName(), "b");
|
||||||
|
|
@ -178,7 +179,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
public void testWithStringVariables() {
|
public void testWithStringVariables() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
||||||
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 1",
|
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 1",
|
||||||
"kinesisEndpoint = https://kinesis", "metricsLevel = SUMMARY" }, '\n'));
|
"kinesisEndpoint = https://kinesis", "metricsLevel = SUMMARY", "regionName = us-east-1" }, '\n'));
|
||||||
|
|
||||||
assertEquals(config.getWorkerIdentifier(), "1");
|
assertEquals(config.getWorkerIdentifier(), "1");
|
||||||
assertEquals(config.getKinesisClient().get("endpointOverride"), URI.create("https://kinesis"));
|
assertEquals(config.getKinesisClient().get("endpointOverride"), URI.create("https://kinesis"));
|
||||||
|
|
@ -189,7 +190,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
public void testWithSetVariables() {
|
public void testWithSetVariables() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
||||||
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 1",
|
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 1",
|
||||||
"metricsEnabledDimensions = ShardId, WorkerIdentifier" }, '\n'));
|
"metricsEnabledDimensions = ShardId, WorkerIdentifier", "regionName = us-east-1" }, '\n'));
|
||||||
|
|
||||||
Set<String> expectedMetricsEnabledDimensions = ImmutableSet.<String> builder()
|
Set<String> expectedMetricsEnabledDimensions = ImmutableSet.<String> builder()
|
||||||
.add("ShardId", "WorkerIdentifier").build();
|
.add("ShardId", "WorkerIdentifier").build();
|
||||||
|
|
@ -200,7 +201,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
public void testWithInitialPositionInStreamTrimHorizon() {
|
public void testWithInitialPositionInStreamTrimHorizon() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
||||||
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
||||||
"initialPositionInStream = TriM_Horizon" }, '\n'));
|
"initialPositionInStream = TriM_Horizon", "regionName = us-east-1" }, '\n'));
|
||||||
|
|
||||||
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
|
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
|
||||||
}
|
}
|
||||||
|
|
@ -209,7 +210,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
public void testWithInitialPositionInStreamLatest() {
|
public void testWithInitialPositionInStreamLatest() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
||||||
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
||||||
"initialPositionInStream = LateSt" }, '\n'));
|
"initialPositionInStream = LateSt", "regionName = us-east-1" }, '\n'));
|
||||||
|
|
||||||
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.LATEST);
|
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.LATEST);
|
||||||
}
|
}
|
||||||
|
|
@ -218,7 +219,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
public void testSkippingNonKCLVariables() {
|
public void testSkippingNonKCLVariables() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
||||||
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
||||||
"initialPositionInStream = TriM_Horizon", "abc = 1" }, '\n'));
|
"initialPositionInStream = TriM_Horizon", "abc = 1", "regionName = us-east-1" }, '\n'));
|
||||||
|
|
||||||
assertEquals(config.getApplicationName(), "b");
|
assertEquals(config.getApplicationName(), "b");
|
||||||
assertEquals(config.getStreamName(), "a");
|
assertEquals(config.getStreamName(), "a");
|
||||||
|
|
@ -230,7 +231,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
public void testEmptyOptionalVariables() {
|
public void testEmptyOptionalVariables() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",
|
||||||
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
"applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
||||||
"initialPositionInStream = TriM_Horizon", "maxGetRecordsThreadPool = 1" }, '\n'));
|
"initialPositionInStream = TriM_Horizon", "maxGetRecordsThreadPool = 1", "regionName = us-east-1" }, '\n'));
|
||||||
assertThat(config.getMaxGetRecordsThreadPool(), equalTo(1));
|
assertThat(config.getMaxGetRecordsThreadPool(), equalTo(1));
|
||||||
assertThat(config.getRetryGetRecordsInSeconds(), nullValue());
|
assertThat(config.getRetryGetRecordsInSeconds(), nullValue());
|
||||||
}
|
}
|
||||||
|
|
@ -240,7 +241,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b",
|
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b",
|
||||||
"AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
"AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123",
|
||||||
"initialPositionInStream = TriM_Horizon", "maxGetRecordsThreadPool = 0",
|
"initialPositionInStream = TriM_Horizon", "maxGetRecordsThreadPool = 0",
|
||||||
"retryGetRecordsInSeconds = 0" }, '\n');
|
"retryGetRecordsInSeconds = 0", "regionName = us-east-1" }, '\n');
|
||||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -254,7 +255,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
@Test
|
@Test
|
||||||
public void testWithInvalidIntValue() {
|
public void testWithInvalidIntValue() {
|
||||||
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b",
|
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b",
|
||||||
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100nf" }, '\n');
|
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100nf", "regionName = us-east-1" }, '\n');
|
||||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -267,7 +268,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
@Test
|
@Test
|
||||||
public void testWithNegativeIntValue() {
|
public void testWithNegativeIntValue() {
|
||||||
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b",
|
String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b",
|
||||||
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = -12" }, '\n');
|
"AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = -12", "regionName = us-east-1" }, '\n');
|
||||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||||
|
|
||||||
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
||||||
|
|
@ -295,7 +296,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
public void testWithMissingWorkerId() {
|
public void testWithMissingWorkerId() {
|
||||||
String test = StringUtils.join(
|
String test = StringUtils.join(
|
||||||
new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = " + credentialName1,
|
new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = " + credentialName1,
|
||||||
"failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" },
|
"failoverTimeMillis = 100", "shardSyncIntervalMillis = 500", "regionName = us-east-1" },
|
||||||
'\n');
|
'\n');
|
||||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||||
MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
|
MultiLangDaemonConfiguration config = configurator.getConfiguration(input);
|
||||||
|
|
@ -374,7 +375,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
"AWSCredentialsProvider = " + credentialNameKinesis,
|
"AWSCredentialsProvider = " + credentialNameKinesis,
|
||||||
"AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB,
|
"AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB,
|
||||||
"AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch, "failoverTimeMillis = 100",
|
"AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch, "failoverTimeMillis = 100",
|
||||||
"shardSyncIntervalMillis = 500" }, '\n');
|
"shardSyncIntervalMillis = 500", "regionName = us-east-1" }, '\n');
|
||||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||||
|
|
||||||
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
||||||
|
|
@ -403,7 +404,7 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
"AWSCredentialsProvider = " + credentialNameKinesis,
|
"AWSCredentialsProvider = " + credentialNameKinesis,
|
||||||
"AWSCredentialsProviderDynamoDB = " + credentialName2,
|
"AWSCredentialsProviderDynamoDB = " + credentialName2,
|
||||||
"AWSCredentialsProviderCloudWatch = " + credentialName2, "failoverTimeMillis = 100",
|
"AWSCredentialsProviderCloudWatch = " + credentialName2, "failoverTimeMillis = 100",
|
||||||
"shardSyncIntervalMillis = 500" }, '\n');
|
"shardSyncIntervalMillis = 500", "regionName = us-east-1" }, '\n');
|
||||||
InputStream input = new ByteArrayInputStream(test.getBytes());
|
InputStream input = new ByteArrayInputStream(test.getBytes());
|
||||||
|
|
||||||
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue