diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java new file mode 100644 index 00000000..a7e9e029 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java @@ -0,0 +1,48 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +import lombok.NonNull; +import lombok.extern.apachecommons.CommonsLog; + +/** + * + */ +@CommonsLog +public class DefaultGetRecordsCache implements GetRecordsCache { + private static final int DEFAULT_MAX_SIZE = 1; + private static final int DEFAULT_MAX_BYTE_SIZE = 1; + private static final int DEFAULT_MAX_RECORDS_COUNT = 1; + + private final Queue getRecordsResultQueue; + private final int maxSize; + private final int maxByteSize; + private final int maxRecordsCount; + private final int maxRecordsPerCall; + @NonNull + private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + public DefaultGetRecordsCache(final Optional maxSize, final Optional maxByteSize, final Optional maxRecordsCount, + final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + this.getRecordsResultQueue = new ConcurrentLinkedQueue<>(); + this.maxSize = maxSize.orElse(DEFAULT_MAX_SIZE); + this.maxByteSize = maxByteSize.orElse(DEFAULT_MAX_BYTE_SIZE); + this.maxRecordsCount = maxRecordsCount.orElse(DEFAULT_MAX_RECORDS_COUNT); + this.maxRecordsPerCall = maxRecordsPerCall; + this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + } + + @Override + public GetRecordsResult getNextResult() { + return null; + } + + @Override + public void shutdown() { + + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java new file mode 100644 index 00000000..d9114304 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -0,0 +1,18 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +/** + * This class is used as a cache for Prefetching data from Kinesis. + */ +public interface GetRecordsCache { + /** + * This method returns the next set of records from the Cache if present, or blocks the request till it gets the + * next set of records back from Kinesis. + * + * @return The next set of records. + */ + GetRecordsResult getNextResult(); + + void shutdown(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java new file mode 100644 index 00000000..d5b4a782 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java @@ -0,0 +1,12 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +import java.util.concurrent.Callable; + +/** + * This class uses the GetRecordsRetrievalStrategy class to retrieve the next set of records and update the cache. + */ +public interface GetRecordsRetriever { + GetRecordsResult getNextRecords(int maxRecords); +}