Fixing bug where idleTimeBetweenReadsInMillis property was ignored in PollingConfig.

This commit is contained in:
Yatin 2020-12-23 05:38:44 -08:00
parent ac6bcdbf0a
commit e649dff90e
2 changed files with 18 additions and 1 deletions

View file

@ -52,6 +52,8 @@ public class PollingConfig implements RetrievalSpecificConfig {
*/ */
private String streamName; private String streamName;
private boolean usePollingConfigIdleTimeValue;
/** /**
* @param kinesisClient Client used to access Kinesis services. * @param kinesisClient Client used to access Kinesis services.
*/ */
@ -86,7 +88,7 @@ public class PollingConfig implements RetrievalSpecificConfig {
} }
/** /**
* The value for how long the ShardConsumer should sleep if no records are returned from the call to * The value for how long the ShardConsumer should sleep in between calls to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. * {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}.
* *
* <p> * <p>
@ -122,6 +124,16 @@ public class PollingConfig implements RetrievalSpecificConfig {
*/ */
private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(); private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();
/**
* Set the value for how long the ShardConsumer should sleep in between calls to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. If this is not specified here the value provided in
* {@link RecordsFetcherFactory} will be used.
*/
public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
usePollingConfigIdleTimeValue = true;
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
}
/** /**
* The maximum time to wait for a future request from Kinesis to complete * The maximum time to wait for a future request from Kinesis to complete
*/ */
@ -129,6 +141,10 @@ public class PollingConfig implements RetrievalSpecificConfig {
@Override @Override
public RetrievalFactory retrievalFactory() { public RetrievalFactory retrievalFactory() {
// Prioritize the PollingConfig specified value if its updated.
if(usePollingConfigIdleTimeValue) {
recordsFetcherFactory.idleMillisBetweenCalls(idleTimeBetweenReadsInMillis);
}
return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory, return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory,
maxRecords(), kinesisRequestTimeout, dataFetcherProvider); maxRecords(), kinesisRequestTimeout, dataFetcherProvider);
} }

View file

@ -427,6 +427,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
if (publisherSession.prefetchCounters().shouldGetNewRecords()) { if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
try { try {
sleepBeforeNextCall(); sleepBeforeNextCall();
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now(); lastSuccessfulCall = Instant.now();