From aba87b666970c593661d8243dfb3fb6d89bb2cff Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Wed, 13 Sep 2017 16:20:50 -0700 Subject: [PATCH] Set the Metrics Scope for Cross Thread Operations Setup the metrics scope to a cross thread version, and install the thread local. This gets rid of a warning for the calls to dataFetcher. cr https://code.amazon.com/reviews/CR-638954 --- ...synchronousGetRecordsRetrievalStrategy.java | 18 +++++++++++++++++- .../lib/worker/ConsumerStatesTest.java | 4 ++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java index db5ea042..6290dd4f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java @@ -2,6 +2,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; @@ -12,6 +13,8 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; +import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -59,9 +62,10 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie } GetRecordsResult result = null; Set> futures = new HashSet<>(); + Callable retrieverCall = createRetrieverCallable(maxRecords); while (true) { try { - futures.add(completionService.submit(() -> dataFetcher.getRecords(maxRecords))); + futures.add(completionService.submit(retrieverCall)); } catch (RejectedExecutionException e) { log.warn("Out of resources, unable to start additional requests."); } @@ -90,6 +94,18 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie return result; } + private Callable createRetrieverCallable(int maxRecords) { + ThreadSafeMetricsDelegatingScope metricsScope = new ThreadSafeMetricsDelegatingScope(MetricsHelper.getMetricsScope()); + return () -> { + try { + MetricsHelper.setMetricsScope(metricsScope); + return dataFetcher.getRecords(maxRecords); + } finally { + MetricsHelper.unsetMetricsScope(); + } + }; + } + @Override public void shutdown() { executorService.shutdownNow(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index d20d8c3d..307aa6b8 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -168,7 +168,7 @@ public class ConsumerStatesTest { assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); - assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrivalStrategy", instanceOf(SynchronousGetRecordsRetrievalStrategy.class) )); + assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(SynchronousGetRecordsRetrievalStrategy.class) )); assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); @@ -199,7 +199,7 @@ public class ConsumerStatesTest { assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); - assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrivalStrategy", instanceOf(AsynchronousGetRecordsRetrievalStrategy.class) )); + assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(AsynchronousGetRecordsRetrievalStrategy.class) )); assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));