From 5172f4f93620d4b9167304d250171d2e940c4d7c Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 19 Sep 2017 14:25:25 -0700 Subject: [PATCH] Adding null condition to getRecords in the KinesisDataFetcher class. Changing the abstract class back to an interface. --- .../lib/worker/BlockingGetRecordsCache.java | 8 +++++--- .../clientlibrary/lib/worker/GetRecordsCache.java | 13 +++---------- .../lib/worker/KinesisDataFetcher.java | 15 ++++++++++----- .../lib/worker/PrefetchGetRecordsCache.java | 10 +++------- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index f4dc271d..7ee718d0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -1,13 +1,15 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.model.GetRecordsResult; + import lombok.extern.apachecommons.CommonsLog; /** - * + * This is the BlockingGetRecordsCache class. This class blocks any calls to the getRecords on the + * GetRecordsRetrievalStrategy class. */ @CommonsLog -public class BlockingGetRecordsCache extends GetRecordsCache { +public class BlockingGetRecordsCache implements GetRecordsCache { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @@ -18,7 +20,7 @@ public class BlockingGetRecordsCache extends GetRecordsCache { @Override public GetRecordsResult getNextResult() { - return validateGetRecordsResult(getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + return getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index 15d0b402..88df34fb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -7,21 +7,14 @@ import java.util.Collections; /** * This class is used as a cache for Prefetching data from Kinesis. */ -public abstract class GetRecordsCache { +public interface GetRecordsCache { /** * This method returns the next set of records from the Cache if present, or blocks the request till it gets the * next set of records back from Kinesis. * * @return The next set of records. */ - public abstract GetRecordsResult getNextResult(); + GetRecordsResult getNextResult(); - public abstract void shutdown(); - - protected GetRecordsResult validateGetRecordsResult(final GetRecordsResult getRecordsResult) { - if (getRecordsResult == null) { - return new GetRecordsResult().withRecords(Collections.emptyList()); - } - return getRecordsResult; - } + void shutdown(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index 2ce3152a..8779b5da 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -14,18 +14,19 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.Collections; +import java.util.Date; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; - -import java.util.Date; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.ShardIteratorType; /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. @@ -77,6 +78,10 @@ class KinesisDataFetcher { } else { isShardEndReached = true; } + + if (response == null) { + response = new GetRecordsResult().withRecords(Collections.emptyList()); + } return response; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 0b9373e5..e9109e73 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -1,8 +1,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.Collections; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import com.amazonaws.services.kinesis.model.GetRecordsResult; @@ -16,11 +14,10 @@ import lombok.extern.apachecommons.CommonsLog; * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the * cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple * GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till - * records are retrieved from Kinesis. If prefetching is not enabled, the cache is not used and every single call to the - * GetRecordsRetrievalStrategy is a blocking call. + * records are retrieved from Kinesis. */ @CommonsLog -public class PrefetchGetRecordsCache extends GetRecordsCache { +public class PrefetchGetRecordsCache implements GetRecordsCache { private LinkedBlockingQueue getRecordsResultQueue; private int maxSize; private int maxByteSize; @@ -102,8 +99,7 @@ public class PrefetchGetRecordsCache extends GetRecordsCache { while (true) { if (currentSizeInBytes < maxByteSize && currentRecordsCount < maxRecordsCount) { try { - GetRecordsResult getRecordsResult = validateGetRecordsResult( - getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall)); + GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); getRecordsResultQueue.put(getRecordsResult); if (getRecordsResultQueue.contains(getRecordsResult)) { updateBytes(getRecordsResult, true);