diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 5369c0f4..06e77c8c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -20,6 +20,8 @@ import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.commons.lang.Validate; + import com.amazonaws.SdkClientException; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; @@ -29,7 +31,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.NonNull; import lombok.extern.apachecommons.CommonsLog; -import org.apache.commons.lang.Validate; /** * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the @@ -171,6 +172,16 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } finally { MetricsHelper.endScope(); } + } else { + // + // Consumer isn't ready to receive new records will allow prefetch counters to pause + // + try { + prefetchCounters.waitForConsumer(); + } catch (InterruptedException ie) { + log.info("Thread was interrupted while waiting for the consumer. " + + "Shutdown has probably been started"); + } } } callShutdownOnStrategy(); @@ -205,6 +216,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { public synchronized void removed(final ProcessRecordsInput result) { size -= getSize(result); byteSize -= getByteSize(result); + this.notifyAll(); } private long getSize(final ProcessRecordsInput result) { @@ -214,10 +226,26 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private long getByteSize(final ProcessRecordsInput result) { return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum(); } + + public synchronized void waitForConsumer() throws InterruptedException { + if (!shouldGetNewRecords()) { + log.debug("Queue is full waiting for consumer for " + idleMillisBetweenCalls + " ms"); + this.wait(idleMillisBetweenCalls); + } + } public synchronized boolean shouldGetNewRecords() { + if (log.isDebugEnabled()) { + log.debug("Current Prefetch Counter States: " + this.toString()); + } return size < maxRecordsCount && byteSize < maxByteSize; } + + @Override + public String toString() { + return String.format("{ Requests: %d, Records: %d, Bytes: %d }", getRecordsResultQueue.size(), size, + byteSize); + } } }