diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java index d8c2405a..181cea76 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java @@ -52,6 +52,8 @@ public class PollingConfig implements RetrievalSpecificConfig { */ private String streamName; + private boolean usePollingConfigIdleTimeValue; + /** * @param kinesisClient Client used to access Kinesis services. */ @@ -86,7 +88,7 @@ public class PollingConfig implements RetrievalSpecificConfig { } /** - * The value for how long the ShardConsumer should sleep if no records are returned from the call to + * The value for how long the ShardConsumer should sleep in between calls to * {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. * *
@@ -122,6 +124,16 @@ public class PollingConfig implements RetrievalSpecificConfig { */ private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + /** + * Set the value for how long the ShardConsumer should sleep in between calls to + * {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. If this is not specified here the value provided in + * {@link RecordsFetcherFactory} will be used. + */ + public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) { + usePollingConfigIdleTimeValue = true; + this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis; + } + /** * The maximum time to wait for a future request from Kinesis to complete */ @@ -129,6 +141,10 @@ public class PollingConfig implements RetrievalSpecificConfig { @Override public RetrievalFactory retrievalFactory() { + // Prioritize the PollingConfig specified value if its updated. + if(usePollingConfigIdleTimeValue) { + recordsFetcherFactory.idleMillisBetweenCalls(idleTimeBetweenReadsInMillis); + } return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory, maxRecords(), kinesisRequestTimeout, dataFetcherProvider); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 9373aa42..9370994b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -427,6 +427,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); if (publisherSession.prefetchCounters().shouldGetNewRecords()) { try { + sleepBeforeNextCall(); GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); lastSuccessfulCall = Instant.now();