diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 9e838ed0..b0ac6bfd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -1289,12 +1289,13 @@ public class KinesisClientLibConfiguration { /** * - * @param maxCacheSize the max number of records stored in the getRecordsCache + * @param maxPendingProcessRecordsInput The max number of ProcessRecordsInput that can be stored in the cache before + * blocking * @return this configuration object */ - public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) { - checkIsValuePositive("maxCacheSize", maxCacheSize); - this.recordsFetcherFactory.setMaxSize(maxCacheSize); + public KinesisClientLibConfiguration withMaxPendingProcessRecordsInput(final int maxPendingProcessRecordsInput) { + checkIsValuePositive("maxPendingProcessRecordsInput", maxPendingProcessRecordsInput); + this.recordsFetcherFactory.setMaxPendingProcessRecordsInput(maxPendingProcessRecordsInput); return this; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 90d3ab07..1a8ed508 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -29,16 +29,16 @@ import lombok.extern.apachecommons.CommonsLog; /** * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the - * next set of records and stores it in the cache. The size of the cache is limited by setting maxSize i.e. the maximum - * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the - * cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple - * GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till - * records are retrieved from Kinesis. + * next set of records and stores it in the cache. The size of the cache is limited by setting + * maxPendingProcessRecordsInput i.e. the maximum number of GetRecordsResult that the cache can store, maxByteSize + * i.e. the byte size of the records stored in the cache and maxRecordsCount i.e. the max number of records that should + * be present in the cache across multiple GetRecordsResult object. If no data is available in the cache, the call from + * the record processor is blocked till records are retrieved from Kinesis. */ @CommonsLog public class PrefetchGetRecordsCache implements GetRecordsCache { LinkedBlockingQueue getRecordsResultQueue; - private int maxSize; + private int maxPendingProcessRecordsInput; private int maxByteSize; private int maxRecordsCount; private final int maxRecordsPerCall; @@ -58,7 +58,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { * * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.PrefetchGetRecordsCache * - * @param maxSize Max size of the queue in the cache + * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before + * blocking * @param maxByteSize Max byte size of the queue before blocking next get records call * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects * @param maxRecordsPerCall Max records to be returned per call @@ -66,17 +67,17 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { * @param executorService Executor service for the cache * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call */ - public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, + public PrefetchGetRecordsCache(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull final ExecutorService executorService, long idleMillisBetweenCalls) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; - this.maxSize = maxSize; + this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; - this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); + this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput); this.prefetchCounters = new PrefetchCounters(); this.executorService = executorService; this.idleMillisBetweenCalls = idleMillisBetweenCalls; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index 5e9b9dda..0989995d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -15,54 +15,54 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; /** - * The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard. - * Clients may choose to create separate instantiations, or re-use instantiations. + * This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard. */ - public interface RecordsFetcherFactory { - /** - * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. + * Returns a GetRecordsCache to be used for retrieving records for a given shard. * * @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache - * @param shardId ShardId of the shard for which the GetRecordsCache is to be returned + * @param shardId ShardId of the shard that the fetcher will retrieve records for * - * @return Returns a GetRecordsCache object + * @return GetRecordsCache used to get records from Kinesis. */ GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId); /** - * This method sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold at any give time. + * Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are + * blocked. * - * @param maxSize Max size for the cache. + * @param maxPendingProcessRecordsInput The maximum number of ProcessRecordsInput objects that the cache will accept + * before blocking. */ - void setMaxSize(int maxSize); + void setMaxPendingProcessRecordsInput(int maxPendingProcessRecordsInput); /** - * This method sets the max byte size for the GetRecordsCache. This is the sum of all the records bytes present in - * the cache at a given point of time. + * Sets the max byte size for the GetRecordsCache, before further requests are blocked. The byte size of the cache + * is the sum of byte size of all the ProcessRecordsInput objects in the cache at any point of time. * - * @param maxByteSize Maximum byte size for the cache. + * @param maxByteSize The maximum byte size for the cache before blocking. */ void setMaxByteSize(int maxByteSize); /** - * This method sets the max number of records for the GetRecordsCache. This is the sum of all the records present - * across all the ProcessRecordsInput in the cache at a given point of time. + * Sets the max number of records for the GetRecordsCache can hold, before further requests are blocked. The records + * count is the sum of all records present in across all the ProcessRecordsInput objects in the cache at any point + * of time. * - * @param maxRecordsCount Maximum number of records in the cache. + * @param maxRecordsCount The mximum number of records in the cache before blocking. */ void setMaxRecordsCount(int maxRecordsCount); /** - * This method sets the dataFetchingStrategy to determine the type of GetRecordsCache to be used. + * Sets the dataFetchingStrategy to determine the type of GetRecordsCache to be used. * * @param dataFetchingStrategy Fetching strategy to be used */ void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); /** - * This method sets the maximum idle time between two get calls. + * Sets the maximum idle time between two get calls. * * @param idleMillisBetweenCalls Sleep millis between calls. */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index c9f10492..4d44f9ea 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -23,7 +23,7 @@ import lombok.extern.apachecommons.CommonsLog; @CommonsLog public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private final int maxRecords; - private int maxSize = 3; + private int maxPendingProcessRecordsInput = 3; private int maxByteSize = 8 * 1024 * 1024; private int maxRecordsCount = 30000; private long idleMillisBetweenCalls = 1500L; @@ -38,7 +38,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { - return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, + return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords, getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true) @@ -49,8 +49,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { } @Override - public void setMaxSize(int maxSize){ - this.maxSize = maxSize; + public void setMaxPendingProcessRecordsInput(int maxPendingProcessRecordsInput){ + this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; } @Override