diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java new file mode 100644 index 00000000..f4dc271d --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -0,0 +1,28 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import lombok.extern.apachecommons.CommonsLog; + +/** + * + */ +@CommonsLog +public class BlockingGetRecordsCache extends GetRecordsCache { + private final int maxRecordsPerCall; + private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + public BlockingGetRecordsCache(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + this.maxRecordsPerCall = maxRecordsPerCall; + this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + } + + @Override + public GetRecordsResult getNextResult() { + return validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + } + + @Override + public void shutdown() { + // Nothing to do here. + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index d9114304..15d0b402 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -2,17 +2,26 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.model.GetRecordsResult; +import java.util.Collections; + /** * This class is used as a cache for Prefetching data from Kinesis. */ -public interface GetRecordsCache { +public abstract class GetRecordsCache { /** * This method returns the next set of records from the Cache if present, or blocks the request till it gets the * next set of records back from Kinesis. * * @return The next set of records. */ - GetRecordsResult getNextResult(); + public abstract GetRecordsResult getNextResult(); - void shutdown(); + public abstract void shutdown(); + + protected GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) { + if (getRecordsResult == null) { + return new GetRecordsResult().withRecords(Collections.emptyList()); + } + return getRecordsResult; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java similarity index 68% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index fb75e170..0b9373e5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DefaultGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -20,38 +20,35 @@ import lombok.extern.apachecommons.CommonsLog; * GetRecordsRetrievalStrategy is a blocking call. */ @CommonsLog -public class DefaultGetRecordsCache implements GetRecordsCache { +public class PrefetchGetRecordsCache extends GetRecordsCache { private LinkedBlockingQueue getRecordsResultQueue; private int maxSize; private int maxByteSize; private int maxRecordsCount; private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; - private final ExecutorService executorService = Executors.newFixedThreadPool(1); + private final ExecutorService executorService; private volatile int currentSizeInBytes = 0; private volatile int currentRecordsCount = 0; - private DataFetchingStrategy dataFetchingStrategy; private boolean started = false; - public DefaultGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, - final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy, - @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, + final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy, + @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, + @NonNull final ExecutorService executorService) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; - this.dataFetchingStrategy = dataFetchingStrategy; - - if (this.dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { - this.maxSize = maxSize; - this.maxByteSize = maxByteSize; - this.maxRecordsCount = maxRecordsCount; - this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); - } + this.maxSize = maxSize; + this.maxByteSize = maxByteSize; + this.maxRecordsCount = maxRecordsCount; + this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); + this.executorService = executorService; } private void start() { - if (dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { + if (!started) { log.info("Starting prefetching thread."); executorService.execute(new DefaultGetRecordsCacheDaemon()); } @@ -64,16 +61,12 @@ public class DefaultGetRecordsCache implements GetRecordsCache { start(); } GetRecordsResult result = null; - if (dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) { - try { - result = getRecordsResultQueue.take(); - updateBytes(result, false); - updateRecordsCount(result, false); - } catch (InterruptedException e) { - log.error("Interrupted while getting records from the cache", e); - } - } else { - result = validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + try { + result = getRecordsResultQueue.take(); + updateBytes(result, false); + updateRecordsCount(result, false); + } catch (InterruptedException e) { + log.error("Interrupted while getting records from the cache", e); } return result; } @@ -103,13 +96,6 @@ public class DefaultGetRecordsCache implements GetRecordsCache { } } - private GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) { - if (getRecordsResult == null) { - return new GetRecordsResult().withRecords(Collections.emptyList()); - } - return getRecordsResult; - } - private class DefaultGetRecordsCacheDaemon implements Runnable { @Override public void run() {