Addressing PR comments.

This commit is contained in:
Sahil Palvia 2017-09-19 14:48:43 -07:00
parent 5172f4f936
commit a8edb70552

View file

@ -26,8 +26,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final ExecutorService executorService; private final ExecutorService executorService;
private volatile int currentSizeInBytes = 0; private PrefetchCounters prefetchCounters;
private volatile int currentRecordsCount = 0;
private boolean started = false; private boolean started = false;
@ -41,6 +40,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
this.maxByteSize = maxByteSize; this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount; this.maxRecordsCount = maxRecordsCount;
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize);
prefetchCounters = new PrefetchCounters();
this.executorService = executorService; this.executorService = executorService;
} }
@ -60,8 +60,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
GetRecordsResult result = null; GetRecordsResult result = null;
try { try {
result = getRecordsResultQueue.take(); result = getRecordsResultQueue.take();
updateBytes(result, false); prefetchCounters.removed(result);
updateRecordsCount(result, false);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Interrupted while getting records from the cache", e); log.error("Interrupted while getting records from the cache", e);
} }
@ -73,38 +72,15 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
executorService.shutdown(); executorService.shutdown();
} }
private void updateBytes(final GetRecordsResult getRecordsResult, final boolean add) {
getRecordsResult.getRecords().forEach(record -> {
int newLength = record.getData().array().length;
if (add) {
currentSizeInBytes += newLength;
} else {
currentSizeInBytes -= newLength;
}
});
}
private void updateRecordsCount(final GetRecordsResult getRecordsResult, final boolean add) {
int newSize = getRecordsResult.getRecords().size();
if (add) {
currentRecordsCount += newSize;
} else {
currentRecordsCount -= newSize;
}
}
private class DefaultGetRecordsCacheDaemon implements Runnable { private class DefaultGetRecordsCacheDaemon implements Runnable {
@Override @Override
public void run() { public void run() {
while (true) { while (true) {
if (currentSizeInBytes < maxByteSize && currentRecordsCount < maxRecordsCount) { if (prefetchCounters.byteSize < maxByteSize && prefetchCounters.size < maxRecordsCount) {
try { try {
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
getRecordsResultQueue.put(getRecordsResult); getRecordsResultQueue.put(getRecordsResult);
if (getRecordsResultQueue.contains(getRecordsResult)) { prefetchCounters.added(getRecordsResult);
updateBytes(getRecordsResult, true);
updateRecordsCount(getRecordsResult, true);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Interrupted while adding records to the cache", e); log.error("Interrupted while adding records to the cache", e);
} }
@ -113,4 +89,27 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
} }
} }
private class PrefetchCounters {
private volatile long size = 0;
private volatile long byteSize = 0;
public void added(final GetRecordsResult result) {
size += getSize(result);
byteSize += getByteSize(result);
}
public void removed(final GetRecordsResult result) {
size -= getSize(result);
byteSize -= getByteSize(result);
}
private long getSize(final GetRecordsResult result) {
return result.getRecords().size();
}
private long getByteSize(final GetRecordsResult result) {
return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum();
}
}
} }