diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java index f3facdc0..c95d0853 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfigurator.java @@ -23,8 +23,10 @@ import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.commons.beanutils.ConvertUtilsBean; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.StreamIdentifier; /** * KinesisClientLibConfigurator constructs a KinesisClientLibConfiguration from java properties file. The following @@ -55,7 +57,7 @@ public class KinesisClientLibConfigurator { * Program will fail immediately, if customer provide: 1) invalid variable value. Program will log it as warning and * continue, if customer provide: 1) variable with unsupported variable type. 2) a variable with name which does not * match any of the variables in KinesisClientLibConfigration. - * + * * @param properties a Properties object containing the configuration information * @return KinesisClientLibConfiguration */ @@ -69,8 +71,19 @@ public class KinesisClientLibConfigurator { }); Validate.notBlank(configuration.getApplicationName(), "Application name is required"); - Validate.notBlank(configuration.getStreamName(), "Stream name is required"); + + if (configuration.getStreamArn() != null && !configuration.getStreamArn().trim().isEmpty()) { + final Arn streamArnObj = Arn.fromString(configuration.getStreamArn()); + StreamIdentifier.validateArn(streamArnObj); + //Parse out the stream Name from the Arn (and/or override existing value for Stream Name) + final String streamNameFromArn = streamArnObj.resource().resource(); + configuration.setStreamName(streamNameFromArn); + + } + + Validate.notBlank(configuration.getStreamName(), "Stream name or Stream Arn is required. Stream Arn takes precedence if both are passed in."); Validate.isTrue(configuration.getKinesisCredentialsProvider().isDirty(), "A basic set of AWS credentials must be provided"); + return configuration; } @@ -97,4 +110,4 @@ public class KinesisClientLibConfigurator { } -} +} \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index da280ddf..7a7f2e79 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -28,7 +28,6 @@ import java.util.UUID; import java.util.function.Function; import org.apache.commons.beanutils.BeanUtilsBean; -import org.apache.commons.beanutils.ConvertUtils; import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.beanutils.Converter; import org.apache.commons.beanutils.converters.ArrayConverter; @@ -73,6 +72,8 @@ public class MultiLangDaemonConfiguration { private String applicationName; private String streamName; + private String streamArn; + @ConfigurationSettable(configurationClass = ConfigsBuilder.class) private String tableName; @@ -403,4 +404,4 @@ public class MultiLangDaemonConfiguration { return resolvedConfiguration(shardRecordProcessorFactory).build(); } -} +} \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageReaderTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageReaderTest.java index b6541227..14ac357c 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageReaderTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageReaderTest.java @@ -28,7 +28,6 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import software.amazon.kinesis.multilang.MessageReader; import software.amazon.kinesis.multilang.messages.Message; import software.amazon.kinesis.multilang.messages.StatusMessage; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageWriterTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageWriterTest.java index 6a0c06b4..eaf6be7b 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageWriterTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MessageWriterTest.java @@ -32,15 +32,12 @@ import org.mockito.Mockito; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; -import software.amazon.kinesis.multilang.MessageWriter; -import software.amazon.kinesis.multilang.messages.LeaseLostMessage; import software.amazon.kinesis.multilang.messages.Message; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; -import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.retrieval.KinesisClientRecord; import static org.mockito.Mockito.verify; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java index b86a64ad..c6be1157 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonConfigTest.java @@ -14,17 +14,14 @@ */ package software.amazon.kinesis.multilang; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.Properties; -import org.apache.commons.beanutils.BeanUtilsBean; -import org.apache.commons.beanutils.ConvertUtilsBean; -import org.junit.Before; +import software.amazon.awssdk.regions.Region; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -39,48 +36,154 @@ import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration; @RunWith(MockitoJUnitRunner.class) public class MultiLangDaemonConfigTest { - private static String FILENAME = "some.properties"; + private static final String FILENAME = "some.properties"; + private static final String EXE = "TestExe.exe"; + private static final String APPLICATION_NAME = MultiLangDaemonConfigTest.class.getSimpleName(); + private static final String STREAM_NAME = "fakeStream"; + private static final String STREAM_NAME_IN_ARN = "FAKE_STREAM_NAME"; + private static final Region REGION = Region.US_EAST_1; + private static final String STREAM_ARN = "arn:aws:kinesis:us-east-2:012345678987:stream/" + STREAM_NAME_IN_ARN; + + @Mock + private ClassLoader classLoader; @Mock private AwsCredentialsProvider credentialsProvider; @Mock private AwsCredentials creds; - @Mock + private KinesisClientLibConfigurator configurator; + private MultiLangDaemonConfig deamonConfig; - @Before - public void setup() { - ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean(); - BeanUtilsBean utilsBean = new BeanUtilsBean(convertUtilsBean); - MultiLangDaemonConfiguration multiLangDaemonConfiguration = new MultiLangDaemonConfiguration(utilsBean, - convertUtilsBean); - multiLangDaemonConfiguration.setApplicationName("cool-app"); - multiLangDaemonConfiguration.setStreamName("cool-stream"); - multiLangDaemonConfiguration.setWorkerIdentifier("cool-worker"); - when(credentialsProvider.resolveCredentials()).thenReturn(creds); - when(creds.accessKeyId()).thenReturn("cool-user"); - when(configurator.getConfiguration(any(Properties.class))).thenReturn(multiLangDaemonConfiguration); - } + /** + * Instantiate a MultiLangDaemonConfig object + * @param streamName + * @param streamArn + * @throws IOException + */ + public void setup(String streamName, String streamArn) throws IOException { - @Test - public void constructorTest() throws IOException { - String PROPERTIES = "executableName = randomEXE \n" + "applicationName = testApp \n" - + "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" - + "processingLanguage = malbolge"; - ClassLoader classLoader = Mockito.mock(ClassLoader.class); + String properties = String.format("executableName = %s\n" + + "applicationName = %s\n" + + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + + "processingLanguage = malbolge\n" + + "regionName = %s\n", + EXE, + APPLICATION_NAME, + "us-east-1"); - Mockito.doReturn(new ByteArrayInputStream(PROPERTIES.getBytes())).when(classLoader) + if (streamName != null) { + properties += String.format("streamName = %s\n", streamName); + } + if (streamArn != null) { + properties += String.format("streamArn = %s\n", streamArn); + } + classLoader = Mockito.mock(ClassLoader.class); + + Mockito.doReturn(new ByteArrayInputStream(properties.getBytes())).when(classLoader) .getResourceAsStream(FILENAME); - MultiLangDaemonConfig deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + when(credentialsProvider.resolveCredentials()).thenReturn(creds); + when(creds.accessKeyId()).thenReturn("cool-user"); + configurator = new KinesisClientLibConfigurator(); - assertNotNull(deamonConfig.getExecutorService()); - assertNotNull(deamonConfig.getMultiLangDaemonConfiguration()); - assertNotNull(deamonConfig.getRecordProcessorFactory()); + deamonConfig = new MultiLangDaemonConfig(FILENAME, classLoader, configurator); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorFailsBecauseStreamArnIsInvalid() throws Exception { + setup("", "this_is_not_a_valid_arn"); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorFailsBecauseStreamArnIsInvalid2() throws Exception { + setup("", "arn:aws:kinesis:us-east-2:ACCOUNT_ID:BadFormatting:stream/" + STREAM_NAME_IN_ARN); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorFailsBecauseStreamNameAndArnAreEmpty() throws Exception { + setup("", ""); + } + + @Test(expected = NullPointerException.class) + public void testConstructorFailsBecauseStreamNameAndArnAreNull() throws Exception { + setup(null, null); + } + + @Test(expected = NullPointerException.class) + public void testConstructorFailsBecauseStreamNameIsNullAndArnIsEmpty() throws Exception { + setup(null, ""); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorFailsBecauseStreamNameIsEmptyAndArnIsNull() throws Exception { + setup("", null); } @Test - public void propertyValidation() { + public void testConstructorUsingStreamName() throws IOException { + setup(STREAM_NAME, null); + + assertConfigurationsMatch(STREAM_NAME, null); + } + + @Test + public void testConstructorUsingStreamNameAndStreamArnIsEmpty() throws IOException { + setup(STREAM_NAME, ""); + + assertConfigurationsMatch(STREAM_NAME, ""); + } + + @Test + public void testConstructorUsingStreamNameAndStreamArnIsWhitespace() throws IOException { + setup(STREAM_NAME, " "); + + assertConfigurationsMatch(STREAM_NAME, ""); + } + + @Test + public void testConstructorUsingStreamArn() throws IOException { + setup(null, STREAM_ARN); + + assertConfigurationsMatch(STREAM_NAME_IN_ARN, STREAM_ARN); + } + + @Test + public void testConstructorUsingStreamNameAsEmptyAndStreamArn() throws IOException { + setup("", STREAM_ARN); + + assertConfigurationsMatch(STREAM_NAME_IN_ARN, STREAM_ARN); + } + + @Test + public void testConstructorUsingStreamArnOverStreamName() throws IOException { + setup(STREAM_NAME, STREAM_ARN); + + assertConfigurationsMatch(STREAM_NAME_IN_ARN, STREAM_ARN); + } + + /** + * Verify the daemonConfig properties are what we expect them to be. + * @param deamonConfig + * @param expectedStreamName + */ + private void assertConfigurationsMatch(String expectedStreamName, String expectedStreamArn) { + final MultiLangDaemonConfiguration multiLangConfiguration = deamonConfig.getMultiLangDaemonConfiguration(); + assertNotNull(deamonConfig.getExecutorService()); + assertNotNull(multiLangConfiguration); + assertNotNull(deamonConfig.getRecordProcessorFactory()); + + assertEquals(EXE, deamonConfig.getRecordProcessorFactory().getCommandArray()[0]); + assertEquals(APPLICATION_NAME, multiLangConfiguration.getApplicationName()); + assertEquals(expectedStreamName, multiLangConfiguration.getStreamName()); + assertEquals(REGION, multiLangConfiguration.getDynamoDbClient().get("region")); + assertEquals(REGION, multiLangConfiguration.getCloudWatchClient().get("region")); + assertEquals(REGION, multiLangConfiguration.getKinesisClient().get("region")); + assertEquals(expectedStreamArn, multiLangConfiguration.getStreamArn()); + } + + @Test + public void testPropertyValidation() { String PROPERTIES_NO_EXECUTABLE_NAME = "applicationName = testApp \n" + "streamName = fakeStream \n" + "AWSCredentialsProvider = DefaultAWSCredentialsProviderChain\n" + "processingLanguage = malbolge"; ClassLoader classLoader = Mockito.mock(ClassLoader.class); @@ -99,4 +202,4 @@ public class MultiLangDaemonConfigTest { } } -} +} \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java index 0c1d0b60..3229e2b8 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangDaemonTest.java @@ -17,7 +17,6 @@ package software.amazon.kinesis.multilang; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.anyObject; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangProtocolTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangProtocolTest.java index dc6166aa..d385b2f9 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangProtocolTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/MultiLangProtocolTest.java @@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -61,10 +60,8 @@ import software.amazon.kinesis.multilang.messages.ShardEndedMessage; import software.amazon.kinesis.multilang.messages.StatusMessage; import com.google.common.util.concurrent.SettableFuture; -import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; -import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; import software.amazon.kinesis.retrieval.KinesisClientRecord; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/ReadSTDERRTaskTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/ReadSTDERRTaskTest.java index b3bb0719..bffd431d 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/ReadSTDERRTaskTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/ReadSTDERRTaskTest.java @@ -27,8 +27,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import software.amazon.kinesis.multilang.DrainChildSTDERRTask; -import software.amazon.kinesis.multilang.LineReaderTask; public class ReadSTDERRTaskTest { diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorFactoryTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorFactoryTest.java index 7a7d7b11..1954cf91 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorFactoryTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/StreamingShardRecordProcessorFactoryTest.java @@ -14,12 +14,9 @@ */ package software.amazon.kinesis.multilang; -import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import org.junit.Assert; import org.junit.Test; -import software.amazon.kinesis.multilang.MultiLangRecordProcessorFactory; -import software.amazon.kinesis.multilang.MultiLangShardRecordProcessor; import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration; import software.amazon.kinesis.processor.ShardRecordProcessor; import org.junit.runner.RunWith; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java index 8da22d53..36f496d3 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/AWSCredentialsProviderPropertyValueDecoderTest.java @@ -16,7 +16,6 @@ package software.amazon.kinesis.multilang.config; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import java.util.Arrays; @@ -32,11 +31,6 @@ import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProviderChain; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; - public class AWSCredentialsProviderPropertyValueDecoderTest { private static final String TEST_ACCESS_KEY_ID = "123"; diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index 031fc427..1f05240a 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -278,10 +278,8 @@ public class KinesisClientLibConfiguratorTest { } } - @Test + @Test(expected = IllegalArgumentException.class) public void testWithMissingCredentialsProvider() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("A basic set of AWS credentials must be provided"); String test = StringUtils.join(new String[] { "streamName = a", "applicationName = b", "workerId = 123", "failoverTimeMillis = 100", "shardSyncIntervalMillis = 500" }, '\n'); @@ -305,22 +303,37 @@ public class KinesisClientLibConfiguratorTest { assertFalse(config.getWorkerIdentifier().isEmpty()); } - @Test - public void testWithMissingStreamName() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("Stream name is required"); - - String test = StringUtils.join(new String[] { "applicationName = b", - "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n'); + @Test(expected = NullPointerException.class) + public void testWithMissingStreamNameAndMissingStreamArn() { + String test = StringUtils.join(new String[] { + "applicationName = b", + "AWSCredentialsProvider = " + credentialName1, + "workerId = 123", + "failoverTimeMillis = 100" }, + '\n'); InputStream input = new ByteArrayInputStream(test.getBytes()); configurator.getConfiguration(input); } - @Test + @Test(expected = IllegalArgumentException.class) + public void testWithEmptyStreamNameAndMissingStreamArn() { + + String test = StringUtils.join(new String[] { + "applicationName = b", + "AWSCredentialsProvider = " + credentialName1, + "workerId = 123", + "failoverTimeMillis = 100", + "streamName = ", + "streamArn = "}, + '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + configurator.getConfiguration(input); + } + + @Test(expected = NullPointerException.class) public void testWithMissingApplicationName() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("Application name is required"); String test = StringUtils.join(new String[] { "streamName = a", "AWSCredentialsProvider = " + credentialName1, "workerId = 123", "failoverTimeMillis = 100" }, '\n'); @@ -493,4 +506,4 @@ public class KinesisClientLibConfiguratorTest { MultiLangDaemonConfiguration config = configurator.getConfiguration(input); return config; } -} +} \ No newline at end of file diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/messages/MessageTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/messages/MessageTest.java index 47337221..86798080 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/messages/MessageTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/messages/MessageTest.java @@ -26,13 +26,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.ShutdownReason; -import software.amazon.kinesis.multilang.messages.CheckpointMessage; -import software.amazon.kinesis.multilang.messages.InitializeMessage; -import software.amazon.kinesis.multilang.messages.Message; -import software.amazon.kinesis.multilang.messages.ProcessRecordsMessage; -import software.amazon.kinesis.multilang.messages.ShutdownMessage; -import software.amazon.kinesis.multilang.messages.ShutdownRequestedMessage; -import software.amazon.kinesis.multilang.messages.StatusMessage; import software.amazon.kinesis.retrieval.KinesisClientRecord; public class MessageTest { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java index 8307ed82..82cef04b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java @@ -167,12 +167,22 @@ public class StreamIdentifier { .build(); } - private static void validateArn(Arn streamArn) { + /** + * Verify the streamArn follows the appropriate formatting. + * Throw an exception if it does not. + * @param streamArn + */ + public static void validateArn(Arn streamArn) { if (!STREAM_ARN_PATTERN.matcher(streamArn.toString()).matches() || !streamArn.region().isPresent()) { - throw new IllegalArgumentException("Unable to create a StreamIdentifier from " + streamArn); + throw new IllegalArgumentException("Invalid streamArn " + streamArn); } } + /** + * Verify creationEpoch is greater than 0. + * Throw an exception if it is not. + * @param creationEpoch + */ private static void validateCreationEpoch(long creationEpoch) { if (creationEpoch <= 0) { throw new IllegalArgumentException(