From 6e0cbb905d73f7469896885b5eef11c7519e0587 Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 2 Apr 2021 16:43:03 -0700 Subject: [PATCH 1/3] Allow InitialPositionInStreamExtended to be specified in properties file E.g. initialPositionInStreamExtended = 1617305352 --- .../config/MultiLangDaemonConfiguration.java | 10 ++++++++++ .../config/KinesisClientLibConfiguratorTest.java | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index ed48ac27..da280ddf 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -19,6 +19,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -27,6 +28,7 @@ import java.util.UUID; import java.util.function.Function; import org.apache.commons.beanutils.BeanUtilsBean; +import org.apache.commons.beanutils.ConvertUtils; import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.beanutils.Converter; import org.apache.commons.beanutils.converters.ArrayConverter; @@ -197,6 +199,14 @@ public class MultiLangDaemonConfiguration { this.utilsBean = utilsBean; this.convertUtilsBean = convertUtilsBean; + convertUtilsBean.register(new Converter() { + @Override + public T convert(Class type, Object value) { + Date date = new Date(Long.parseLong(value.toString()) * 1000L); + return type.cast(InitialPositionInStreamExtended.newInitialPositionAtTimestamp(date)); + } + }, InitialPositionInStreamExtended.class); + convertUtilsBean.register(new Converter() { @Override public T convert(Class type, Object value) { diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index 685d83dc..e2e9d424 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -27,6 +27,7 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.net.URI; import java.util.Arrays; +import java.util.Date; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -93,6 +94,16 @@ public class KinesisClientLibConfiguratorTest { assertEquals(config.getShardSyncIntervalMillis(), 500); } + @Test + public void testWithInitialPositionInStreamExtended() { + MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "applicationName = app", + "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "initialPositionInStreamExtended = 1617406032" }, '\n')); + + assertEquals(config.getInitialPositionInStreamExtended().getTimestamp(), new Date(1617406032000L)); + assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP); + } + @Test public void testWithUnsupportedClientConfigurationVariables() { MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join( From 362e086d5de9f313fd58adfeee2c7010fc684722 Mon Sep 17 00:00:00 2001 From: kevin Date: Mon, 10 May 2021 10:51:13 -0700 Subject: [PATCH 2/3] Create shared timestamp for testWithInitialPositionInStreamExtended --- .../multilang/config/KinesisClientLibConfiguratorTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index e2e9d424..28616841 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -96,11 +96,12 @@ public class KinesisClientLibConfiguratorTest { @Test public void testWithInitialPositionInStreamExtended() { + long epochTimeInSeconds = 1617406032; MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "applicationName = app", "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, - "initialPositionInStreamExtended = 1617406032" }, '\n')); + "initialPositionInStreamExtended = " + epochTimeInSeconds}, '\n')); - assertEquals(config.getInitialPositionInStreamExtended().getTimestamp(), new Date(1617406032000L)); + assertEquals(config.getInitialPositionInStreamExtended().getTimestamp(), new Date(epochTimeInSeconds * 1000L)); assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP); } From afee84d7da1b5467b1d55f8b675e31b1a412c881 Mon Sep 17 00:00:00 2001 From: kevin Date: Mon, 10 May 2021 13:48:37 -0700 Subject: [PATCH 3/3] Add more unit tests for exception cases --- .../KinesisClientLibConfiguratorTest.java | 47 ++++++++++++++++--- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index 28616841..031fc427 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -29,14 +29,13 @@ import java.net.URI; import java.util.Arrays; import java.util.Date; import java.util.HashSet; -import java.util.Optional; import java.util.Set; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import org.apache.commons.lang3.StringUtils; -import org.junit.Ignore; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.Rule; import org.junit.Test; @@ -46,11 +45,8 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.kinesis.common.InitialPositionInStream; -import software.amazon.kinesis.coordinator.KinesisClientLibConfiguration; import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; @@ -105,6 +101,36 @@ public class KinesisClientLibConfiguratorTest { assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP); } + @Test + public void testInvalidInitialPositionInStream() { + // AT_TIMESTAMP cannot be used as initialPositionInStream. If a user wants to specify AT_TIMESTAMP, + // they must specify the time with initialPositionInStreamExtended. + try { + getConfiguration(StringUtils.join(new String[] { "applicationName = app", + "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "initialPositionInStream = AT_TIMESTAMP"}, '\n')); + fail("Should have thrown when initialPositionInStream is set to AT_TIMESTAMP"); + } catch (Exception e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + assertTrue(rootCause instanceof IllegalArgumentException); + } + } + + @Test + public void testInvalidInitialPositionInStreamExtended() { + // initialPositionInStreamExtended takes a long value indicating seconds since epoch. If a non-long + // value is provided, the constructor should throw an IllegalArgumentException exception. + try { + getConfiguration(StringUtils.join(new String[] { "applicationName = app", + "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "initialPositionInStreamExtended = null"}, '\n')); + fail("Should have thrown when initialPositionInStreamExtended is set to null"); + } catch (Exception e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + assertTrue(rootCause instanceof IllegalArgumentException); + } + } + @Test public void testWithUnsupportedClientConfigurationVariables() { MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join( @@ -171,7 +197,7 @@ public class KinesisClientLibConfiguratorTest { } @Test - public void testWithInitialPositionInStreamVariables() { + public void testWithInitialPositionInStreamTrimHorizon() { MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a", "applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123", "initialPositionInStream = TriM_Horizon" }, '\n')); @@ -179,6 +205,15 @@ public class KinesisClientLibConfiguratorTest { assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); } + @Test + public void testWithInitialPositionInStreamLatest() { + MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a", + "applicationName = b", "AWSCredentialsProvider = ABCD," + credentialName1, "workerId = 123", + "initialPositionInStream = LateSt" }, '\n')); + + assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.LATEST); + } + @Test public void testSkippingNonKCLVariables() { MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "streamName = a",