Enables property for setting AT_TIMESTAMP shard iterator initial time… (#342)
Allows setting the timestamp for InitialPositiinInStream.AT_TIMESTAMP from a properties file.
This commit is contained in:
parent
be60a5507d
commit
a84885db79
4 changed files with 165 additions and 0 deletions
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Provide Date property.
|
||||
*/
|
||||
class DatePropertyValueDecoder implements IPropertyValueDecoder<Date> {
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
DatePropertyValueDecoder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param value property value as String
|
||||
* @return corresponding variable in correct type
|
||||
*/
|
||||
@Override
|
||||
public Date decodeValue(String value) {
|
||||
try {
|
||||
return new Date(Long.parseLong(value) * 1000L);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Date property value must be numeric.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list of supported types
|
||||
*/
|
||||
@Override
|
||||
public List<Class<Date>> getSupportedTypes() {
|
||||
return Arrays.asList(Date.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -66,6 +66,7 @@ public class KinesisClientLibConfigurator {
|
|||
Arrays.asList(new IntegerPropertyValueDecoder(),
|
||||
new LongPropertyValueDecoder(),
|
||||
new BooleanPropertyValueDecoder(),
|
||||
new DatePropertyValueDecoder(),
|
||||
new AWSCredentialsProviderPropertyValueDecoder(),
|
||||
new StringPropertyValueDecoder(),
|
||||
new InitialPositionInStreamPropertyValueDecoder(),
|
||||
|
|
|
|||
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Amazon Software License (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/asl/
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.config;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.config.DatePropertyValueDecoder;
|
||||
|
||||
public class DatePropertyValueDecoderTest {
|
||||
|
||||
private DatePropertyValueDecoder decoder = new DatePropertyValueDecoder();
|
||||
|
||||
private static final String TEST_VALUE = "1527267472";
|
||||
|
||||
@Test
|
||||
public void testNumericValue() {
|
||||
Date timestamp = decoder.decodeValue(TEST_VALUE);
|
||||
assertEquals(timestamp.getClass(), Date.class);
|
||||
assertEquals(timestamp, new Date(Long.parseLong(TEST_VALUE) * 1000L));
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testEmptyValue() {
|
||||
Date timestamp = decoder.decodeValue("");
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testNullValue() {
|
||||
Date timestamp = decoder.decodeValue(null);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testNonNumericValue() {
|
||||
Date timestamp = decoder.decodeValue("123abc");
|
||||
}
|
||||
}
|
||||
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.util.Date;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
|
|
@ -143,6 +144,20 @@ public class KinesisClientLibConfiguratorTest {
|
|||
assertTrue(config.shouldValidateSequenceNumberBeforeCheckpointing());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithDateVariables() {
|
||||
KinesisClientLibConfiguration config =
|
||||
getConfiguration(StringUtils.join(new String[] {
|
||||
"streamName = a",
|
||||
"applicationName = b",
|
||||
"AWSCredentialsProvider = ABCD, " + credentialName1,
|
||||
"timestampAtInitialPositionInStream = 1527267472"
|
||||
}, '\n'));
|
||||
|
||||
assertEquals(config.getTimestampAtInitialPositionInStream(),
|
||||
new Date(1527267472 * 1000L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithStringVariables() {
|
||||
KinesisClientLibConfiguration config =
|
||||
|
|
@ -189,6 +204,49 @@ public class KinesisClientLibConfiguratorTest {
|
|||
}, '\n'));
|
||||
|
||||
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithTimestampAtInitialPositionInStreamVariables() {
|
||||
KinesisClientLibConfiguration config =
|
||||
getConfiguration(StringUtils.join(new String[] {
|
||||
"streamName = a",
|
||||
"applicationName = b",
|
||||
"AWSCredentialsProvider = ABCD," + credentialName1,
|
||||
"timestampAtInitialPositionInStream = 1527267472"
|
||||
}, '\n'));
|
||||
|
||||
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.AT_TIMESTAMP);
|
||||
assertEquals(config.getTimestampAtInitialPositionInStream(),
|
||||
new Date(1527267472 * 1000L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithEmptyTimestampAtInitialPositionInStreamVariables() {
|
||||
KinesisClientLibConfiguration config =
|
||||
getConfiguration(StringUtils.join(new String[] {
|
||||
"streamName = a",
|
||||
"applicationName = b",
|
||||
"AWSCredentialsProvider = ABCD," + credentialName1,
|
||||
"timestampAtInitialPositionInStream = "
|
||||
}, '\n'));
|
||||
|
||||
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.LATEST);
|
||||
assertEquals(config.getTimestampAtInitialPositionInStream(), null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithNonNumericTimestampAtInitialPositionInStreamVariables() {
|
||||
KinesisClientLibConfiguration config =
|
||||
getConfiguration(StringUtils.join(new String[] {
|
||||
"streamName = a",
|
||||
"applicationName = b",
|
||||
"AWSCredentialsProvider = ABCD," + credentialName1,
|
||||
"timestampAtInitialPositionInStream = 123abc"
|
||||
}, '\n'));
|
||||
|
||||
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.LATEST);
|
||||
assertEquals(config.getTimestampAtInitialPositionInStream(), null);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
Loading…
Reference in a new issue