From 3b89b56df4ecf3cab5389636ac5c81cdf3472d1e Mon Sep 17 00:00:00 2001 From: BtXin Date: Mon, 9 Oct 2017 16:32:52 -0700 Subject: [PATCH] Handle custom metric scope (#235) * integrated prefetch with shardconsumer * fixed tests * added fatory methods * added tests and fixed broken tests * Resolved conflicts * Addressed comments * Integrated the changes * Handle Custom Metric Scope * emit metric * Addressed comments * Passed the operation by caller * Get rid of sysout * Added set metrics to InitializeTask * Addressed comments * Addressed Comments * Addressed comments * Addressed comment --- .../lib/worker/PrefetchGetRecordsCache.java | 18 +++++++++++++++--- .../clientlibrary/lib/worker/ProcessTask.java | 1 - .../lib/worker/RecordsFetcherFactory.java | 5 ++++- .../lib/worker/ShardConsumer.java | 2 +- .../worker/SimpleRecordsFetcherFactory.java | 10 ++++++---- ...PrefetchGetRecordsCacheIntegrationTest.java | 13 +++++++++++-- .../worker/PrefetchGetRecordsCacheTest.java | 6 +++++- .../lib/worker/RecordsFetcherFactoryTest.java | 11 +++++++++-- .../lib/worker/ShardConsumerTest.java | 4 ++-- .../clientlibrary/lib/worker/WorkerTest.java | 2 +- 10 files changed, 54 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 1a8ed508..5369c0f4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -22,10 +22,14 @@ import java.util.concurrent.LinkedBlockingQueue; import com.amazonaws.SdkClientException; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; +import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.NonNull; import lombok.extern.apachecommons.CommonsLog; +import org.apache.commons.lang.Validate; /** * This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the @@ -44,13 +48,13 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final ExecutorService executorService; + private final IMetricsFactory metricsFactory; private final long idleMillisBetweenCalls; private Instant lastSuccessfulCall; private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; - private PrefetchCounters prefetchCounters; - private boolean started = false; + private final String operation; /** * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a @@ -71,7 +75,9 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull final ExecutorService executorService, - long idleMillisBetweenCalls) { + long idleMillisBetweenCalls, + @NonNull final IMetricsFactory metricsFactory, + @NonNull String operation) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; @@ -80,8 +86,11 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput); this.prefetchCounters = new PrefetchCounters(); this.executorService = executorService; + this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory); this.idleMillisBetweenCalls = idleMillisBetweenCalls; this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); + Validate.notEmpty(operation, "Operation cannot be empty"); + this.operation = operation; } @Override @@ -138,6 +147,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { log.warn("Prefetch thread was interrupted."); break; } + MetricsHelper.startScope(metricsFactory, operation); if (prefetchCounters.shouldGetNewRecords()) { try { sleepBeforeNextCall(); @@ -158,6 +168,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { " Please search for the exception/error online to check what is going on. If the " + "issue persists or is a recurring problem, feel free to open an issue on, " + "https://github.com/awslabs/amazon-kinesis-client.", e); + } finally { + MetricsHelper.endScope(); } } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 20be71b4..9aca832e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -148,7 +148,6 @@ class ProcessTask implements ITask { scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId()); scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY); scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY); - Exception exception = null; try { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index 0989995d..be8316d7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -14,6 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + /** * This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard. */ @@ -23,10 +25,11 @@ public interface RecordsFetcherFactory { * * @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache * @param shardId ShardId of the shard that the fetcher will retrieve records for + * @param metricsFactory MetricsFactory used to create metricScope * * @return GetRecordsCache used to get records from Kinesis. */ - GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId); + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory); /** * Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 2489b452..d7a5545e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -235,7 +235,7 @@ class ShardConsumer { this.dataFetcher = kinesisDataFetcher; this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), - this.getShardInfo().getShardId()); + this.getShardInfo().getShardId(), this.metricsFactory); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 4d44f9ea..e6c9f3b0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -16,8 +16,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.concurrent.Executors; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.google.common.util.concurrent.ThreadFactoryBuilder; - import lombok.extern.apachecommons.CommonsLog; @CommonsLog @@ -28,13 +28,14 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private int maxRecordsCount = 30000; private long idleMillisBetweenCalls = 1500L; private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; + private IMetricsFactory metricsFactory; public SimpleRecordsFetcherFactory(int maxRecords) { this.maxRecords = maxRecords; } @Override - public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId) { + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { @@ -44,7 +45,9 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { .setDaemon(true) .setNameFormat("prefetch-cache-" + shardId + "-%04d") .build()), - idleMillisBetweenCalls); + idleMillisBetweenCalls, + metricsFactory, + "ProcessTask"); } } @@ -68,7 +71,6 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { this.dataFetchingStrategy = dataFetchingStrategy; } - @Override public void setIdleMillisBetweenCalls(final long idleMillisBetweenCalls) { this.idleMillisBetweenCalls = idleMillisBetweenCalls; } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java index 98ebcff2..37d0e446 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java @@ -31,6 +31,10 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.model.Record; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -62,6 +66,7 @@ public class PrefetchGetRecordsCacheIntegrationTest { private KinesisDataFetcher dataFetcher; private ExecutorService executorService; private List records; + private String operation = "ProcessTask"; @Mock private IKinesisProxy proxy; @@ -82,7 +87,9 @@ public class PrefetchGetRecordsCacheIntegrationTest { MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy, executorService, - IDLE_MILLIS_BETWEEN_CALLS); + IDLE_MILLIS_BETWEEN_CALLS, + new NullMetricsFactory(), + operation); } @Test @@ -126,7 +133,9 @@ public class PrefetchGetRecordsCacheIntegrationTest { MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy2, executorService2, - IDLE_MILLIS_BETWEEN_CALLS); + IDLE_MILLIS_BETWEEN_CALLS, + new NullMetricsFactory(), + operation); getRecordsCache.start(); sleep(IDLE_MILLIS_BETWEEN_CALLS); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java index 91a27e7d..6091baa9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.IntStream; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -70,6 +71,7 @@ public class PrefetchGetRecordsCacheTest { private ExecutorService executorService; private LinkedBlockingQueue spyQueue; private PrefetchGetRecordsCache getRecordsCache; + private String operation = "ProcessTask"; @Before public void setup() { @@ -81,7 +83,9 @@ public class PrefetchGetRecordsCacheTest { MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy, executorService, - IDLE_MILLIS_BETWEEN_CALLS); + IDLE_MILLIS_BETWEEN_CALLS, + new NullMetricsFactory(), + operation); spyQueue = spy(getRecordsCache.getRecordsResultQueue); records = spy(new ArrayList<>()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java index 2002b37f..912804da 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -1,5 +1,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -15,6 +17,9 @@ public class RecordsFetcherFactoryTest { @Mock private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + @Mock + private IMetricsFactory metricsFactory; + @Before public void setUp() { MockitoAnnotations.initMocks(this); @@ -23,14 +28,16 @@ public class RecordsFetcherFactoryTest { @Test public void createDefaultRecordsFetcherTest() { - GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, + metricsFactory); assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); } @Test public void createPrefetchRecordsFetcherTest() { recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); - GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, + metricsFactory); assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 421c8c90..0bd2f31a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -341,7 +341,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -471,7 +471,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L)); - when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index aab1067f..a8856a0b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -621,7 +621,7 @@ public class WorkerTest { RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); - when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L)); WorkerThread workerThread = runWorker(shardList,