Allow InitialPositionInStreamExtended to be specified in properties file
E.g. initialPositionInStreamExtended = 1617305352
This commit is contained in:
parent
668422ccbd
commit
6e0cbb905d
2 changed files with 21 additions and 0 deletions
|
|
@ -19,6 +19,7 @@ import java.lang.reflect.InvocationTargetException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
@ -27,6 +28,7 @@ import java.util.UUID;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.commons.beanutils.BeanUtilsBean;
|
import org.apache.commons.beanutils.BeanUtilsBean;
|
||||||
|
import org.apache.commons.beanutils.ConvertUtils;
|
||||||
import org.apache.commons.beanutils.ConvertUtilsBean;
|
import org.apache.commons.beanutils.ConvertUtilsBean;
|
||||||
import org.apache.commons.beanutils.Converter;
|
import org.apache.commons.beanutils.Converter;
|
||||||
import org.apache.commons.beanutils.converters.ArrayConverter;
|
import org.apache.commons.beanutils.converters.ArrayConverter;
|
||||||
|
|
@ -197,6 +199,14 @@ public class MultiLangDaemonConfiguration {
|
||||||
this.utilsBean = utilsBean;
|
this.utilsBean = utilsBean;
|
||||||
this.convertUtilsBean = convertUtilsBean;
|
this.convertUtilsBean = convertUtilsBean;
|
||||||
|
|
||||||
|
convertUtilsBean.register(new Converter() {
|
||||||
|
@Override
|
||||||
|
public <T> T convert(Class<T> type, Object value) {
|
||||||
|
Date date = new Date(Long.parseLong(value.toString()) * 1000L);
|
||||||
|
return type.cast(InitialPositionInStreamExtended.newInitialPositionAtTimestamp(date));
|
||||||
|
}
|
||||||
|
}, InitialPositionInStreamExtended.class);
|
||||||
|
|
||||||
convertUtilsBean.register(new Converter() {
|
convertUtilsBean.register(new Converter() {
|
||||||
@Override
|
@Override
|
||||||
public <T> T convert(Class<T> type, Object value) {
|
public <T> T convert(Class<T> type, Object value) {
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ import java.io.ByteArrayInputStream;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Date;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
@ -93,6 +94,16 @@ public class KinesisClientLibConfiguratorTest {
|
||||||
assertEquals(config.getShardSyncIntervalMillis(), 500);
|
assertEquals(config.getShardSyncIntervalMillis(), 500);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithInitialPositionInStreamExtended() {
|
||||||
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(new String[] { "applicationName = app",
|
||||||
|
"streamName = 123", "AWSCredentialsProvider = " + credentialName1 + ", " + credentialName2,
|
||||||
|
"initialPositionInStreamExtended = 1617406032" }, '\n'));
|
||||||
|
|
||||||
|
assertEquals(config.getInitialPositionInStreamExtended().getTimestamp(), new Date(1617406032000L));
|
||||||
|
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWithUnsupportedClientConfigurationVariables() {
|
public void testWithUnsupportedClientConfigurationVariables() {
|
||||||
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
|
MultiLangDaemonConfiguration config = getConfiguration(StringUtils.join(
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue