Fix an issue in configuring idleTimeBetweenReadsInMillis in MultiLangDaemon (#1230)

Fix an issue where the idleTimeBetweenReadInMillis configured
via MultiLangDaemon was not taking effect because it used
the auto-generated setter from Lombok to set the configured value,
while there is a custom setter that must be invoked to set the
value correctly.

There is also a general confusion between using Lombok's setter vs
custom setter in java.

Unifying the approach to use the custom Lombok-fluent-style setter
and deprecating the previously added custom setIdleTimeBetweenReadsInMillis

Correct way to configure idleTimeBetweenReadsInMillis for MultiLang is
to add this in the properties file:
idleTimeBetweenReadsInMillis = 10000 # 10 seconds

Correct way to configure for java:
configsBuilder.retrievalConfig().retrievalSpecificConfig(
    new PollingConfig(streamName, kinesisClient)
        .idleTimeBetweenReadsInMillis(Duration.ofSeconds(10).toMillis())

Issues: #999, #950, #515
This commit is contained in:
Aravinda Kidambi Srinivasan 2023-11-22 17:30:41 -08:00 committed by GitHub
parent a48f5436ee
commit 44837b702a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 3 deletions

View file

@ -17,7 +17,9 @@ package software.amazon.kinesis.multilang.config;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean; import org.apache.commons.beanutils.ConvertUtilsBean;
@ -103,12 +105,20 @@ public class MultiLangDaemonConfigurationTest {
public void testDefaultRetrievalConfigWithPollingConfigSet() { public void testDefaultRetrievalConfigWithPollingConfigSet() {
MultiLangDaemonConfiguration configuration = baseConfiguration(); MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setMaxRecords(10); configuration.setMaxRecords(10);
configuration.setIdleTimeBetweenReadsInMillis(60000);
MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration
.resolvedConfiguration(shardRecordProcessorFactory); .resolvedConfiguration(shardRecordProcessorFactory);
assertThat(resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig(), assertThat(resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig(),
instanceOf(PollingConfig.class)); instanceOf(PollingConfig.class));
assertEquals(10,
((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig()).maxRecords());
assertEquals(60000,
((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig())
.idleTimeBetweenReadsInMillis());
assertTrue(((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig())
.usePollingConfigIdleTimeValue());
} }
@Test @Test

View file

@ -18,6 +18,8 @@ package software.amazon.kinesis.retrieval.polling;
import java.time.Duration; import java.time.Duration;
import java.util.Optional; import java.util.Optional;
import java.util.function.Function; import java.util.function.Function;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.NonNull; import lombok.NonNull;
@ -86,11 +88,15 @@ public class PollingConfig implements RetrievalSpecificConfig {
* The value for how long the ShardConsumer should sleep in between calls to * The value for how long the ShardConsumer should sleep in between calls to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. * {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}.
* *
* If this is not set using {@link PollingConfig#idleTimeBetweenReadsInMillis},
* it defaults to 1500 ms.
*
* <p> * <p>
* Default value: 1000L * Default value: 1500L
* </p> * </p>
*/ */
private long idleTimeBetweenReadsInMillis = 1000L; @Setter(AccessLevel.NONE)
private long idleTimeBetweenReadsInMillis = 1500L;
/** /**
* Time to wait in seconds before the worker retries to get a record. * Time to wait in seconds before the worker retries to get a record.
@ -119,14 +125,23 @@ public class PollingConfig implements RetrievalSpecificConfig {
*/ */
private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(); private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();
/**
* @Deprecated Use {@link PollingConfig#idleTimeBetweenReadsInMillis} instead
*/
@Deprecated
public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
}
/** /**
* Set the value for how long the ShardConsumer should sleep in between calls to * Set the value for how long the ShardConsumer should sleep in between calls to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. If this is not specified here the value provided in * {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. If this is not specified here the value provided in
* {@link RecordsFetcherFactory} will be used. * {@link RecordsFetcherFactory} will be used.
*/ */
public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) { public PollingConfig idleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
usePollingConfigIdleTimeValue = true; usePollingConfigIdleTimeValue = true;
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis; this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
return this;
} }
/** /**