From 190d8fb5aa0e7ff1f3318fe1a0487744644085a1 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Mon, 18 Sep 2017 13:26:09 -0700 Subject: [PATCH 1/9] Adding the cache and the retriver stubs. --- .../clientlibrary/lib/worker/GetRecordsCache.java | 12 ++++++++++++ .../lib/worker/GetRecordsRetriever.java | 10 ++++++++++ 2 files changed, 22 insertions(+) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java new file mode 100644 index 00000000..af4ba1bf --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -0,0 +1,12 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +/** + * + */ +public interface GetRecordsCache { + void dispatchNextCall(); + + GetRecordsResult getNextResult(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java new file mode 100644 index 00000000..cf5edcf5 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java @@ -0,0 +1,10 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +/** + * + */ +public interface GetRecordsRetriever { + GetRecordsResult getRecords(int maxRecords); +} From 8ceb5f24921076a8c71b54cad37814b1322d1d4e Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Mon, 18 Sep 2017 14:59:48 -0700 Subject: [PATCH 2/9] Addressing comments and adding initial documentation and changing the retreiver from interface to class. --- .../lib/worker/GetRecordsCache.java | 17 +++++++++++++++-- .../lib/worker/GetRecordsRetriever.java | 10 ---------- .../lib/worker/GetRecordsRetrieverDaemon.java | 15 +++++++++++++++ 3 files changed, 30 insertions(+), 12 deletions(-) delete mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrieverDaemon.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index af4ba1bf..0f71d38e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -3,10 +3,23 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.model.GetRecordsResult; /** - * + * This class is used as a cache for Prefetching data from Kinesis. */ public interface GetRecordsCache { + /** + * This method dispatches the next call to getRecords from Kinesis. + */ void dispatchNextCall(); - + + /** + * This method returns the next set of records from the Cache if present, or blocks the request till it gets the + * next set of records back from Kinesis. + * + * @return The next set of records. + */ GetRecordsResult getNextResult(); + + void addGetRecordsResultToCache(GetRecordsResult getRecordsResult); + + void shutdown(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java deleted file mode 100644 index cf5edcf5..00000000 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import com.amazonaws.services.kinesis.model.GetRecordsResult; - -/** - * - */ -public interface GetRecordsRetriever { - GetRecordsResult getRecords(int maxRecords); -} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrieverDaemon.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrieverDaemon.java new file mode 100644 index 00000000..959c69ba --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrieverDaemon.java @@ -0,0 +1,15 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +import java.util.concurrent.Callable; + +/** + * This class uses the GetRecordsRetrievalStrategy class to retrieve the next set of records and update the cache. + */ +public class GetRecordsRetrieverDaemon implements Callable { + @Override + public GetRecordsResult call() throws Exception { + return null; + } +} From 9def9134531e5817fe25e23e91bb0ff362e24e5d Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Mon, 18 Sep 2017 15:02:06 -0700 Subject: [PATCH 3/9] Reverting back to the interface --- ...ecordsRetrieverDaemon.java => GetRecordsRetriever.java} | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) rename src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/{GetRecordsRetrieverDaemon.java => GetRecordsRetriever.java} (62%) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrieverDaemon.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java similarity index 62% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrieverDaemon.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java index 959c69ba..2d8e318e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrieverDaemon.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java @@ -7,9 +7,6 @@ import java.util.concurrent.Callable; /** * This class uses the GetRecordsRetrievalStrategy class to retrieve the next set of records and update the cache. */ -public class GetRecordsRetrieverDaemon implements Callable { - @Override - public GetRecordsResult call() throws Exception { - return null; - } +public interface GetRecordsRetriever { + GetRecordsResult getNextRecords(int maxRecords) } From c70ab1fc72ea90ec3d5ae8ceb66f0735785ef5e4 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Mon, 18 Sep 2017 15:02:46 -0700 Subject: [PATCH 4/9] Fixing minor error --- .../kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java index 2d8e318e..d5b4a782 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java @@ -8,5 +8,5 @@ import java.util.concurrent.Callable; * This class uses the GetRecordsRetrievalStrategy class to retrieve the next set of records and update the cache. */ public interface GetRecordsRetriever { - GetRecordsResult getNextRecords(int maxRecords) + GetRecordsResult getNextRecords(int maxRecords); } From 66b809ef7b159aa4baa8e44f07a9c8cd7643325c Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Mon, 18 Sep 2017 16:32:54 -0700 Subject: [PATCH 5/9] Adding default cache stub --- .../lib/worker/DefaultGetRecordsCache.java | 48 +++++++++++++++++++ .../lib/worker/GetRecordsCache.java | 7 --- 2 files changed, 48 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java new file mode 100644 index 00000000..a7e9e029 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java @@ -0,0 +1,48 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +import lombok.NonNull; +import lombok.extern.apachecommons.CommonsLog; + +/** + * + */ +@CommonsLog +public class DefaultGetRecordsCache implements GetRecordsCache { + private static final int DEFAULT_MAX_SIZE = 1; + private static final int DEFAULT_MAX_BYTE_SIZE = 1; + private static final int DEFAULT_MAX_RECORDS_COUNT = 1; + + private final Queue getRecordsResultQueue; + private final int maxSize; + private final int maxByteSize; + private final int maxRecordsCount; + private final int maxRecordsPerCall; + @NonNull + private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + public DefaultGetRecordsCache(final Optional maxSize, final Optional maxByteSize, final Optional maxRecordsCount, + final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + this.getRecordsResultQueue = new ConcurrentLinkedQueue<>(); + this.maxSize = maxSize.orElse(DEFAULT_MAX_SIZE); + this.maxByteSize = maxByteSize.orElse(DEFAULT_MAX_BYTE_SIZE); + this.maxRecordsCount = maxRecordsCount.orElse(DEFAULT_MAX_RECORDS_COUNT); + this.maxRecordsPerCall = maxRecordsPerCall; + this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + } + + @Override + public GetRecordsResult getNextResult() { + return null; + } + + @Override + public void shutdown() { + + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index 0f71d38e..d9114304 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -6,11 +6,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; * This class is used as a cache for Prefetching data from Kinesis. */ public interface GetRecordsCache { - /** - * This method dispatches the next call to getRecords from Kinesis. - */ - void dispatchNextCall(); - /** * This method returns the next set of records from the Cache if present, or blocks the request till it gets the * next set of records back from Kinesis. @@ -19,7 +14,5 @@ public interface GetRecordsCache { */ GetRecordsResult getNextResult(); - void addGetRecordsResultToCache(GetRecordsResult getRecordsResult); - void shutdown(); } From bcee1ae395f470c0c268b89fcaaee9a4fd779604 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 19 Sep 2017 12:06:50 -0700 Subject: [PATCH 6/9] Adding default caching class and enum for fetching strategy. --- .../lib/worker/DataFetchingStrategy.java | 8 ++ .../lib/worker/DefaultGetRecordsCache.java | 130 +++++++++++++++--- 2 files changed, 116 insertions(+), 22 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java new file mode 100644 index 00000000..05c2ab3f --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java @@ -0,0 +1,8 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * + */ +public enum DataFetchingStrategy { + DEFAULT, PREFETCH_CACHED; +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java index a7e9e029..fb75e170 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java @@ -1,8 +1,9 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import com.amazonaws.services.kinesis.model.GetRecordsResult; @@ -10,39 +11,124 @@ import lombok.NonNull; import lombok.extern.apachecommons.CommonsLog; /** - * + * This is the default caching class, this class spins up a thread if prefetching is enabled. That thread fetches the + * next set of records and stores it in the cache. The size of the cache is limited by setting maxSize i.e. the maximum + * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the + * cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple + * GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till + * records are retrieved from Kinesis. If prefetching is not enabled, the cache is not used and every single call to the + * GetRecordsRetrievalStrategy is a blocking call. */ @CommonsLog public class DefaultGetRecordsCache implements GetRecordsCache { - private static final int DEFAULT_MAX_SIZE = 1; - private static final int DEFAULT_MAX_BYTE_SIZE = 1; - private static final int DEFAULT_MAX_RECORDS_COUNT = 1; - - private final Queue getRecordsResultQueue; - private final int maxSize; - private final int maxByteSize; - private final int maxRecordsCount; + private LinkedBlockingQueue getRecordsResultQueue; + private int maxSize; + private int maxByteSize; + private int maxRecordsCount; private final int maxRecordsPerCall; - @NonNull private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final ExecutorService executorService = Executors.newFixedThreadPool(1); + + private volatile int currentSizeInBytes = 0; + private volatile int currentRecordsCount = 0; + private DataFetchingStrategy dataFetchingStrategy; + + private boolean started = false; - public DefaultGetRecordsCache(final Optional maxSize, final Optional maxByteSize, final Optional maxRecordsCount, - final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { - this.getRecordsResultQueue = new ConcurrentLinkedQueue<>(); - this.maxSize = maxSize.orElse(DEFAULT_MAX_SIZE); - this.maxByteSize = maxByteSize.orElse(DEFAULT_MAX_BYTE_SIZE); - this.maxRecordsCount = maxRecordsCount.orElse(DEFAULT_MAX_RECORDS_COUNT); - this.maxRecordsPerCall = maxRecordsPerCall; + public DefaultGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, + final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy, + @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.maxRecordsPerCall = maxRecordsPerCall; + this.dataFetchingStrategy = dataFetchingStrategy; + + if (this.dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { + this.maxSize = maxSize; + this.maxByteSize = maxByteSize; + this.maxRecordsCount = maxRecordsCount; + this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); + } + } + + private void start() { + if (dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { + log.info("Starting prefetching thread."); + executorService.execute(new DefaultGetRecordsCacheDaemon()); + } + started = true; } @Override public GetRecordsResult getNextResult() { - return null; + if (!started) { + start(); + } + GetRecordsResult result = null; + if (dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { + try { + result = getRecordsResultQueue.take(); + updateBytes(result, false); + updateRecordsCount(result, false); + } catch (InterruptedException e) { + log.error("Interrupted while getting records from the cache", e); + } + } else { + result = validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + } + return result; } @Override public void shutdown() { - + executorService.shutdown(); } + + private void updateBytes(final GetRecordsResult getRecordsResult, final boolean add) { + getRecordsResult.getRecords().forEach(record -> { + int newLength = record.getData().array().length; + if (add) { + currentSizeInBytes += newLength; + } else { + currentSizeInBytes -= newLength; + } + }); + } + + private void updateRecordsCount(final GetRecordsResult getRecordsResult, final boolean add) { + int newSize = getRecordsResult.getRecords().size(); + if (add) { + currentRecordsCount += newSize; + } else { + currentRecordsCount -= newSize; + } + } + + private GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) { + if (getRecordsResult == null) { + return new GetRecordsResult().withRecords(Collections.emptyList()); + } + return getRecordsResult; + } + + private class DefaultGetRecordsCacheDaemon implements Runnable { + @Override + public void run() { + while (true) { + if (currentSizeInBytes < maxByteSize && currentRecordsCount < maxRecordsCount) { + try { + GetRecordsResult getRecordsResult = validateGetRecordsResult( + getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + getRecordsResultQueue.put(getRecordsResult); + if (getRecordsResultQueue.contains(getRecordsResult)) { + updateBytes(getRecordsResult, true); + updateRecordsCount(getRecordsResult, true); + } + } catch (InterruptedException e) { + log.error("Interrupted while adding records to the cache", e); + } + } + } + } + } + } From 14ebfb8f0f1c60401553266cbe9940028f67ca3d Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 19 Sep 2017 13:57:32 -0700 Subject: [PATCH 7/9] Adding Blocking cache and spilting into blocking and prefetching cache. Changing the GetRecordsCache interface to abstract class. --- .../lib/worker/BlockingGetRecordsCache.java | 28 +++++++++++ .../lib/worker/GetRecordsCache.java | 15 ++++-- ...ache.java => PrefetchGetRecordsCache.java} | 50 +++++++------------ 3 files changed, 58 insertions(+), 35 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java rename src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/{DefaultGetRecordsCache.java => PrefetchGetRecordsCache.java} (68%) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java new file mode 100644 index 00000000..f4dc271d --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -0,0 +1,28 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import lombok.extern.apachecommons.CommonsLog; + +/** + * + */ +@CommonsLog +public class BlockingGetRecordsCache extends GetRecordsCache { + private final int maxRecordsPerCall; + private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + public BlockingGetRecordsCache(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + this.maxRecordsPerCall = maxRecordsPerCall; + this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + } + + @Override + public GetRecordsResult getNextResult() { + return validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + } + + @Override + public void shutdown() { + // Nothing to do here. + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index d9114304..15d0b402 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -2,17 +2,26 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.model.GetRecordsResult; +import java.util.Collections; + /** * This class is used as a cache for Prefetching data from Kinesis. */ -public interface GetRecordsCache { +public abstract class GetRecordsCache { /** * This method returns the next set of records from the Cache if present, or blocks the request till it gets the * next set of records back from Kinesis. * * @return The next set of records. */ - GetRecordsResult getNextResult(); + public abstract GetRecordsResult getNextResult(); - void shutdown(); + public abstract void shutdown(); + + protected GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) { + if (getRecordsResult == null) { + return new GetRecordsResult().withRecords(Collections.emptyList()); + } + return getRecordsResult; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java similarity index 68% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index fb75e170..0b9373e5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -20,38 +20,35 @@ import lombok.extern.apachecommons.CommonsLog; * GetRecordsRetrievalStrategy is a blocking call. */ @CommonsLog -public class DefaultGetRecordsCache implements GetRecordsCache { +public class PrefetchGetRecordsCache extends GetRecordsCache { private LinkedBlockingQueue getRecordsResultQueue; private int maxSize; private int maxByteSize; private int maxRecordsCount; private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; - private final ExecutorService executorService = Executors.newFixedThreadPool(1); + private final ExecutorService executorService; private volatile int currentSizeInBytes = 0; private volatile int currentRecordsCount = 0; - private DataFetchingStrategy dataFetchingStrategy; private boolean started = false; - public DefaultGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, - final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy, - @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, + final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy, + @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, + @NonNull final ExecutorService executorService) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; - this.dataFetchingStrategy = dataFetchingStrategy; - - if (this.dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { - this.maxSize = maxSize; - this.maxByteSize = maxByteSize; - this.maxRecordsCount = maxRecordsCount; - this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); - } + this.maxSize = maxSize; + this.maxByteSize = maxByteSize; + this.maxRecordsCount = maxRecordsCount; + this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); + this.executorService = executorService; } private void start() { - if (dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { + if (!started) { log.info("Starting prefetching thread."); executorService.execute(new DefaultGetRecordsCacheDaemon()); } @@ -64,16 +61,12 @@ public class DefaultGetRecordsCache implements GetRecordsCache { start(); } GetRecordsResult result = null; - if (dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { - try { - result = getRecordsResultQueue.take(); - updateBytes(result, false); - updateRecordsCount(result, false); - } catch (InterruptedException e) { - log.error("Interrupted while getting records from the cache", e); - } - } else { - result = validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + try { + result = getRecordsResultQueue.take(); + updateBytes(result, false); + updateRecordsCount(result, false); + } catch (InterruptedException e) { + log.error("Interrupted while getting records from the cache", e); } return result; } @@ -103,13 +96,6 @@ public class DefaultGetRecordsCache implements GetRecordsCache { } } - private GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) { - if (getRecordsResult == null) { - return new GetRecordsResult().withRecords(Collections.emptyList()); - } - return getRecordsResult; - } - private class DefaultGetRecordsCacheDaemon implements Runnable { @Override public void run() { From 5172f4f93620d4b9167304d250171d2e940c4d7c Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 19 Sep 2017 14:25:25 -0700 Subject: [PATCH 8/9] Adding null condition to getRecords in the KinesisDataFetcher class. Changing the abstract class back to an interface. --- .../lib/worker/BlockingGetRecordsCache.java | 8 +++++--- .../clientlibrary/lib/worker/GetRecordsCache.java | 13 +++---------- .../lib/worker/KinesisDataFetcher.java | 15 ++++++++++----- .../lib/worker/PrefetchGetRecordsCache.java | 10 +++------- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index f4dc271d..7ee718d0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -1,13 +1,15 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.model.GetRecordsResult; + import lombok.extern.apachecommons.CommonsLog; /** - * + * This is the BlockingGetRecordsCache class. This class blocks any calls to the getRecords on the + * GetRecordsRetrievalStrategy class. */ @CommonsLog -public class BlockingGetRecordsCache extends GetRecordsCache { +public class BlockingGetRecordsCache implements GetRecordsCache { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @@ -18,7 +20,7 @@ public class BlockingGetRecordsCache extends GetRecordsCache { @Override public GetRecordsResult getNextResult() { - return validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + return getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index 15d0b402..88df34fb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -7,21 +7,14 @@ import java.util.Collections; /** * This class is used as a cache for Prefetching data from Kinesis. */ -public abstract class GetRecordsCache { +public interface GetRecordsCache { /** * This method returns the next set of records from the Cache if present, or blocks the request till it gets the * next set of records back from Kinesis. * * @return The next set of records. */ - public abstract GetRecordsResult getNextResult(); + GetRecordsResult getNextResult(); - public abstract void shutdown(); - - protected GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) { - if (getRecordsResult == null) { - return new GetRecordsResult().withRecords(Collections.emptyList()); - } - return getRecordsResult; - } + void shutdown(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index 2ce3152a..8779b5da 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -14,18 +14,19 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.Collections; +import java.util.Date; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; - -import java.util.Date; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.ShardIteratorType; /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. @@ -77,6 +78,10 @@ class KinesisDataFetcher { } else { isShardEndReached = true; } + + if (response == null) { + response = new GetRecordsResult().withRecords(Collections.emptyList()); + } return response; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 0b9373e5..e9109e73 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -1,8 +1,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.Collections; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import com.amazonaws.services.kinesis.model.GetRecordsResult; @@ -16,11 +14,10 @@ import lombok.extern.apachecommons.CommonsLog; * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the * cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple * GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till - * records are retrieved from Kinesis. If prefetching is not enabled, the cache is not used and every single call to the - * GetRecordsRetrievalStrategy is a blocking call. + * records are retrieved from Kinesis. */ @CommonsLog -public class PrefetchGetRecordsCache extends GetRecordsCache { +public class PrefetchGetRecordsCache implements GetRecordsCache { private LinkedBlockingQueue getRecordsResultQueue; private int maxSize; private int maxByteSize; @@ -102,8 +99,7 @@ public class PrefetchGetRecordsCache extends GetRecordsCache { while (true) { if (currentSizeInBytes < maxByteSize && currentRecordsCount < maxRecordsCount) { try { - GetRecordsResult getRecordsResult = validateGetRecordsResult( - getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); getRecordsResultQueue.put(getRecordsResult); if (getRecordsResultQueue.contains(getRecordsResult)) { updateBytes(getRecordsResult, true); From a8edb70552b7d471450b6605edc799c1551c8d66 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 19 Sep 2017 14:48:43 -0700 Subject: [PATCH 9/9] Addressing PR comments. --- .../lib/worker/PrefetchGetRecordsCache.java | 57 +++++++++---------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index e9109e73..a60b5d3e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -26,8 +26,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final ExecutorService executorService; - private volatile int currentSizeInBytes = 0; - private volatile int currentRecordsCount = 0; + private PrefetchCounters prefetchCounters; private boolean started = false; @@ -41,6 +40,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); + prefetchCounters = new PrefetchCounters(); this.executorService = executorService; } @@ -60,8 +60,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { GetRecordsResult result = null; try { result = getRecordsResultQueue.take(); - updateBytes(result, false); - updateRecordsCount(result, false); + prefetchCounters.removed(result); } catch (InterruptedException e) { log.error("Interrupted while getting records from the cache", e); } @@ -73,38 +72,15 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { executorService.shutdown(); } - private void updateBytes(final GetRecordsResult getRecordsResult, final boolean add) { - getRecordsResult.getRecords().forEach(record -> { - int newLength = record.getData().array().length; - if (add) { - currentSizeInBytes += newLength; - } else { - currentSizeInBytes -= newLength; - } - }); - } - - private void updateRecordsCount(final GetRecordsResult getRecordsResult, final boolean add) { - int newSize = getRecordsResult.getRecords().size(); - if (add) { - currentRecordsCount += newSize; - } else { - currentRecordsCount -= newSize; - } - } - private class DefaultGetRecordsCacheDaemon implements Runnable { @Override public void run() { while (true) { - if (currentSizeInBytes < maxByteSize && currentRecordsCount < maxRecordsCount) { + if (prefetchCounters.byteSize < maxByteSize && prefetchCounters.size < maxRecordsCount) { try { GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); getRecordsResultQueue.put(getRecordsResult); - if (getRecordsResultQueue.contains(getRecordsResult)) { - updateBytes(getRecordsResult, true); - updateRecordsCount(getRecordsResult, true); - } + prefetchCounters.added(getRecordsResult); } catch (InterruptedException e) { log.error("Interrupted while adding records to the cache", e); } @@ -113,4 +89,27 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } } + private class PrefetchCounters { + private volatile long size = 0; + private volatile long byteSize = 0; + + public void added(final GetRecordsResult result) { + size += getSize(result); + byteSize += getByteSize(result); + } + + public void removed(final GetRecordsResult result) { + size -= getSize(result); + byteSize -= getByteSize(result); + } + + private long getSize(final GetRecordsResult result) { + return result.getRecords().size(); + } + + private long getByteSize(final GetRecordsResult result) { + return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum(); + } + } + }