From 80216b9a595a8b1fccc53e8c507abef0a0447b83 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 3 Oct 2017 22:57:33 -0700 Subject: [PATCH] Added shardId to thread name. Added documentation to the methods. --- .../worker/KinesisClientLibConfiguration.java | 15 +++++++- .../lib/worker/PrefetchGetRecordsCache.java | 11 ++++++ .../lib/worker/RecordsFetcherFactory.java | 36 +++++++++++++++++-- .../lib/worker/ShardConsumer.java | 3 +- .../worker/SimpleRecordsFetcherFactory.java | 4 +-- .../lib/worker/RecordsFetcherFactoryTest.java | 20 +++++------ .../lib/worker/ShardConsumerTest.java | 4 +-- .../clientlibrary/lib/worker/WorkerTest.java | 3 +- 8 files changed, 74 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 9954a3c1..9e838ed0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -1298,17 +1298,30 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param maxCacheByteSize Max byte size for the cache at any given point of time. After this threshold is crossed + * the KinesisDataFetcher will be blocked until the cache has more space available. + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) { checkIsValuePositive("maxCacheByteSize", maxCacheByteSize); this.recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); return this; } + /** + * @param dataFetchingStrategy The strategy for fetching data from kinesis. + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { this.recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy.toUpperCase())); return this; } + /** + * @param maxRecordsCount The maximum number of records in the cache, accross all ProcessRecordInput objects + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) { checkIsValuePositive("maxRecordsCount", maxRecordsCount); this.recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); @@ -1334,7 +1347,7 @@ public class KinesisClientLibConfiguration { /** * @param idleMillisBetweenCalls Idle time between 2 getcalls from the data fetcher. - * @return + * @return KinesisClientLibConfiguration */ public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) { checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index cf76442e..b3d4b6f6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -50,6 +50,17 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private boolean started = false; + /** + * Constructor + * + * @param maxSize Max size of the queue in the cache + * @param maxByteSize Max byte size of the queue before blocking next get records call + * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects + * @param maxRecordsPerCall Max records to be returned per call + * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call + * @param executorService Executor service for the cache + * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + */ public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index 98073e5b..5e9b9dda 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -24,18 +24,48 @@ public interface RecordsFetcherFactory { /** * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. * - * @return Returns a record fetcher object + * @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache + * @param shardId ShardId of the shard for which the GetRecordsCache is to be returned + * + * @return Returns a GetRecordsCache object */ - GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy); + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId); + /** + * This method sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold at any give time. + * + * @param maxSize Max size for the cache. + */ void setMaxSize(int maxSize); + /** + * This method sets the max byte size for the GetRecordsCache. This is the sum of all the records bytes present in + * the cache at a given point of time. + * + * @param maxByteSize Maximum byte size for the cache. + */ void setMaxByteSize(int maxByteSize); + /** + * This method sets the max number of records for the GetRecordsCache. This is the sum of all the records present + * across all the ProcessRecordsInput in the cache at a given point of time. + * + * @param maxRecordsCount Maximum number of records in the cache. + */ void setMaxRecordsCount(int maxRecordsCount); + /** + * This method sets the dataFetchingStrategy to determine the type of GetRecordsCache to be used. + * + * @param dataFetchingStrategy Fetching strategy to be used + */ void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); - + + /** + * This method sets the maximum idle time between two get calls. + * + * @param idleMillisBetweenCalls Sleep millis between calls. + */ void setIdleMillisBetweenCalls(long idleMillisBetweenCalls); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index e6912335..2489b452 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -234,7 +234,8 @@ class ShardConsumer { this.config = config; this.dataFetcher = kinesisDataFetcher; this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( - makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo)); + makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), + this.getShardInfo().getShardId()); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index cde0f53f..0a577ada 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -34,7 +34,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { } @Override - public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { @@ -42,7 +42,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("prefetch-get-records-cache-%d") + .setNameFormat("prefetch-get-records-cache-" + shardId + "-%d") .build()), idleMillisBetweenCalls); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java index 17a77123..2002b37f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -1,19 +1,15 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.assertEquals; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; -public class RecordsFetcherFactoryTest { +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +public class RecordsFetcherFactoryTest { + private String shardId = "TestShard"; private RecordsFetcherFactory recordsFetcherFactory; @Mock @@ -27,14 +23,14 @@ public class RecordsFetcherFactoryTest { @Test public void createDefaultRecordsFetcherTest() { - GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId); assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); } @Test public void createPrefetchRecordsFetcherTest() { recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); - GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId); assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index d5a68666..421c8c90 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -341,7 +341,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -471,7 +471,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 6190e3c9..aab1067f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; @@ -620,7 +621,7 @@ public class WorkerTest { RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); - when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L)); WorkerThread workerThread = runWorker(shardList,