From c92a5b556c7d835602c7921c58bbeafb4c773b8c Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Mon, 18 Sep 2017 16:35:31 -0700 Subject: [PATCH] Adding the cache and the retriver stubs. (#215) * Adding the cache and the retriver stubs. * Addressing comments and adding initial documentation and changing the retreiver from interface to class. * Reverting back to the interface * Fixing minor error * Adding default cache stub --- .../lib/worker/DefaultGetRecordsCache.java | 48 +++++++++++++++++++ .../lib/worker/GetRecordsCache.java | 18 +++++++ .../lib/worker/GetRecordsRetriever.java | 12 +++++ 3 files changed, 78 insertions(+) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java new file mode 100644 index 00000000..a7e9e029 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java @@ -0,0 +1,48 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +import lombok.NonNull; +import lombok.extern.apachecommons.CommonsLog; + +/** + * + */ +@CommonsLog +public class DefaultGetRecordsCache implements GetRecordsCache { + private static final int DEFAULT_MAX_SIZE = 1; + private static final int DEFAULT_MAX_BYTE_SIZE = 1; + private static final int DEFAULT_MAX_RECORDS_COUNT = 1; + + private final Queue getRecordsResultQueue; + private final int maxSize; + private final int maxByteSize; + private final int maxRecordsCount; + private final int maxRecordsPerCall; + @NonNull + private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + public DefaultGetRecordsCache(final Optional maxSize, final Optional maxByteSize, final Optional maxRecordsCount, + final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + this.getRecordsResultQueue = new ConcurrentLinkedQueue<>(); + this.maxSize = maxSize.orElse(DEFAULT_MAX_SIZE); + this.maxByteSize = maxByteSize.orElse(DEFAULT_MAX_BYTE_SIZE); + this.maxRecordsCount = maxRecordsCount.orElse(DEFAULT_MAX_RECORDS_COUNT); + this.maxRecordsPerCall = maxRecordsPerCall; + this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + } + + @Override + public GetRecordsResult getNextResult() { + return null; + } + + @Override + public void shutdown() { + + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java new file mode 100644 index 00000000..d9114304 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -0,0 +1,18 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +/** + * This class is used as a cache for Prefetching data from Kinesis. + */ +public interface GetRecordsCache { + /** + * This method returns the next set of records from the Cache if present, or blocks the request till it gets the + * next set of records back from Kinesis. + * + * @return The next set of records. + */ + GetRecordsResult getNextResult(); + + void shutdown(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java new file mode 100644 index 00000000..d5b4a782 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java @@ -0,0 +1,12 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +import java.util.concurrent.Callable; + +/** + * This class uses the GetRecordsRetrievalStrategy class to retrieve the next set of records and update the cache. + */ +public interface GetRecordsRetriever { + GetRecordsResult getNextRecords(int maxRecords); +}