diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java new file mode 100644 index 00000000..7ee718d0 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -0,0 +1,30 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +import lombok.extern.apachecommons.CommonsLog; + +/** + * A GetRecordsCache that does no caching: each getNextResult() call is passed straight through to + * GetRecordsRetrievalStrategy.getRecords(maxRecordsPerCall) on the calling thread. + */ +@CommonsLog +public class BlockingGetRecordsCache implements GetRecordsCache { + private final int maxRecordsPerCall; + private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + public BlockingGetRecordsCache(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + this.maxRecordsPerCall = maxRecordsPerCall; + this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + } + + @Override + public GetRecordsResult getNextResult() { + return getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + } + + @Override + public void shutdown() { + // Nothing to do here: this cache starts no threads and buffers no results. 
+ } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java new file mode 100644 index 00000000..05c2ab3f --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java @@ -0,0 +1,8 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * Selects how records are fetched. NOTE(review): PREFETCH_CACHED presumably enables PrefetchGetRecordsCache - confirm. + */ +public enum DataFetchingStrategy { + DEFAULT, PREFETCH_CACHED; +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java deleted file mode 100644 index a7e9e029..00000000 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -import com.amazonaws.services.kinesis.model.GetRecordsResult; - -import lombok.NonNull; -import lombok.extern.apachecommons.CommonsLog; - -/** - * - */ -@CommonsLog -public class DefaultGetRecordsCache implements GetRecordsCache { - private static final int DEFAULT_MAX_SIZE = 1; - private static final int DEFAULT_MAX_BYTE_SIZE = 1; - private static final int DEFAULT_MAX_RECORDS_COUNT = 1; - - private final Queue getRecordsResultQueue; - private final int maxSize; - private final int maxByteSize; - private final int maxRecordsCount; - private final int maxRecordsPerCall; - @NonNull - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; - - public DefaultGetRecordsCache(final Optional maxSize, final Optional maxByteSize, final Optional maxRecordsCount, - final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { - 
this.getRecordsResultQueue = new ConcurrentLinkedQueue<>(); - this.maxSize = maxSize.orElse(DEFAULT_MAX_SIZE); - this.maxByteSize = maxByteSize.orElse(DEFAULT_MAX_BYTE_SIZE); - this.maxRecordsCount = maxRecordsCount.orElse(DEFAULT_MAX_RECORDS_COUNT); - this.maxRecordsPerCall = maxRecordsPerCall; - this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; - } - - @Override - public GetRecordsResult getNextResult() { - return null; - } - - @Override - public void shutdown() { - - } -} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index d9114304..88df34fb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -2,6 +2,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.model.GetRecordsResult; +import java.util.Collections; + /** * This class is used as a cache for Prefetching data from Kinesis. 
*/ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index 2ce3152a..8779b5da 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -14,18 +14,19 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.Collections; +import java.util.Date; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; - -import java.util.Date; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.ShardIteratorType; /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. 
@@ -77,6 +78,10 @@ class KinesisDataFetcher {
         } else {
             isShardEndReached = true;
         }
+
+        if (response == null) {
+            response = new GetRecordsResult().withRecords(Collections.emptyList());
+        }
         return response;
     }
 
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java
new file mode 100644
index 00000000..a60b5d3e
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java
@@ -0,0 +1,170 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+
+import lombok.NonNull;
+import lombok.extern.apachecommons.CommonsLog;
+
+/**
+ * A GetRecordsCache that prefetches records on a background task. The first call to getNextResult() submits a task
+ * to the supplied ExecutorService; that task repeatedly calls the GetRecordsRetrievalStrategy and stores each result
+ * in a bounded queue. The cache is limited by maxSize (the number of GetRecordsResult objects it can hold),
+ * maxByteSize (the total bytes of cached record data) and maxRecordsCount (the total number of cached records across
+ * all GetRecordsResult objects). If no data is available in the cache, the call from the record processor is blocked
+ * till records are retrieved from Kinesis.
+ */
+@CommonsLog
+public class PrefetchGetRecordsCache implements GetRecordsCache {
+    /** Time the prefetch task sleeps before re-checking the limits while the cache is full. */
+    private static final long IDLE_MILLIS_WHEN_FULL = 50L;
+
+    private final LinkedBlockingQueue<GetRecordsResult> getRecordsResultQueue;
+    private final int maxSize;
+    private final int maxByteSize;
+    private final int maxRecordsCount;
+    private final int maxRecordsPerCall;
+    private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
+    private final ExecutorService executorService;
+    private final PrefetchCounters prefetchCounters;
+
+    // NOTE(review): plain field, not synchronized; looks like a single consumer thread is assumed - confirm.
+    private boolean started = false;
+
+    /**
+     * Creates the cache. Nothing is fetched until the first call to getNextResult().
+     *
+     * @param maxSize maximum number of GetRecordsResult objects held in the queue
+     * @param maxByteSize the prefetch task stops fetching once the cached record data reaches this many bytes
+     * @param maxRecordsCount the prefetch task stops fetching once this many records are cached
+     * @param maxRecordsPerCall value passed to every GetRecordsRetrievalStrategy.getRecords call
+     * @param dataFetchingStrategy must not be null; not otherwise used by this class
+     * @param getRecordsRetrievalStrategy strategy used to retrieve records
+     * @param executorService executor that runs the prefetch task; shutdown() shuts it down
+     */
+    public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount,
+            final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy,
+            @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
+            @NonNull final ExecutorService executorService) {
+        this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
+        this.maxRecordsPerCall = maxRecordsPerCall;
+        this.maxSize = maxSize;
+        this.maxByteSize = maxByteSize;
+        this.maxRecordsCount = maxRecordsCount;
+        this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize);
+        this.prefetchCounters = new PrefetchCounters();
+        this.executorService = executorService;
+    }
+
+    private void start() {
+        if (!started) {
+            log.info("Starting prefetching thread.");
+            executorService.execute(new DefaultGetRecordsCacheDaemon());
+        }
+        started = true;
+    }
+
+    /**
+     * Returns the next prefetched result, starting the prefetch task on first use and blocking while the cache is
+     * empty.
+     *
+     * @return the next result, or null if the calling thread is interrupted while waiting
+     */
+    @Override
+    public GetRecordsResult getNextResult() {
+        if (!started) {
+            start();
+        }
+        GetRecordsResult result = null;
+        try {
+            result = getRecordsResultQueue.take();
+            prefetchCounters.removed(result);
+        } catch (InterruptedException e) {
+            log.error("Interrupted while getting records from the cache", e);
+            // Restore the flag so callers further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+        return result;
+    }
+
+    /**
+     * Stops prefetching by shutting down the executor.
+     */
+    @Override
+    public void shutdown() {
+        // shutdownNow() interrupts the running prefetch task; a plain shutdown() would leave its loop running.
+        executorService.shutdownNow();
+    }
+
+    private boolean hasCapacity() {
+        return prefetchCounters.byteSize.get() < maxByteSize && prefetchCounters.size.get() < maxRecordsCount;
+    }
+
+    /**
+     * Background task that keeps the cache filled until its thread is interrupted.
+     */
+    private class DefaultGetRecordsCacheDaemon implements Runnable {
+        @Override
+        public void run() {
+            // NOTE(review): an unchecked exception from getRecords() ends this task, after which
+            // getNextResult() blocks forever - decide how retrieval failures should reach the consumer.
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    if (hasCapacity()) {
+                        fetchAndEnqueue();
+                    } else {
+                        // Back off instead of spinning while the consumer drains the cache.
+                        Thread.sleep(IDLE_MILLIS_WHEN_FULL);
+                    }
+                } catch (InterruptedException e) {
+                    log.info("Prefetch task interrupted, stopping.");
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        private void fetchAndEnqueue() throws InterruptedException {
+            GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
+            // Count the result before publishing it so the consumer's removed() can never run ahead of added().
+            prefetchCounters.added(getRecordsResult);
+            try {
+                getRecordsResultQueue.put(getRecordsResult);
+            } catch (InterruptedException e) {
+                prefetchCounters.removed(getRecordsResult);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Running totals of the records and record bytes currently cached; updated from both the consumer thread and
+     * the prefetch task, hence the atomic counters.
+     */
+    private static class PrefetchCounters {
+        private final AtomicLong size = new AtomicLong();
+        private final AtomicLong byteSize = new AtomicLong();
+
+        public void added(final GetRecordsResult result) {
+            size.addAndGet(getSize(result));
+            byteSize.addAndGet(getByteSize(result));
+        }
+
+        public void removed(final GetRecordsResult result) {
+            size.addAndGet(-getSize(result));
+            byteSize.addAndGet(-getByteSize(result));
+        }
+
+        private long getSize(final GetRecordsResult result) {
+            return result.getRecords().size();
+        }
+
+        private long getByteSize(final GetRecordsResult result) {
+            // remaining() is the readable payload; array() can throw for read-only or direct buffers.
+            return result.getRecords().stream().mapToLong(record -> record.getData().remaining()).sum();
+        }
+    }
+
+}