From d39bde3b71bae61aa77f1920eb6f80d3bcabcfb8 Mon Sep 17 00:00:00 2001 From: Aravinda Kidambi Srinivasan Date: Tue, 21 Nov 2023 16:51:13 -0800 Subject: [PATCH] Fix an issue in configuring idleTimeBetweenReadsInMillis in MultiLangDaemon Fix an issue where the idleTimeBetweenReadInMillis configured via MultiLangDaemon was not taking effect because it used the auto-generated setter from Lombok to set the configured value, while there is a custom setter that must be invoked to set the value correctly. There is also a general confusion between using Lombok's setter vs custom setter in java. Unifying the approach to use the custom Lombok-fluent-style setter and deprecating the previously added custom setIdleTimeBetweenReadsInMillis Correct way to configure idleTimeBetweenReadsInMillis for MultiLang is to add this in the properties file: idleTimeBetweenReadsInMillis = 10000 # 10 seconds Correct way to configure for java: configsBuilder.retrievalConfig().retrievalSpecificConfig( new PollingConfig(streamName, kinesisClient) .idleTimeBetweenReadsInMillis(Duration.ofSeconds(10).toMillis()) Issues: #999, #950, #515 --- .../MultiLangDaemonConfigurationTest.java | 10 +++++++++ .../retrieval/polling/PollingConfig.java | 21 ++++++++++++++++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java index 07f8082b..13489fe7 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/MultiLangDaemonConfigurationTest.java @@ -17,7 +17,9 @@ package software.amazon.kinesis.multilang.config; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.commons.beanutils.ConvertUtilsBean; @@ -103,12 +105,20 @@ public class MultiLangDaemonConfigurationTest { public void testDefaultRetrievalConfigWithPollingConfigSet() { MultiLangDaemonConfiguration configuration = baseConfiguration(); configuration.setMaxRecords(10); + configuration.setIdleTimeBetweenReadsInMillis(60000); MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration .resolvedConfiguration(shardRecordProcessorFactory); assertThat(resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig(), instanceOf(PollingConfig.class)); + assertEquals(10, + ((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig()).maxRecords()); + assertEquals(60000, + ((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig()) + .idleTimeBetweenReadsInMillis()); + assertTrue(((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig()) + .usePollingConfigIdleTimeValue()); } @Test diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java index 0cc7058d..fed3d15f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java @@ -18,6 +18,8 @@ package software.amazon.kinesis.retrieval.polling; import java.time.Duration; import java.util.Optional; import java.util.function.Function; + +import lombok.AccessLevel; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; @@ -86,11 +88,15 @@ public class PollingConfig implements RetrievalSpecificConfig { * The value for how long the ShardConsumer should sleep in between calls to * {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. * + * If this is not set using {@link PollingConfig#idleTimeBetweenReadsInMillis}, + * it defaults to 1500 ms. + * *

- * Default value: 1000L + * Default value: 1500L *

*/ - private long idleTimeBetweenReadsInMillis = 1000L; + @Setter(AccessLevel.NONE) + private long idleTimeBetweenReadsInMillis = 1500L; /** * Time to wait in seconds before the worker retries to get a record. @@ -119,14 +125,23 @@ public class PollingConfig implements RetrievalSpecificConfig { */ private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + /** + * @Deprecated Use {@link PollingConfig#idleTimeBetweenReadsInMillis} instead + */ + @Deprecated + public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) { + idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); + } + /** * Set the value for how long the ShardConsumer should sleep in between calls to * {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. If this is not specified here the value provided in * {@link RecordsFetcherFactory} will be used. */ - public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) { + public PollingConfig idleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) { usePollingConfigIdleTimeValue = true; this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis; + return this; } /**