diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index e9109e73..a60b5d3e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -26,8 +26,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final ExecutorService executorService; - private volatile int currentSizeInBytes = 0; - private volatile int currentRecordsCount = 0; + private PrefetchCounters prefetchCounters; private boolean started = false; @@ -41,6 +40,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); + prefetchCounters = new PrefetchCounters(); this.executorService = executorService; } @@ -60,8 +60,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { GetRecordsResult result = null; try { result = getRecordsResultQueue.take(); - updateBytes(result, false); - updateRecordsCount(result, false); + prefetchCounters.removed(result); } catch (InterruptedException e) { log.error("Interrupted while getting records from the cache", e); } @@ -73,38 +72,15 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { executorService.shutdown(); } - private void updateBytes(final GetRecordsResult getRecordsResult, final boolean add) { - getRecordsResult.getRecords().forEach(record -> { - int newLength = record.getData().array().length; - if (add) { - currentSizeInBytes += newLength; - } else { - currentSizeInBytes -= newLength; - } - }); - } - - private void updateRecordsCount(final GetRecordsResult getRecordsResult, final boolean add) { - int newSize = getRecordsResult.getRecords().size(); - if (add) { - currentRecordsCount += newSize; - } else { - currentRecordsCount -= newSize; - } - } - private class DefaultGetRecordsCacheDaemon implements Runnable { @Override public void run() { while (true) { - if (currentSizeInBytes < maxByteSize && currentRecordsCount < maxRecordsCount) { + if (prefetchCounters.byteSize < maxByteSize && prefetchCounters.size < maxRecordsCount) { try { GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); getRecordsResultQueue.put(getRecordsResult); - if (getRecordsResultQueue.contains(getRecordsResult)) { - updateBytes(getRecordsResult, true); - updateRecordsCount(getRecordsResult, true); - } + prefetchCounters.added(getRecordsResult); } catch (InterruptedException e) { log.error("Interrupted while adding records to the cache", e); } @@ -113,4 +89,27 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } } + private class PrefetchCounters { + private volatile long size = 0; + private volatile long byteSize = 0; + + public void added(final GetRecordsResult result) { + size += getSize(result); + byteSize += getByteSize(result); + } + + public void removed(final GetRecordsResult result) { + size -= getSize(result); + byteSize -= getByteSize(result); + } + + private long getSize(final GetRecordsResult result) { + return result.getRecords().size(); + } + + private long getByteSize(final GetRecordsResult result) { + return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum(); + } + } + }