From 6e0cbb905d73f7469896885b5eef11c7519e0587 Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 2 Apr 2021 16:43:03 -0700 Subject: [PATCH] Allow InitialPositionInStreamExtended to be specified in properties file E.g. initialPositionInStreamExtended = 1617305352 --- .../config/MultiLangDaemonConfiguration.java | 10 ++++++++++ .../config/KinesisClientLibConfiguratorTest.java | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java index ed48ac27..da280ddf 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfiguration.java @@ -19,6 +19,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -27,6 +28,7 @@ import java.util.UUID; import java.util.function.Function; import org.apache.commons.beanutils.BeanUtilsBean; +import org.apache.commons.beanutils.ConvertUtils; import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.beanutils.Converter; import org.apache.commons.beanutils.converters.ArrayConverter; @@ -197,6 +199,14 @@ public class MultiLangDaemonConfiguration { this.utilsBean = utilsBean; this.convertUtilsBean = convertUtilsBean; + convertUtilsBean.register(new Converter() { + @Override + public T convert(Class type, Object value) { + Date date = new Date(Long.parseLong(value.toString()) * 1000L); + return type.cast(InitialPositionInStreamExtended.newInitialPositionAtTimestamp(date)); + } + }, InitialPositionInStreamExtended.class); + convertUtilsBean.register(new Converter() { @Override public T convert(Class type, Object value) { diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java index 685d83dc..e2e9d424 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/KinesisClientLibConfiguratorTest.java @@ -27,6 +27,7 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.net.URI; import java.util.Arrays; +import java.util.Date; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -93,6 +94,16 @@ public class KinesisClientLibConfiguratorTest { assertEquals(config.getShardSyncIntervalMillis(), 500); } + @Test + public void testWithInitialPositionInStreamExtended() { + MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "applicationName = app", + "streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2, + "initialPositionInStreamExtended = 1617406032" }, '\n')); + + assertEquals(config.getInitialPositionInStreamExtended().getTimestamp(), new Date(1617406032000L)); + assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP); + } + @Test public void testWithUnsupportedClientConfigurationVariables() { MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(