Adding null condition to getRecords in the KinesisDataFetcher class. Changing the abstract class back to an interface.

This commit is contained in:
Sahil Palvia 2017-09-19 14:25:25 -07:00
parent 14ebfb8f0f
commit 5172f4f936
4 changed files with 21 additions and 25 deletions

View file

@ -1,13 +1,15 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetRecordsResult;
import lombok.extern.apachecommons.CommonsLog; import lombok.extern.apachecommons.CommonsLog;
/** /**
* * This is the BlockingGetRecordsCache class. This class blocks any calls to the getRecords on the
* GetRecordsRetrievalStrategy class.
*/ */
@CommonsLog @CommonsLog
public class BlockingGetRecordsCache extends GetRecordsCache { public class BlockingGetRecordsCache implements GetRecordsCache {
private final int maxRecordsPerCall; private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@ -18,7 +20,7 @@ public class BlockingGetRecordsCache extends GetRecordsCache {
@Override @Override
public GetRecordsResult getNextResult() { public GetRecordsResult getNextResult() {
return validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); return getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
} }
@Override @Override

View file

@ -7,21 +7,14 @@ import java.util.Collections;
/** /**
* This class is used as a cache for Prefetching data from Kinesis. * This class is used as a cache for Prefetching data from Kinesis.
*/ */
public abstract class GetRecordsCache { public interface GetRecordsCache {
/** /**
* This method returns the next set of records from the Cache if present, or blocks the request till it gets the * This method returns the next set of records from the Cache if present, or blocks the request till it gets the
* next set of records back from Kinesis. * next set of records back from Kinesis.
* *
* @return The next set of records. * @return The next set of records.
*/ */
public abstract GetRecordsResult getNextResult(); GetRecordsResult getNextResult();
public abstract void shutdown(); void shutdown();
protected GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) {
if (getRecordsResult == null) {
return new GetRecordsResult().withRecords(Collections.emptyList());
}
return getRecordsResult;
}
} }

View file

@ -14,18 +14,19 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Collections;
import java.util.Date;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator; import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import java.util.Date; import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
/** /**
* Used to get data from Amazon Kinesis. Tracks iterator state internally. * Used to get data from Amazon Kinesis. Tracks iterator state internally.
@ -77,6 +78,10 @@ class KinesisDataFetcher {
} else { } else {
isShardEndReached = true; isShardEndReached = true;
} }
if (response == null) {
response = new GetRecordsResult().withRecords(Collections.emptyList());
}
return response; return response;
} }

View file

@ -1,8 +1,6 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Collections;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetRecordsResult;
@ -16,11 +14,10 @@ import lombok.extern.apachecommons.CommonsLog;
* number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the
* cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple * cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple
* GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till * GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till
* records are retrieved from Kinesis. If prefetching is not enabled, the cache is not used and every single call to the * records are retrieved from Kinesis.
* GetRecordsRetrievalStrategy is a blocking call.
*/ */
@CommonsLog @CommonsLog
public class PrefetchGetRecordsCache extends GetRecordsCache { public class PrefetchGetRecordsCache implements GetRecordsCache {
private LinkedBlockingQueue<GetRecordsResult> getRecordsResultQueue; private LinkedBlockingQueue<GetRecordsResult> getRecordsResultQueue;
private int maxSize; private int maxSize;
private int maxByteSize; private int maxByteSize;
@ -102,8 +99,7 @@ public class PrefetchGetRecordsCache extends GetRecordsCache {
while (true) { while (true) {
if (currentSizeInBytes < maxByteSize && currentRecordsCount < maxRecordsCount) { if (currentSizeInBytes < maxByteSize && currentRecordsCount < maxRecordsCount) {
try { try {
GetRecordsResult getRecordsResult = validateGetRecordsResult( GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall));
getRecordsResultQueue.put(getRecordsResult); getRecordsResultQueue.put(getRecordsResult);
if (getRecordsResultQueue.contains(getRecordsResult)) { if (getRecordsResultQueue.contains(getRecordsResult)) {
updateBytes(getRecordsResult, true); updateBytes(getRecordsResult, true);