Block Fetch Thread When Retrieval Should be Paused (#252)

Block the fetching thread when the queue is considered to be full.
This ensures that the thread won't spin the CPU when it can't retrieve
more records or bytes.
This commit is contained in:
Justin Pfifer 2017-10-23 10:16:03 -07:00 committed by Sahil Palvia
parent 8ed6c81cea
commit 046e160e24

View file

@ -20,6 +20,8 @@ import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.Validate;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
@ -29,7 +31,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
import lombok.NonNull;
import lombok.extern.apachecommons.CommonsLog;
import org.apache.commons.lang.Validate;
/**
* This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the
@ -171,6 +172,16 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
} finally {
MetricsHelper.endScope();
}
} else {
//
// Consumer isn't ready to receive new records will allow prefetch counters to pause
//
try {
prefetchCounters.waitForConsumer();
} catch (InterruptedException ie) {
log.info("Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started");
}
}
}
callShutdownOnStrategy();
@ -205,6 +216,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
public synchronized void removed(final ProcessRecordsInput result) {
size -= getSize(result);
byteSize -= getByteSize(result);
this.notifyAll();
}
private long getSize(final ProcessRecordsInput result) {
@ -214,10 +226,26 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private long getByteSize(final ProcessRecordsInput result) {
return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum();
}
public synchronized void waitForConsumer() throws InterruptedException {
if (!shouldGetNewRecords()) {
log.debug("Queue is full waiting for consumer for " + idleMillisBetweenCalls + " ms");
this.wait(idleMillisBetweenCalls);
}
}
public synchronized boolean shouldGetNewRecords() {
if (log.isDebugEnabled()) {
log.debug("Current Prefetch Counter States: " + this.toString());
}
return size < maxRecordsCount && byteSize < maxByteSize;
}
@Override
public String toString() {
return String.format("{ Requests: %d, Records: %d, Bytes: %d }", getRecordsResultQueue.size(), size,
byteSize);
}
}
}