diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 9954a3c1..b0ac6bfd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -1289,26 +1289,40 @@ public class KinesisClientLibConfiguration { /** * - * @param maxCacheSize the max number of records stored in the getRecordsCache + * @param maxPendingProcessRecordsInput The max number of ProcessRecordsInput that can be stored in the cache before + * blocking * @return this configuration object */ - public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) { - checkIsValuePositive("maxCacheSize", maxCacheSize); - this.recordsFetcherFactory.setMaxSize(maxCacheSize); + public KinesisClientLibConfiguration withMaxPendingProcessRecordsInput(final int maxPendingProcessRecordsInput) { + checkIsValuePositive("maxPendingProcessRecordsInput", maxPendingProcessRecordsInput); + this.recordsFetcherFactory.setMaxPendingProcessRecordsInput(maxPendingProcessRecordsInput); return this; } + /** + * @param maxCacheByteSize Max byte size for the cache at any given point of time. After this threshold is crossed + * the KinesisDataFetcher will be blocked until the cache has more space available. + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) { checkIsValuePositive("maxCacheByteSize", maxCacheByteSize); this.recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); return this; } + /** + * @param dataFetchingStrategy The strategy for fetching data from kinesis. + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { this.recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy.toUpperCase())); return this; } + /** + * @param maxRecordsCount The maximum number of records in the cache, accross all ProcessRecordInput objects + * @return KinesisClientLibConfiguration + */ public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) { checkIsValuePositive("maxRecordsCount", maxRecordsCount); this.recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); @@ -1334,7 +1348,7 @@ public class KinesisClientLibConfiguration { /** * @param idleMillisBetweenCalls Idle time between 2 getcalls from the data fetcher. - * @return + * @return KinesisClientLibConfiguration */ public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) { checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 826a568f..710b4b5c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -20,6 +20,7 @@ import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import com.amazonaws.SdkClientException; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory; @@ -30,17 +31,17 @@ import lombok.NonNull; import lombok.extern.apachecommons.CommonsLog; /** - * This is the default caching class, this class spins up a thread if prefetching is enabled. That thread fetches the - * next set of records and stores it in the cache. The size of the cache is limited by setting maxSize i.e. the maximum - * number of GetRecordsResult that the cache can store, maxByteSize i.e. the byte size of the records stored in the - * cache and maxRecordsCount i.e. the max number of records that should be present in the cache across multiple - * GetRecordsResult object. If no data is available in the cache, the call from the record processor is blocked till - * records are retrieved from Kinesis. + * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the + * next set of records and stores it in the cache. The size of the cache is limited by setting + * maxPendingProcessRecordsInput i.e. the maximum number of GetRecordsResult that the cache can store, maxByteSize + * i.e. the byte size of the records stored in the cache and maxRecordsCount i.e. the max number of records that should + * be present in the cache across multiple GetRecordsResult object. If no data is available in the cache, the call from + * the record processor is blocked till records are retrieved from Kinesis. */ @CommonsLog public class PrefetchGetRecordsCache implements GetRecordsCache { LinkedBlockingQueue getRecordsResultQueue; - private int maxSize; + private int maxPendingProcessRecordsInput; private int maxByteSize; private int maxRecordsCount; private final int maxRecordsPerCall; @@ -49,27 +50,44 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final IMetricsFactory metricsFactory; private final long idleMillisBetweenCalls; private Instant lastSuccessfulCall; + private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; private PrefetchCounters prefetchCounters; private boolean started = false; - public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, + /** + * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a + * LinkedBlockingQueue. + * + * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.PrefetchGetRecordsCache + * + * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before + * blocking + * @param maxByteSize Max byte size of the queue before blocking next get records call + * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects + * @param maxRecordsPerCall Max records to be returned per call + * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call + * @param executorService Executor service for the cache + * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + */ + public PrefetchGetRecordsCache(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull final ExecutorService executorService, - @NonNull final IMetricsFactory metricsFactory, - long idleMillisBetweenCalls) { + long idleMillisBetweenCalls, + @NonNull final IMetricsFactory metricsFactory) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; - this.maxSize = maxSize; + this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; - this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); + this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput); this.prefetchCounters = new PrefetchCounters(); this.executorService = executorService; this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory); this.idleMillisBetweenCalls = idleMillisBetweenCalls; + this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); } @Override @@ -80,7 +98,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { if (!started) { log.info("Starting prefetching thread."); - executorService.execute(new DefaultGetRecordsCacheDaemon()); + executorService.execute(defaultGetRecordsCacheDaemon); } started = true; } @@ -111,17 +129,19 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void shutdown() { + defaultGetRecordsCacheDaemon.isShutdown = true; executorService.shutdownNow(); started = false; } private class DefaultGetRecordsCacheDaemon implements Runnable { + volatile boolean isShutdown = false; + @Override public void run() { - while (true) { + while (!isShutdown) { if (Thread.currentThread().isInterrupted()) { log.warn("Prefetch thread was interrupted."); - callShutdownOnStrategy(); break; } MetricsHelper.startScope(metricsFactory, "ProcessTask"); @@ -137,15 +157,20 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { getRecordsResultQueue.put(processRecordsInput); prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { - log.info("Thread was interrupted, indicating shutdown was called on the cache"); - callShutdownOnStrategy(); - } catch (Error e) { - log.error("Error was thrown while getting records, please check for the error", e); + log.info("Thread was interrupted, indicating shutdown was called on the cache."); + } catch (SdkClientException e) { + log.error("Exception thrown while fetching records from Kinesis", e); + } catch (Throwable e) { + log.error("Unexpected exception was thrown. This could probably be an issue or a bug." + + " Please search for the exception/error online to check what is going on. If the " + + "issue persists or is a recurring problem, feel free to open an issue on, " + + "https://github.com/awslabs/amazon-kinesis-client.", e); } finally { MetricsHelper.endScope(); } } } + callShutdownOnStrategy(); } private void callShutdownOnStrategy() { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index 744f0e05..c8124793 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -17,27 +17,57 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; /** - * The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard. - * Clients may choose to create separate instantiations, or re-use instantiations. + * This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard. */ - public interface RecordsFetcherFactory { + /** + * Returns a GetRecordsCache to be used for retrieving records for a given shard. + * + * @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache + * @param shardId ShardId of the shard that the fetcher will retrieve records for + * + * @return GetRecordsCache used to get records from Kinesis. + */ + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory); /** - * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. - * - * @return Returns a record fetcher object + * Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are + * blocked. + * + * @param maxPendingProcessRecordsInput The maximum number of ProcessRecordsInput objects that the cache will accept + * before blocking. */ - GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, IMetricsFactory metricsFactory); - - void setMaxSize(int maxSize); + void setMaxPendingProcessRecordsInput(int maxPendingProcessRecordsInput); + /** + * Sets the max byte size for the GetRecordsCache, before further requests are blocked. The byte size of the cache + * is the sum of byte size of all the ProcessRecordsInput objects in the cache at any point of time. + * + * @param maxByteSize The maximum byte size for the cache before blocking. + */ void setMaxByteSize(int maxByteSize); + /** + * Sets the max number of records for the GetRecordsCache can hold, before further requests are blocked. The records + * count is the sum of all records present in across all the ProcessRecordsInput objects in the cache at any point + * of time. + * + * @param maxRecordsCount The mximum number of records in the cache before blocking. + */ void setMaxRecordsCount(int maxRecordsCount); + /** + * Sets the dataFetchingStrategy to determine the type of GetRecordsCache to be used. + * + * @param dataFetchingStrategy Fetching strategy to be used + */ void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); - + + /** + * Sets the maximum idle time between two get calls. + * + * @param idleMillisBetweenCalls Sleep millis between calls. + */ void setIdleMillisBetweenCalls(long idleMillisBetweenCalls); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 182235cc..7610f89c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -234,7 +234,8 @@ class ShardConsumer { this.config = config; this.dataFetcher = kinesisDataFetcher; this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( - makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), metricsFactory); + makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), + this.getShardInfo().getShardId(), metricsFactory); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index fbf99822..d5ce00c1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -17,12 +17,13 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.concurrent.Executors; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.apachecommons.CommonsLog; @CommonsLog public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private final int maxRecords; - private int maxSize = 3; + private int maxPendingProcessRecordsInput = 3; private int maxByteSize = 8 * 1024 * 1024; private int maxRecordsCount = 30000; private long idleMillisBetweenCalls = 1500L; @@ -34,18 +35,23 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { } @Override - public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, IMetricsFactory metricsFactory) { + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { - return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, - getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1), metricsFactory, idleMillisBetweenCalls); + return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords, + getRecordsRetrievalStrategy, + Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("prefetch-cache-" + shardId + "-%04d") + .build()), + idleMillisBetweenCalls, metricsFactory); } } @Override - public void setMaxSize(int maxSize){ - this.maxSize = maxSize; + public void setMaxPendingProcessRecordsInput(int maxPendingProcessRecordsInput){ + this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; } @Override diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java index 7f8face8..e56698d1 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java @@ -86,8 +86,8 @@ public class PrefetchGetRecordsCacheIntegrationTest { MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy, executorService, - new NullMetricsFactory(), - IDLE_MILLIS_BETWEEN_CALLS); + IDLE_MILLIS_BETWEEN_CALLS, + new NullMetricsFactory()); } @Test @@ -131,8 +131,8 @@ public class PrefetchGetRecordsCacheIntegrationTest { MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy2, executorService2, - new NullMetricsFactory(), - IDLE_MILLIS_BETWEEN_CALLS); + IDLE_MILLIS_BETWEEN_CALLS, + new NullMetricsFactory()); getRecordsCache.start(); sleep(IDLE_MILLIS_BETWEEN_CALLS); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java index eebc56e8..d549bcc4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -82,8 +82,8 @@ public class PrefetchGetRecordsCacheTest { MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy, executorService, - new NullMetricsFactory(), - IDLE_MILLIS_BETWEEN_CALLS); + IDLE_MILLIS_BETWEEN_CALLS, + new NullMetricsFactory()); spyQueue = spy(getRecordsCache.getRecordsResultQueue); records = spy(new ArrayList<>()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java index a4fde866..912804da 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -1,21 +1,17 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.assertEquals; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; -public class RecordsFetcherFactoryTest { +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +public class RecordsFetcherFactoryTest { + private String shardId = "TestShard"; private RecordsFetcherFactory recordsFetcherFactory; @Mock @@ -32,14 +28,16 @@ public class RecordsFetcherFactoryTest { @Test public void createDefaultRecordsFetcherTest() { - GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, metricsFactory); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, + metricsFactory); assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); } @Test public void createPrefetchRecordsFetcherTest() { recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); - GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, metricsFactory); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, + metricsFactory); assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index f1437b6e..0bd2f31a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -341,7 +341,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any(), any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -471,7 +471,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any(), any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index cd396f0a..a8856a0b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; @@ -620,7 +621,7 @@ public class WorkerTest { RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); - when(recordsFetcherFactory.createRecordsFetcher(any(), any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L)); WorkerThread workerThread = runWorker(shardList,