From 20ba891cb70855844ef40033b1fb1683ae7ff245 Mon Sep 17 00:00:00 2001 From: Wei Date: Tue, 29 Aug 2017 13:42:21 -0700 Subject: [PATCH] add configuration for get timeout information --- .../worker/KinesisClientLibConfiguration.java | 27 +++++++++++ .../KinesisClientLibConfiguratorTest.java | 45 ++++++++++++++++++- 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index e9673414..6da407d2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -217,6 +217,12 @@ public class KinesisClientLibConfiguration { @Getter private Optional timeoutInSeconds = Optional.empty(); + @Getter + private Optional retryGetRecordsInSeconds = Optional.empty(); + + @Getter + private Optional maxGetRecordsThreadPool = Optional.empty(); + @Getter private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; @@ -1111,6 +1117,27 @@ public class KinesisClientLibConfiguration { return this; } + + /** + * @param retryGetRecordsInSeconds the time in secods to wait before the worker retries to get a record. + * @return this configuration object. + */ + public KinesisClientLibConfiguration withRetryGetRecordsInSeconds(final int retryGetRecordsInSeconds) { + checkIsValuePositive("retryGetRecordsInSeconds", retryGetRecordsInSeconds); + this.retryGetRecordsInSeconds = Optional.of(retryGetRecordsInSeconds); + return this; + } + + /** + *@param maxGetRecordsThreadPool the max number of threads in the getRecords thread pool. + *@return this configuration object + */ + public KinesisClientLibConfiguration withMaxGetRecordsThreadPool(final int maxGetRecordsThreadPool) { + checkIsValuePositive("maxGetRecordsThreadPool", maxGetRecordsThreadPool); + this.maxGetRecordsThreadPool = Optional.of(maxGetRecordsThreadPool); + return this; + } + /** * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for */ diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java index cbdd0a2d..d16be640 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.util.Optional; import java.util.Set; import org.apache.commons.lang.StringUtils; @@ -60,6 +61,8 @@ public class KinesisClientLibConfiguratorTest { assertEquals(config.getApplicationName(), "b"); assertEquals(config.getStreamName(), "a"); assertEquals(config.getWorkerIdentifier(), "123"); + assertEquals(config.getMaxGetRecordsThreadPool(), Optional.empty()); + assertEquals(config.getRetryGetRecordsInSeconds(), Optional.empty()); } @Test @@ -107,7 +110,9 @@ public class KinesisClientLibConfiguratorTest { "workerId = w123", "maxRecords = 10", "metricsMaxQueueSize = 20", - "applicationName = kinesis" + "applicationName = kinesis", + "retryGetRecordsInSeconds = 2", + "maxGetRecordsThreadPool = 1" }, '\n')); assertEquals(config.getApplicationName(), "kinesis"); @@ -115,6 +120,8 @@ public class KinesisClientLibConfiguratorTest { assertEquals(config.getWorkerIdentifier(), "w123"); assertEquals(config.getMaxRecords(), 10); assertEquals(config.getMetricsMaxQueueSize(), 20); + assertEquals(config.getRetryGetRecordsInSeconds(), Optional.of(2)); + assertEquals(config.getMaxGetRecordsThreadPool(), Optional.of(1)); } @Test @@ -202,6 +209,42 @@ public class KinesisClientLibConfiguratorTest { assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); } + @Test + public void testEmptyOptionalVariables() { + KinesisClientLibConfiguration config = + getConfiguration(StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = ABCD," + credentialName1, + "workerId = 123", + "initialPositionInStream = TriM_Horizon", + "maxGetRecordsThreadPool = 1" + }, '\n')); + assertEquals(config.getMaxGetRecordsThreadPool(), Optional.of(1)); + assertEquals(config.getRetryGetRecordsInSeconds(), Optional.empty()); + } + + @Test + public void testWithZeroValue() { + String test = StringUtils.join(new String[]{ + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = ABCD," + credentialName1, + "workerId = 123", + "initialPositionInStream = TriM_Horizon", + "maxGetRecordsThreadPool = 0", + "retryGetRecordsInSeconds = 0" + }, '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + try { + configurator.getConfiguration(input); + } catch (Exception e) { + fail("Don't expect to fail on invalid variable value"); + + } + } + @Test public void testWithInvalidIntValue() { String test = StringUtils.join(new String[] {