add configuration for get timeout information

This commit is contained in:
Wei 2017-08-29 13:42:21 -07:00
parent e2aa89d8f6
commit 20ba891cb7
2 changed files with 71 additions and 1 deletions

View file

@ -217,6 +217,12 @@ public class KinesisClientLibConfiguration {
@Getter
private Optional<Integer> timeoutInSeconds = Optional.empty();
@Getter
private Optional<Integer> retryGetRecordsInSeconds = Optional.empty();
@Getter
private Optional<Integer> maxGetRecordsThreadPool = Optional.empty();
@Getter
private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS;
@ -1111,6 +1117,27 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
* @param retryGetRecordsInSeconds the time in secods to wait before the worker retries to get a record.
* @return this configuration object.
*/
public KinesisClientLibConfiguration withRetryGetRecordsInSeconds(final int retryGetRecordsInSeconds) {
checkIsValuePositive("retryGetRecordsInSeconds", retryGetRecordsInSeconds);
this.retryGetRecordsInSeconds = Optional.of(retryGetRecordsInSeconds);
return this;
}
/**
*@param maxGetRecordsThreadPool the max number of threads in the getRecords thread pool.
*@return this configuration object
*/
public KinesisClientLibConfiguration withMaxGetRecordsThreadPool(final int maxGetRecordsThreadPool) {
checkIsValuePositive("maxGetRecordsThreadPool", maxGetRecordsThreadPool);
this.maxGetRecordsThreadPool = Optional.of(maxGetRecordsThreadPool);
return this;
}
/**
* @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for
*/

View file

@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
@ -60,6 +61,8 @@ public class KinesisClientLibConfiguratorTest {
assertEquals(config.getApplicationName(), "b");
assertEquals(config.getStreamName(), "a");
assertEquals(config.getWorkerIdentifier(), "123");
assertEquals(config.getMaxGetRecordsThreadPool(), Optional.empty());
assertEquals(config.getRetryGetRecordsInSeconds(), Optional.empty());
}
@Test
@ -107,7 +110,9 @@ public class KinesisClientLibConfiguratorTest {
"workerId = w123",
"maxRecords = 10",
"metricsMaxQueueSize = 20",
"applicationName = kinesis"
"applicationName = kinesis",
"retryGetRecordsInSeconds = 2",
"maxGetRecordsThreadPool = 1"
}, '\n'));
assertEquals(config.getApplicationName(), "kinesis");
@ -115,6 +120,8 @@ public class KinesisClientLibConfiguratorTest {
assertEquals(config.getWorkerIdentifier(), "w123");
assertEquals(config.getMaxRecords(), 10);
assertEquals(config.getMetricsMaxQueueSize(), 20);
assertEquals(config.getRetryGetRecordsInSeconds(), Optional.of(2));
assertEquals(config.getMaxGetRecordsThreadPool(), Optional.of(1));
}
@Test
@ -202,6 +209,42 @@ public class KinesisClientLibConfiguratorTest {
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
}
@Test
public void testEmptyOptionalVariables() {
KinesisClientLibConfiguration config =
getConfiguration(StringUtils.join(new String[] {
"streamName = a",
"applicationName = b",
"AWSCredentialsProvider = ABCD," + credentialName1,
"workerId = 123",
"initialPositionInStream = TriM_Horizon",
"maxGetRecordsThreadPool = 1"
}, '\n'));
assertEquals(config.getMaxGetRecordsThreadPool(), Optional.of(1));
assertEquals(config.getRetryGetRecordsInSeconds(), Optional.empty());
}
@Test
public void testWithZeroValue() {
String test = StringUtils.join(new String[]{
"streamName = a",
"applicationName = b",
"AWSCredentialsProvider = ABCD," + credentialName1,
"workerId = 123",
"initialPositionInStream = TriM_Horizon",
"maxGetRecordsThreadPool = 0",
"retryGetRecordsInSeconds = 0"
}, '\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
try {
configurator.getConfiguration(input);
} catch (Exception e) {
fail("Don't expect to fail on invalid variable value");
}
}
@Test
public void testWithInvalidIntValue() {
String test = StringUtils.join(new String[] {