Adding Blocking cache and spilting into blocking and prefetching cache. Changing the GetRecordsCache interface to abstract class.

This commit is contained in:
Sahil Palvia 2017-09-19 13:57:32 -07:00
parent b6f41d21f8
commit 14ebfb8f0f
3 changed files with 58 additions and 35 deletions

View file

@ -0,0 +1,28 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import lombok.extern.apachecommons.CommonsLog;
/**
*
*/
@CommonsLog
public class BlockingGetRecordsCache extends GetRecordsCache {
private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
public BlockingGetRecordsCache(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
this.maxRecordsPerCall = maxRecordsPerCall;
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
}
@Override
public GetRecordsResult getNextResult() {
return validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall));
}
@Override
public void shutdown() {
// Nothing to do here.
}
}

View file

@ -2,17 +2,26 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import java.util.Collections;
/**
* This class is used as a cache for Prefetching data from Kinesis.
*/
public interface GetRecordsCache {
public abstract class GetRecordsCache {
/**
* This method returns the next set of records from the Cache if present, or blocks the request till it gets the
* next set of records back from Kinesis.
*
* @return The next set of records.
*/
GetRecordsResult getNextResult();
public abstract GetRecordsResult getNextResult();
void shutdown();
public abstract void shutdown();
protected GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) {
if (getRecordsResult == null) {
return new GetRecordsResult().withRecords(Collections.emptyList());
}
return getRecordsResult;
}
}

View file

@ -20,38 +20,35 @@ import lombok.extern.apachecommons.CommonsLog;
* GetRecordsRetrievalStrategy is a blocking call.
*/
@CommonsLog
public class DefaultGetRecordsCache implements GetRecordsCache {
public class PrefetchGetRecordsCache extends GetRecordsCache {
private LinkedBlockingQueue<GetRecordsResult> getRecordsResultQueue;
private int maxSize;
private int maxByteSize;
private int maxRecordsCount;
private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final ExecutorService executorService = Executors.newFixedThreadPool(1);
private final ExecutorService executorService;
private volatile int currentSizeInBytes = 0;
private volatile int currentRecordsCount = 0;
private DataFetchingStrategy dataFetchingStrategy;
private boolean started = false;
public DefaultGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount,
final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy,
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount,
final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy,
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
@NonNull final ExecutorService executorService) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall;
this.dataFetchingStrategy = dataFetchingStrategy;
if (this.dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) {
this.maxSize = maxSize;
this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount;
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize);
}
this.maxSize = maxSize;
this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount;
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize);
this.executorService = executorService;
}
private void start() {
if (dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) {
if (!started) {
log.info("Starting prefetching thread.");
executorService.execute(new DefaultGetRecordsCacheDaemon());
}
@ -64,16 +61,12 @@ public class DefaultGetRecordsCache implements GetRecordsCache {
start();
}
GetRecordsResult result = null;
if (dataFetchingStrategy.equals(DataFetchingStrategy.PREFETCH_CACHED)) {
try {
result = getRecordsResultQueue.take();
updateBytes(result, false);
updateRecordsCount(result, false);
} catch (InterruptedException e) {
log.error("Interrupted while getting records from the cache", e);
}
} else {
result = validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall));
try {
result = getRecordsResultQueue.take();
updateBytes(result, false);
updateRecordsCount(result, false);
} catch (InterruptedException e) {
log.error("Interrupted while getting records from the cache", e);
}
return result;
}
@ -103,13 +96,6 @@ public class DefaultGetRecordsCache implements GetRecordsCache {
}
}
private GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) {
if (getRecordsResult == null) {
return new GetRecordsResult().withRecords(Collections.emptyList());
}
return getRecordsResult;
}
private class DefaultGetRecordsCacheDaemon implements Runnable {
@Override
public void run() {