From ffdfe82b79e0a2f5901bb868ce3b67adf2f2e311 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 3 Oct 2017 15:29:38 -0700 Subject: [PATCH 1/4] Fixing issue with prefetch thread, where it kept on dying and falling behind. Catching throwable instead of error. Assigning thread name to the prefetch thread. --- .../lib/worker/PrefetchGetRecordsCache.java | 7 +++++-- .../lib/worker/SimpleRecordsFetcherFactory.java | 9 ++++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index fde407b6..cf76442e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -110,9 +110,11 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } private class DefaultGetRecordsCacheDaemon implements Runnable { + private volatile boolean isShutdown = false; + @Override public void run() { - while (true) { + while (!isShutdown) { if (Thread.currentThread().isInterrupted()) { log.warn("Prefetch thread was interrupted."); callShutdownOnStrategy(); @@ -132,7 +134,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } catch (InterruptedException e) { log.info("Thread was interrupted, indicating shutdown was called on the cache."); callShutdownOnStrategy(); - } catch (Error e) { + } catch (Throwable e) { log.error("Error was thrown while getting records, please check for the error", e); } } @@ -143,6 +145,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { if (!getRecordsRetrievalStrategy.isShutdown()) { getRecordsRetrievalStrategy.shutdown(); } + isShutdown = true; } private void sleepBeforeNextCall() throws InterruptedException { diff --git 
a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 2b6e4e83..cde0f53f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -16,6 +16,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.concurrent.Executors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import lombok.extern.apachecommons.CommonsLog; @CommonsLog @@ -37,7 +39,12 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, - getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1), idleMillisBetweenCalls); + getRecordsRetrievalStrategy, + Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("prefetch-get-records-cache-%d") + .build()), + idleMillisBetweenCalls); } } From 80216b9a595a8b1fccc53e8c507abef0a0447b83 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 3 Oct 2017 22:57:33 -0700 Subject: [PATCH 2/4] Added shardId to thread name. Added documentation to the methods. 
--- .../worker/KinesisClientLibConfiguration.java | 15 +++++++- .../lib/worker/PrefetchGetRecordsCache.java | 11 ++++++ .../lib/worker/RecordsFetcherFactory.java | 36 +++++++++++++++++-- .../lib/worker/ShardConsumer.java | 3 +- .../worker/SimpleRecordsFetcherFactory.java | 4 +-- .../lib/worker/RecordsFetcherFactoryTest.java | 20 +++++------ .../lib/worker/ShardConsumerTest.java | 4 +-- .../clientlibrary/lib/worker/WorkerTest.java | 3 +- 8 files changed, 74 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 9954a3c1..9e838ed0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -1298,17 +1298,30 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param maxCacheByteSize Max byte size for the cache at any given point of time. After this threshold is crossed + * the KinesisDataFetcher will be blocked until the cache has more space available. + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) { checkIsValuePositive("maxCacheByteSize", maxCacheByteSize); this.recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); return this; } + /** + * @param dataFetchingStrategy The strategy for fetching data from kinesis. 
+ * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { this.recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy.toUpperCase())); return this; } + /** + * @param maxRecordsCount The maximum number of records in the cache, across all ProcessRecordInput objects + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) { checkIsValuePositive("maxRecordsCount", maxRecordsCount); this.recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); @@ -1334,7 +1347,7 @@ public class KinesisClientLibConfiguration { /** * @param idleMillisBetweenCalls Idle time between 2 getcalls from the data fetcher. - * @return + * @return KinesisClientLibConfiguration */ public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) { checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index cf76442e..b3d4b6f6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -50,6 +50,17 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private boolean started = false; + /** + * Constructor + * + * @param maxSize Max size of the queue in the cache + * @param maxByteSize Max byte size of the queue before blocking next get records call + * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects + * @param maxRecordsPerCall Max records to be returned per call + * @param getRecordsRetrievalStrategy Retrieval strategy for the get 
records call + * @param executorService Executor service for the cache + * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + */ public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index 98073e5b..5e9b9dda 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -24,18 +24,48 @@ public interface RecordsFetcherFactory { /** * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. * - * @return Returns a record fetcher object + * @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache + * @param shardId ShardId of the shard for which the GetRecordsCache is to be returned + * + * @return Returns a GetRecordsCache object */ - GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy); + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId); + /** + * This method sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold at any give time. + * + * @param maxSize Max size for the cache. + */ void setMaxSize(int maxSize); + /** + * This method sets the max byte size for the GetRecordsCache. This is the sum of all the records bytes present in + * the cache at a given point of time. + * + * @param maxByteSize Maximum byte size for the cache. 
+ */ void setMaxByteSize(int maxByteSize); + /** + * This method sets the max number of records for the GetRecordsCache. This is the sum of all the records present + * across all the ProcessRecordsInput in the cache at a given point of time. + * + * @param maxRecordsCount Maximum number of records in the cache. + */ void setMaxRecordsCount(int maxRecordsCount); + /** + * This method sets the dataFetchingStrategy to determine the type of GetRecordsCache to be used. + * + * @param dataFetchingStrategy Fetching strategy to be used + */ void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); - + + /** + * This method sets the maximum idle time between two get calls. + * + * @param idleMillisBetweenCalls Sleep millis between calls. + */ void setIdleMillisBetweenCalls(long idleMillisBetweenCalls); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index e6912335..2489b452 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -234,7 +234,8 @@ class ShardConsumer { this.config = config; this.dataFetcher = kinesisDataFetcher; this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( - makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo)); + makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), + this.getShardInfo().getShardId()); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index cde0f53f..0a577ada 100644 --- 
a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -34,7 +34,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { } @Override - public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { @@ -42,7 +42,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("prefetch-get-records-cache-%d") + .setNameFormat("prefetch-get-records-cache-" + shardId + "-%d") .build()), idleMillisBetweenCalls); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java index 17a77123..2002b37f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -1,19 +1,15 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.assertEquals; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; -public class 
RecordsFetcherFactoryTest { +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +public class RecordsFetcherFactoryTest { + private String shardId = "TestShard"; private RecordsFetcherFactory recordsFetcherFactory; @Mock @@ -27,14 +23,14 @@ public class RecordsFetcherFactoryTest { @Test public void createDefaultRecordsFetcherTest() { - GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId); assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); } @Test public void createPrefetchRecordsFetcherTest() { recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); - GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId); assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index d5a68666..421c8c90 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -341,7 +341,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -471,7 +471,7 @@ 
public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 6190e3c9..aab1067f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; @@ -620,7 +621,7 @@ public class WorkerTest { RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); - when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L)); WorkerThread workerThread = runWorker(shardList, From 5717bab9b6fd7d518f5545d09accc5163bebff77 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Wed, 4 Oct 2017 11:14:55 -0700 Subject: [PATCH 3/4] Addressing comments from PR. 
Catching expected exceptions, changing log message for unexpected exception. Changing threadname. --- .../lib/worker/PrefetchGetRecordsCache.java | 26 +++++++++++++------ .../worker/SimpleRecordsFetcherFactory.java | 2 +- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index b3d4b6f6..90d3ab07 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -20,6 +20,7 @@ import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import com.amazonaws.SdkClientException; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.GetRecordsResult; @@ -27,7 +28,7 @@ import lombok.NonNull; import lombok.extern.apachecommons.CommonsLog; /** - * This is the default caching class, this class spins up a thread if prefetching is enabled. That thread fetches the + * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the * next set of records and stores it in the cache. The size of the cache is limited by setting maxSize i.e. the maximum * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the * cache and maxRecordsCount i.e. 
the max number of records that should be present in the cache across multiple @@ -45,13 +46,17 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final ExecutorService executorService; private final long idleMillisBetweenCalls; private Instant lastSuccessfulCall; + private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; private PrefetchCounters prefetchCounters; private boolean started = false; /** - * Constructor + * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a + * LinkedBlockingQueue. + * + * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.PrefetchGetRecordsCache * * @param maxSize Max size of the queue in the cache * @param maxByteSize Max byte size of the queue before blocking next get records call @@ -75,6 +80,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.prefetchCounters = new PrefetchCounters(); this.executorService = executorService; this.idleMillisBetweenCalls = idleMillisBetweenCalls; + this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); } @Override @@ -85,7 +91,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { if (!started) { log.info("Starting prefetching thread."); - executorService.execute(new DefaultGetRecordsCacheDaemon()); + executorService.execute(defaultGetRecordsCacheDaemon); } started = true; } @@ -116,19 +122,19 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void shutdown() { + defaultGetRecordsCacheDaemon.isShutdown = true; executorService.shutdownNow(); started = false; } private class DefaultGetRecordsCacheDaemon implements Runnable { - private volatile boolean isShutdown = false; + volatile boolean isShutdown = false; @Override public void run() { while (!isShutdown) { if (Thread.currentThread().isInterrupted()) { log.warn("Prefetch thread was interrupted."); - callShutdownOnStrategy(); break; } if 
(prefetchCounters.shouldGetNewRecords()) { @@ -144,19 +150,23 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { log.info("Thread was interrupted, indicating shutdown was called on the cache."); - callShutdownOnStrategy(); + } catch (SdkClientException e) { + log.error("Exception thrown while fetching records from Kinesis", e); } catch (Throwable e) { - log.error("Error was thrown while getting records, please check for the error", e); + log.error("Unexpected exception was thrown. This could probably be an issue or a bug." + + " Please search for the exception/error online to check what is going on. If the " + + "issue persists or is a recurring problem, feel free to open an issue on, " + + "https://github.com/awslabs/amazon-kinesis-client.", e); } } } + callShutdownOnStrategy(); } private void callShutdownOnStrategy() { if (!getRecordsRetrievalStrategy.isShutdown()) { getRecordsRetrievalStrategy.shutdown(); } - isShutdown = true; } private void sleepBeforeNextCall() throws InterruptedException { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 0a577ada..c9f10492 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -42,7 +42,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("prefetch-get-records-cache-" + shardId + "-%d") + .setNameFormat("prefetch-cache-" + shardId + "-%04d") .build()), idleMillisBetweenCalls); } From 0cf34461a7a3ca298097675bdbeb1514ea561894 Mon Sep 17 
00:00:00 2001 From: Sahil Palvia Date: Wed, 4 Oct 2017 12:00:36 -0700 Subject: [PATCH 4/4] Updated the documentation. Changed the variable name from maxSize to maxPendingProcessRecordsInput. --- .../worker/KinesisClientLibConfiguration.java | 9 ++--- .../lib/worker/PrefetchGetRecordsCache.java | 21 +++++------ .../lib/worker/RecordsFetcherFactory.java | 36 +++++++++---------- .../worker/SimpleRecordsFetcherFactory.java | 8 ++--- 4 files changed, 38 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 9e838ed0..b0ac6bfd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -1289,12 +1289,13 @@ public class KinesisClientLibConfiguration { /** * - * @param maxCacheSize the max number of records stored in the getRecordsCache + * @param maxPendingProcessRecordsInput The max number of ProcessRecordsInput that can be stored in the cache before + * blocking * @return this configuration object */ - public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) { - checkIsValuePositive("maxCacheSize", maxCacheSize); - this.recordsFetcherFactory.setMaxSize(maxCacheSize); + public KinesisClientLibConfiguration withMaxPendingProcessRecordsInput(final int maxPendingProcessRecordsInput) { + checkIsValuePositive("maxPendingProcessRecordsInput", maxPendingProcessRecordsInput); + this.recordsFetcherFactory.setMaxPendingProcessRecordsInput(maxPendingProcessRecordsInput); return this; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java 
b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 90d3ab07..1a8ed508 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -29,16 +29,16 @@ import lombok.extern.apachecommons.CommonsLog; /** * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the - * next set of records and stores it in the cache. The size of the cache is limited by setting maxSize i.e. the maximum - * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the - * cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple - * GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till - * records are retrieved from Kinesis. + * next set of records and stores it in the cache. The size of the cache is limited by setting + * maxPendingProcessRecordsInput i.e. the maximum number of GetRecordsResult that the cache can store, maxByteSize + * i.e. the byte size of the records stored in the cache and maxRecordsCount i.e. the max number of records that should + * be present in the cache across multiple GetRecordsResult object. If no data is available in the cache, the call from + * the record processor is blocked till records are retrieved from Kinesis. 
*/ @CommonsLog public class PrefetchGetRecordsCache implements GetRecordsCache { LinkedBlockingQueue getRecordsResultQueue; - private int maxSize; + private int maxPendingProcessRecordsInput; private int maxByteSize; private int maxRecordsCount; private final int maxRecordsPerCall; @@ -58,7 +58,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { * * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.PrefetchGetRecordsCache * - * @param maxSize Max size of the queue in the cache + * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before + * blocking * @param maxByteSize Max byte size of the queue before blocking next get records call * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects * @param maxRecordsPerCall Max records to be returned per call @@ -66,17 +67,17 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { * @param executorService Executor service for the cache * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call */ - public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, + public PrefetchGetRecordsCache(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull final ExecutorService executorService, long idleMillisBetweenCalls) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; - this.maxSize = maxSize; + this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; - this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); + this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput); this.prefetchCounters = 
new PrefetchCounters(); this.executorService = executorService; this.idleMillisBetweenCalls = idleMillisBetweenCalls; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index 5e9b9dda..0989995d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -15,54 +15,54 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; /** - * The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard. - * Clients may choose to create separate instantiations, or re-use instantiations. + * This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard. */ - public interface RecordsFetcherFactory { - /** - * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. + * Returns a GetRecordsCache to be used for retrieving records for a given shard. * * @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache - * @param shardId ShardId of the shard for which the GetRecordsCache is to be returned + * @param shardId ShardId of the shard that the fetcher will retrieve records for * - * @return Returns a GetRecordsCache object + * @return GetRecordsCache used to get records from Kinesis. */ GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId); /** - * This method sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold at any give time. + * Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are + * blocked. * - * @param maxSize Max size for the cache. 
+ * @param maxPendingProcessRecordsInput The maximum number of ProcessRecordsInput objects that the cache will accept + * before blocking. */ - void setMaxSize(int maxSize); + void setMaxPendingProcessRecordsInput(int maxPendingProcessRecordsInput); /** - * This method sets the max byte size for the GetRecordsCache. This is the sum of all the records bytes present in - * the cache at a given point of time. + * Sets the max byte size for the GetRecordsCache, before further requests are blocked. The byte size of the cache + * is the sum of byte size of all the ProcessRecordsInput objects in the cache at any point of time. * - * @param maxByteSize Maximum byte size for the cache. + * @param maxByteSize The maximum byte size for the cache before blocking. */ void setMaxByteSize(int maxByteSize); /** - * This method sets the max number of records for the GetRecordsCache. This is the sum of all the records present - * across all the ProcessRecordsInput in the cache at a given point of time. + * Sets the max number of records the GetRecordsCache can hold, before further requests are blocked. The records + * count is the sum of all records present across all the ProcessRecordsInput objects in the cache at any point + * of time. * - * @param maxRecordsCount Maximum number of records in the cache. + * @param maxRecordsCount The maximum number of records in the cache before blocking. */ void setMaxRecordsCount(int maxRecordsCount); /** - * This method sets the dataFetchingStrategy to determine the type of GetRecordsCache to be used. + * Sets the dataFetchingStrategy to determine the type of GetRecordsCache to be used. * * @param dataFetchingStrategy Fetching strategy to be used */ void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); /** - * This method sets the maximum idle time between two get calls. + * Sets the maximum idle time between two get calls. * * @param idleMillisBetweenCalls Sleep millis between calls. 
*/ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index c9f10492..4d44f9ea 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -23,7 +23,7 @@ import lombok.extern.apachecommons.CommonsLog; @CommonsLog public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private final int maxRecords; - private int maxSize = 3; + private int maxPendingProcessRecordsInput = 3; private int maxByteSize = 8 * 1024 * 1024; private int maxRecordsCount = 30000; private long idleMillisBetweenCalls = 1500L; @@ -38,7 +38,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { - return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, + return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords, getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true) @@ -49,8 +49,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { } @Override - public void setMaxSize(int maxSize){ - this.maxSize = maxSize; + public void setMaxPendingProcessRecordsInput(int maxPendingProcessRecordsInput){ + this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; } @Override