Merge pull request #767 from yatins47/master
Fixing bug where idleTimeBetweenReadsInMillis property was ignored in…
This commit is contained in:
commit
e58dffee87
2 changed files with 18 additions and 1 deletions
|
|
@ -52,6 +52,8 @@ public class PollingConfig implements RetrievalSpecificConfig {
|
|||
*/
|
||||
private String streamName;
|
||||
|
||||
private boolean usePollingConfigIdleTimeValue;
|
||||
|
||||
/**
|
||||
* @param kinesisClient Client used to access Kinesis services.
|
||||
*/
|
||||
|
|
@ -86,7 +88,7 @@ public class PollingConfig implements RetrievalSpecificConfig {
|
|||
}
|
||||
|
||||
/**
|
||||
* The value for how long the ShardConsumer should sleep if no records are returned from the call to
|
||||
* The value for how long the ShardConsumer should sleep in between calls to
|
||||
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}.
|
||||
*
|
||||
* <p>
|
||||
|
|
@ -122,6 +124,16 @@ public class PollingConfig implements RetrievalSpecificConfig {
|
|||
*/
|
||||
private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();
|
||||
|
||||
/**
|
||||
* Set the value for how long the ShardConsumer should sleep in between calls to
|
||||
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. If this is not specified here the value provided in
|
||||
* {@link RecordsFetcherFactory} will be used.
|
||||
*/
|
||||
public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
|
||||
usePollingConfigIdleTimeValue = true;
|
||||
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* The maximum time to wait for a future request from Kinesis to complete
|
||||
*/
|
||||
|
|
@ -129,6 +141,10 @@ public class PollingConfig implements RetrievalSpecificConfig {
|
|||
|
||||
@Override
|
||||
public RetrievalFactory retrievalFactory() {
|
||||
// Prioritize the PollingConfig specified value if its updated.
|
||||
if(usePollingConfigIdleTimeValue) {
|
||||
recordsFetcherFactory.idleMillisBetweenCalls(idleTimeBetweenReadsInMillis);
|
||||
}
|
||||
return new SynchronousBlockingRetrievalFactory(streamName(), kinesisClient(), recordsFetcherFactory,
|
||||
maxRecords(), kinesisRequestTimeout, dataFetcherProvider);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -427,6 +427,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
|
||||
if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
|
||||
try {
|
||||
|
||||
sleepBeforeNextCall();
|
||||
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
|
||||
lastSuccessfulCall = Instant.now();
|
||||
|
|
|
|||
Loading…
Reference in a new issue