diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index e1678f45..5369c0f4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -52,12 +52,9 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final long idleMillisBetweenCalls; private Instant lastSuccessfulCall; private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; - private PrefetchCounters prefetchCounters; - private boolean started = false; - - private String operation; + private final String operation; /** * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index e081d7ac..be8316d7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -26,11 +26,10 @@ public interface RecordsFetcherFactory { * @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache * @param shardId ShardId of the shard that the fetcher will retrieve records for * @param metricsFactory MetricsFactory used to create metricScope - * @param operation operation name used to emit metrics * * @return GetRecordsCache used to get records from Kinesis. */ - GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory, String operation); + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory); /** * Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 6cf2a544..d7a5545e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -235,7 +235,7 @@ class ShardConsumer { this.dataFetcher = kinesisDataFetcher; this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), - this.getShardInfo().getShardId(), metricsFactory, "ProcessTask"); + this.getShardInfo().getShardId(), this.metricsFactory); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 865e4dc4..4bef71a1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -35,7 +35,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { } @Override - public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory, String operation) { + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { @@ -45,7 +45,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { .setDaemon(true) .setNameFormat("prefetch-cache-" + shardId + "-%04d") .build()), - idleMillisBetweenCalls, metricsFactory, operation); + idleMillisBetweenCalls, metricsFactory, "processTask"); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java index 546b1b00..912804da 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -13,7 +13,6 @@ import org.mockito.MockitoAnnotations; public class RecordsFetcherFactoryTest { private String shardId = "TestShard"; private RecordsFetcherFactory recordsFetcherFactory; - private String operation = "ProcessTask"; @Mock private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @@ -30,7 +29,7 @@ public class RecordsFetcherFactoryTest { @Test public void createDefaultRecordsFetcherTest() { GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, - metricsFactory, operation); + metricsFactory); assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); } @@ -38,7 +37,7 @@ public class RecordsFetcherFactoryTest { public void createPrefetchRecordsFetcherTest() { recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, - metricsFactory, operation); + metricsFactory); assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index b4bb32d7..0bd2f31a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -341,7 +341,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any(),anyString())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -471,7 +471,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any(),anyString())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 2b529404..a8856a0b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -621,7 +621,7 @@ public class WorkerTest { RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); - when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any(), anyString())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L)); WorkerThread workerThread = runWorker(shardList,