Merge pull request #216 from sahilpalvia/prefetch
Adding default caching class and fetching stratergy
This commit is contained in:
commit
40aaece7c3
6 changed files with 165 additions and 53 deletions
|
|
@ -0,0 +1,30 @@
|
||||||
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||||
|
|
||||||
|
import lombok.extern.apachecommons.CommonsLog;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is the BlockingGetRecordsCache class. This class blocks any calls to the getRecords on the
|
||||||
|
* GetRecordsRetrievalStrategy class.
|
||||||
|
*/
|
||||||
|
@CommonsLog
|
||||||
|
public class BlockingGetRecordsCache implements GetRecordsCache {
|
||||||
|
private final int maxRecordsPerCall;
|
||||||
|
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||||
|
|
||||||
|
public BlockingGetRecordsCache(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
||||||
|
this.maxRecordsPerCall = maxRecordsPerCall;
|
||||||
|
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetRecordsResult getNextResult() {
|
||||||
|
return getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
// Nothing to do here.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,8 @@
|
||||||
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public enum DataFetchingStrategy {
|
||||||
|
DEFAULT, PREFETCH_CACHED;
|
||||||
|
}
|
||||||
|
|
@ -1,48 +0,0 @@
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
|
||||||
|
|
||||||
import lombok.NonNull;
|
|
||||||
import lombok.extern.apachecommons.CommonsLog;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
@CommonsLog
|
|
||||||
public class DefaultGetRecordsCache implements GetRecordsCache {
|
|
||||||
private static final int DEFAULT_MAX_SIZE = 1;
|
|
||||||
private static final int DEFAULT_MAX_BYTE_SIZE = 1;
|
|
||||||
private static final int DEFAULT_MAX_RECORDS_COUNT = 1;
|
|
||||||
|
|
||||||
private final Queue<GetRecordsResult> getRecordsResultQueue;
|
|
||||||
private final int maxSize;
|
|
||||||
private final int maxByteSize;
|
|
||||||
private final int maxRecordsCount;
|
|
||||||
private final int maxRecordsPerCall;
|
|
||||||
@NonNull
|
|
||||||
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
|
||||||
|
|
||||||
public DefaultGetRecordsCache(final Optional<Integer> maxSize, final Optional<Integer> maxByteSize, final Optional<Integer> maxRecordsCount,
|
|
||||||
final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
|
||||||
this.getRecordsResultQueue = new ConcurrentLinkedQueue<>();
|
|
||||||
this.maxSize = maxSize.orElse(DEFAULT_MAX_SIZE);
|
|
||||||
this.maxByteSize = maxByteSize.orElse(DEFAULT_MAX_BYTE_SIZE);
|
|
||||||
this.maxRecordsCount = maxRecordsCount.orElse(DEFAULT_MAX_RECORDS_COUNT);
|
|
||||||
this.maxRecordsPerCall = maxRecordsPerCall;
|
|
||||||
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetRecordsResult getNextResult() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void shutdown() {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -2,6 +2,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is used as a cache for Prefetching data from Kinesis.
|
* This class is used as a cache for Prefetching data from Kinesis.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -14,18 +14,19 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
|
||||||
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
|
||||||
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
|
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator;
|
import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator;
|
||||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||||
|
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||||
import java.util.Date;
|
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
||||||
|
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
|
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
|
||||||
|
|
@ -77,6 +78,10 @@ class KinesisDataFetcher {
|
||||||
} else {
|
} else {
|
||||||
isShardEndReached = true;
|
isShardEndReached = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (response == null) {
|
||||||
|
response = new GetRecordsResult().withRecords(Collections.emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,115 @@
|
||||||
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||||
|
|
||||||
|
import lombok.NonNull;
|
||||||
|
import lombok.extern.apachecommons.CommonsLog;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is the default caching class, this class spins up a thread if prefetching is enabled. That thread fetches the
|
||||||
|
* next set of records and stores it in the cache. The size of the cache is limited by setting maxSize i.e. the maximum
|
||||||
|
* number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the
|
||||||
|
* cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple
|
||||||
|
* GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till
|
||||||
|
* records are retrieved from Kinesis.
|
||||||
|
*/
|
||||||
|
@CommonsLog
|
||||||
|
public class PrefetchGetRecordsCache implements GetRecordsCache {
|
||||||
|
private LinkedBlockingQueue<GetRecordsResult> getRecordsResultQueue;
|
||||||
|
private int maxSize;
|
||||||
|
private int maxByteSize;
|
||||||
|
private int maxRecordsCount;
|
||||||
|
private final int maxRecordsPerCall;
|
||||||
|
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||||
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
|
private PrefetchCounters prefetchCounters;
|
||||||
|
|
||||||
|
private boolean started = false;
|
||||||
|
|
||||||
|
public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount,
|
||||||
|
final int maxRecordsPerCall, @NonNull final DataFetchingStrategy dataFetchingStrategy,
|
||||||
|
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
|
||||||
|
@NonNull final ExecutorService executorService) {
|
||||||
|
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
|
||||||
|
this.maxRecordsPerCall = maxRecordsPerCall;
|
||||||
|
this.maxSize = maxSize;
|
||||||
|
this.maxByteSize = maxByteSize;
|
||||||
|
this.maxRecordsCount = maxRecordsCount;
|
||||||
|
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize);
|
||||||
|
prefetchCounters = new PrefetchCounters();
|
||||||
|
this.executorService = executorService;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void start() {
|
||||||
|
if (!started) {
|
||||||
|
log.info("Starting prefetching thread.");
|
||||||
|
executorService.execute(new DefaultGetRecordsCacheDaemon());
|
||||||
|
}
|
||||||
|
started = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetRecordsResult getNextResult() {
|
||||||
|
if (!started) {
|
||||||
|
start();
|
||||||
|
}
|
||||||
|
GetRecordsResult result = null;
|
||||||
|
try {
|
||||||
|
result = getRecordsResultQueue.take();
|
||||||
|
prefetchCounters.removed(result);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("Interrupted while getting records from the cache", e);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class DefaultGetRecordsCacheDaemon implements Runnable {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (true) {
|
||||||
|
if (prefetchCounters.byteSize < maxByteSize && prefetchCounters.size < maxRecordsCount) {
|
||||||
|
try {
|
||||||
|
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
|
||||||
|
getRecordsResultQueue.put(getRecordsResult);
|
||||||
|
prefetchCounters.added(getRecordsResult);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("Interrupted while adding records to the cache", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class PrefetchCounters {
|
||||||
|
private volatile long size = 0;
|
||||||
|
private volatile long byteSize = 0;
|
||||||
|
|
||||||
|
public void added(final GetRecordsResult result) {
|
||||||
|
size += getSize(result);
|
||||||
|
byteSize += getByteSize(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removed(final GetRecordsResult result) {
|
||||||
|
size -= getSize(result);
|
||||||
|
byteSize -= getByteSize(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getSize(final GetRecordsResult result) {
|
||||||
|
return result.getRecords().size();
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getByteSize(final GetRecordsResult result) {
|
||||||
|
return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue