diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 9954a3c1..b0ac6bfd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -1289,26 +1289,40 @@ public class KinesisClientLibConfiguration { /** * - * @param maxCacheSize the max number of records stored in the getRecordsCache + * @param maxPendingProcessRecordsInput The max number of ProcessRecordsInput that can be stored in the cache before + * blocking * @return this configuration object */ - public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) { - checkIsValuePositive("maxCacheSize", maxCacheSize); - this.recordsFetcherFactory.setMaxSize(maxCacheSize); + public KinesisClientLibConfiguration withMaxPendingProcessRecordsInput(final int maxPendingProcessRecordsInput) { + checkIsValuePositive("maxPendingProcessRecordsInput", maxPendingProcessRecordsInput); + this.recordsFetcherFactory.setMaxPendingProcessRecordsInput(maxPendingProcessRecordsInput); return this; } + /** + * @param maxCacheByteSize Max byte size for the cache at any given point of time. After this threshold is crossed + * the KinesisDataFetcher will be blocked until the cache has more space available. + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) { checkIsValuePositive("maxCacheByteSize", maxCacheByteSize); this.recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); return this; } + /** + * @param dataFetchingStrategy The strategy for fetching data from kinesis. + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { this.recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy.toUpperCase())); return this; } + /** + * @param maxRecordsCount The maximum number of records in the cache, accross all ProcessRecordInput objects + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) { checkIsValuePositive("maxRecordsCount", maxRecordsCount); this.recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); @@ -1334,7 +1348,7 @@ public class KinesisClientLibConfiguration { /** * @param idleMillisBetweenCalls Idle time between 2 getcalls from the data fetcher. - * @return + * @return KinesisClientLibConfiguration */ public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) { checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index fde407b6..1a8ed508 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -20,6 +20,7 @@ import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import com.amazonaws.SdkClientException; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.GetRecordsResult; @@ -27,17 +28,17 @@ import lombok.NonNull; import lombok.extern.apachecommons.CommonsLog; /** - * This is the default caching class, this class spins up a thread if prefetching is enabled. That thread fetches the - * next set of records and stores it in the cache. The size of the cache is limited by setting maxSize i.e. the maximum - * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the - * cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple - * GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till - * records are retrieved from Kinesis. + * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the + * next set of records and stores it in the cache. The size of the cache is limited by setting + * maxPendingProcessRecordsInput i.e. the maximum number of GetRecordsResult that the cache can store, maxByteSize + * i.e. the byte size of the records stored in the cache and maxRecordsCount i.e. the max number of records that should + * be present in the cache across multiple GetRecordsResult object. If no data is available in the cache, the call from + * the record processor is blocked till records are retrieved from Kinesis. */ @CommonsLog public class PrefetchGetRecordsCache implements GetRecordsCache { LinkedBlockingQueue getRecordsResultQueue; - private int maxSize; + private int maxPendingProcessRecordsInput; private int maxByteSize; private int maxRecordsCount; private final int maxRecordsPerCall; @@ -45,25 +46,42 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final ExecutorService executorService; private final long idleMillisBetweenCalls; private Instant lastSuccessfulCall; + private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; private PrefetchCounters prefetchCounters; private boolean started = false; - public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, + /** + * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a + * LinkedBlockingQueue. + * + * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.PrefetchGetRecordsCache + * + * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before + * blocking + * @param maxByteSize Max byte size of the queue before blocking next get records call + * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects + * @param maxRecordsPerCall Max records to be returned per call + * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call + * @param executorService Executor service for the cache + * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + */ + public PrefetchGetRecordsCache(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull final ExecutorService executorService, long idleMillisBetweenCalls) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; - this.maxSize = maxSize; + this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; - this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); + this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput); this.prefetchCounters = new PrefetchCounters(); this.executorService = executorService; this.idleMillisBetweenCalls = idleMillisBetweenCalls; + this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); } @Override @@ -74,7 +92,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { if (!started) { log.info("Starting prefetching thread."); - executorService.execute(new DefaultGetRecordsCacheDaemon()); + executorService.execute(defaultGetRecordsCacheDaemon); } started = true; } @@ -105,17 +123,19 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void shutdown() { + defaultGetRecordsCacheDaemon.isShutdown = true; executorService.shutdownNow(); started = false; } private class DefaultGetRecordsCacheDaemon implements Runnable { + volatile boolean isShutdown = false; + @Override public void run() { - while (true) { + while (!isShutdown) { if (Thread.currentThread().isInterrupted()) { log.warn("Prefetch thread was interrupted."); - callShutdownOnStrategy(); break; } if (prefetchCounters.shouldGetNewRecords()) { @@ -131,12 +151,17 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { log.info("Thread was interrupted, indicating shutdown was called on the cache."); - callShutdownOnStrategy(); - } catch (Error e) { - log.error("Error was thrown while getting records, please check for the error", e); + } catch (SdkClientException e) { + log.error("Exception thrown while fetching records from Kinesis", e); + } catch (Throwable e) { + log.error("Unexpected exception was thrown. This could probably be an issue or a bug." + + " Please search for the exception/error online to check what is going on. If the " + + "issue persists or is a recurring problem, feel free to open an issue on, " + + "https://github.com/awslabs/amazon-kinesis-client.", e); } } } + callShutdownOnStrategy(); } private void callShutdownOnStrategy() { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index 98073e5b..0989995d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -15,27 +15,57 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; /** - * The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard. - * Clients may choose to create separate instantiations, or re-use instantiations. + * This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard. */ - public interface RecordsFetcherFactory { + /** + * Returns a GetRecordsCache to be used for retrieving records for a given shard. + * + * @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache + * @param shardId ShardId of the shard that the fetcher will retrieve records for + * + * @return GetRecordsCache used to get records from Kinesis. + */ + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId); /** - * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. - * - * @return Returns a record fetcher object + * Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are + * blocked. + * + * @param maxPendingProcessRecordsInput The maximum number of ProcessRecordsInput objects that the cache will accept + * before blocking. */ - GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy); - - void setMaxSize(int maxSize); + void setMaxPendingProcessRecordsInput(int maxPendingProcessRecordsInput); + /** + * Sets the max byte size for the GetRecordsCache, before further requests are blocked. The byte size of the cache + * is the sum of byte size of all the ProcessRecordsInput objects in the cache at any point of time. + * + * @param maxByteSize The maximum byte size for the cache before blocking. + */ void setMaxByteSize(int maxByteSize); + /** + * Sets the max number of records for the GetRecordsCache can hold, before further requests are blocked. The records + * count is the sum of all records present in across all the ProcessRecordsInput objects in the cache at any point + * of time. + * + * @param maxRecordsCount The mximum number of records in the cache before blocking. + */ void setMaxRecordsCount(int maxRecordsCount); + /** + * Sets the dataFetchingStrategy to determine the type of GetRecordsCache to be used. + * + * @param dataFetchingStrategy Fetching strategy to be used + */ void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); - + + /** + * Sets the maximum idle time between two get calls. + * + * @param idleMillisBetweenCalls Sleep millis between calls. + */ void setIdleMillisBetweenCalls(long idleMillisBetweenCalls); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index e6912335..2489b452 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -234,7 +234,8 @@ class ShardConsumer { this.config = config; this.dataFetcher = kinesisDataFetcher; this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( - makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo)); + makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), + this.getShardInfo().getShardId()); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 2b6e4e83..4d44f9ea 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -16,12 +16,14 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.concurrent.Executors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import lombok.extern.apachecommons.CommonsLog; @CommonsLog public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private final int maxRecords; - private int maxSize = 3; + private int maxPendingProcessRecordsInput = 3; private int maxByteSize = 8 * 1024 * 1024; private int maxRecordsCount = 30000; private long idleMillisBetweenCalls = 1500L; @@ -32,18 +34,23 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { } @Override - public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { - return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, - getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1), idleMillisBetweenCalls); + return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords, + getRecordsRetrievalStrategy, + Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("prefetch-cache-" + shardId + "-%04d") + .build()), + idleMillisBetweenCalls); } } @Override - public void setMaxSize(int maxSize){ - this.maxSize = maxSize; + public void setMaxPendingProcessRecordsInput(int maxPendingProcessRecordsInput){ + this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; } @Override diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java index 17a77123..2002b37f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -1,19 +1,15 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.assertEquals; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; -public class RecordsFetcherFactoryTest { +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +public class RecordsFetcherFactoryTest { + private String shardId = "TestShard"; private RecordsFetcherFactory recordsFetcherFactory; @Mock @@ -27,14 +23,14 @@ public class RecordsFetcherFactoryTest { @Test public void createDefaultRecordsFetcherTest() { - GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId); assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); } @Test public void createPrefetchRecordsFetcherTest() { recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); - GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId); assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index d5a68666..421c8c90 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -341,7 +341,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -471,7 +471,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 6190e3c9..aab1067f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; @@ -620,7 +621,7 @@ public class WorkerTest { RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); - when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L)); WorkerThread workerThread = runWorker(shardList,