diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index c849f7f2..bb5c0947 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -20,6 +20,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; +import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.NonNull; @@ -42,6 +45,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final ExecutorService executorService; + private final IMetricsFactory metricsFactory; private PrefetchCounters prefetchCounters; @@ -50,7 +54,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, - @NonNull final ExecutorService executorService) { + @NonNull final ExecutorService executorService, + @NonNull final IMetricsFactory metricsFactory) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; this.maxSize = maxSize; @@ -59,6 +64,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); this.prefetchCounters = new PrefetchCounters(); this.executorService = executorService; + this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory); } @Override @@ -115,6 +121,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { } if (prefetchCounters.shouldGetNewRecords()) { try { + MetricsHelper.startScope(metricsFactory, "Prefetcheing"); GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() .withRecords(getRecordsResult.getRecords()) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index cdd80e49..0c81e671 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -14,6 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + /** * The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard. * Clients may choose to create separate instantiations, or re-use instantiations. @@ -36,4 +38,6 @@ public interface RecordsFetcherFactory { void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); + void setMetricsFactory(IMetricsFactory metricsFactory); + } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 3a3958f3..14b33bc6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.concurrent.Executors; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import lombok.extern.apachecommons.CommonsLog; @CommonsLog @@ -25,6 +26,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private int maxByteSize = 8 * 1024 * 1024; private int maxRecordsCount = 30000; private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; + private IMetricsFactory metricsFactory; public SimpleRecordsFetcherFactory(int maxRecords) { this.maxRecords = maxRecords; @@ -36,7 +38,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); } else { return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, - getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1)); + getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1), metricsFactory); } } @@ -59,4 +61,9 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){ this.dataFetchingStrategy = dataFetchingStrategy; } + + @Override + public void setMetricsFactory(IMetricsFactory metricsFactory) { + this.metricsFactory = metricsFactory; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index d2ea738d..f5295792 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -415,6 +415,7 @@ public class Worker implements Runnable { this.shardPrioritization = shardPrioritization; this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; + this.config.getRecordsFetcherFactory().setMetricsFactory(metricsFactory); } /** diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java index 1c661663..fb0fa7bb 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.model.Record; import org.junit.After; import org.junit.Before; @@ -85,7 +86,8 @@ public class PrefetchGetRecordsCacheIntegrationTest { MAX_RECORDS_COUNT, MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy, - executorService); + executorService, + new NullMetricsFactory()); } @Test @@ -128,7 +130,8 @@ public class PrefetchGetRecordsCacheIntegrationTest { MAX_RECORDS_COUNT, MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy2, - executorService2 + executorService2, + new NullMetricsFactory() ); getRecordsCache.start(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java index 8517138f..241a23a7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.IntStream; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -79,7 +80,8 @@ public class PrefetchGetRecordsCacheTest { MAX_RECORDS_COUNT, MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy, - executorService); + executorService, + new NullMetricsFactory()); spyQueue = spy(getRecordsCache.getRecordsResultQueue); records = spy(new ArrayList<>()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java index 17a77123..858d68f7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -1,5 +1,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -23,6 +24,7 @@ public class RecordsFetcherFactoryTest { public void setUp() { MockitoAnnotations.initMocks(this); recordsFetcherFactory = new SimpleRecordsFetcherFactory(1); + recordsFetcherFactory.setMetricsFactory(new NullMetricsFactory()); } @Test