From bcee1ae395f470c0c268b89fcaaee9a4fd779604 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 19 Sep 2017 12:06:50 -0700 Subject: [PATCH] Adding default caching class and enum for fetching strategy. --- .../lib/worker/DataFetchingStrategy.java | 8 ++ .../lib/worker/DefaultGetRecordsCache.java | 130 +++++++++++++++--- 2 files changed, 116 insertions(+), 22 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java new file mode 100644 index 00000000..05c2ab3f --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java @@ -0,0 +1,8 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * + */ +public enum DataFetchingStrategy { + DEFAULT, PREFETCH_CACHED; +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java index a7e9e029..fb75e170 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java @@ -1,8 +1,9 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import com.amazonaws.services.kinesis.model.GetRecordsResult; @@ -10,39 +11,124 @@ import lombok.NonNull; import lombok.extern.apachecommons.CommonsLog; /** - * + * This is the default caching class, this class spins up a thread if prefetching is enabled. That thread fetches the + * next set of records and stores it in the cache. The size of the cache is limited by setting maxSize i.e. the maximum + * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the + * cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple + * GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till + * records are retrieved from Kinesis. If prefetching is not enabled, the cache is not used and every single call to the + * GetRecordsRetrievalStrategy is a blocking call. */ @CommonsLog public class DefaultGetRecordsCache implements GetRecordsCache { - private static final int DEFAULT_MAX_SIZE = 1; - private static final int DEFAULT_MAX_BYTE_SIZE = 1; - private static final int DEFAULT_MAX_RECORDS_COUNT = 1; - - private final Queue getRecordsResultQueue; - private final int maxSize; - private final int maxByteSize; - private final int maxRecordsCount; + private LinkedBlockingQueue getRecordsResultQueue; + private int maxSize; + private int maxByteSize; + private int maxRecordsCount; private final int maxRecordsPerCall; - @NonNull private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final ExecutorService executorService = Executors.newFixedThreadPool(1); + + private volatile int currentSizeInBytes = 0; + private volatile int currentRecordsCount = 0; + private DataFetchingStrategy dataFetchingStrategy; + + private boolean started = false; - public DefaultGetRecordsCache(final Optional maxSize, final Optional maxByteSize, final Optional maxRecordsCount, - final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { - this.getRecordsResultQueue = new ConcurrentLinkedQueue<>(); - this.maxSize = maxSize.orElse(DEFAULT_MAX_SIZE); - this.maxByteSize = maxByteSize.orElse(DEFAULT_MAX_BYTE_SIZE); - this.maxRecordsCount = maxRecordsCount.orElse(DEFAULT_MAX_RECORDS_COUNT); - this.maxRecordsPerCall = maxRecordsPerCall; + public DefaultGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, + final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy, + @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.maxRecordsPerCall = maxRecordsPerCall; + this.dataFetchingStrategy = dataFetchingStrategy; + + if (this.dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { + this.maxSize = maxSize; + this.maxByteSize = maxByteSize; + this.maxRecordsCount = maxRecordsCount; + this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); + } + } + + private void start() { + if (dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { + log.info("Starting prefetching thread."); + executorService.execute(new DefaultGetRecordsCacheDaemon()); + } + started = true; } @Override public GetRecordsResult getNextResult() { - return null; + if (!started) { + start(); + } + GetRecordsResult result = null; + if (dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { + try { + result = getRecordsResultQueue.take(); + updateBytes(result, false); + updateRecordsCount(result, false); + } catch (InterruptedException e) { + log.error("Interrupted while getting records from the cache", e); + } + } else { + result = validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + } + return result; } @Override public void shutdown() { - + executorService.shutdown(); } + + private void updateBytes(final GetRecordsResult getRecordsResult, final boolean add) { + getRecordsResult.getRecords().forEach(record -> { + int newLength = record.getData().array().length; + if (add) { + currentSizeInBytes += newLength; + } else { + currentSizeInBytes -= newLength; + } + }); + } + + private void updateRecordsCount(final GetRecordsResult getRecordsResult, final boolean add) { + int newSize = getRecordsResult.getRecords().size(); + if (add) { + currentRecordsCount += newSize; + } else { + currentRecordsCount -= newSize; + } + } + + private GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) { + if (getRecordsResult == null) { + return new GetRecordsResult().withRecords(Collections.emptyList()); + } + return getRecordsResult; + } + + private class DefaultGetRecordsCacheDaemon implements Runnable { + @Override + public void run() { + while (true) { + if (currentSizeInBytes < maxByteSize && currentRecordsCount < maxRecordsCount) { + try { + GetRecordsResult getRecordsResult = validateGetRecordsResult( + getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + getRecordsResultQueue.put(getRecordsResult); + if (getRecordsResultQueue.contains(getRecordsResult)) { + updateBytes(getRecordsResult, true); + updateRecordsCount(getRecordsResult, true); + } + } catch (InterruptedException e) { + log.error("Interrupted while adding records to the cache", e); + } + } + } + } + } + }