Set the Metrics Scope for Cross Thread Operations

Setup the metrics scope to a cross thread version, and install the
thread local.  This gets rid of a warning for the calls to dataFetcher.

cr https://code.amazon.com/reviews/CR-638954
This commit is contained in:
Pfifer, Justin 2017-09-13 16:20:50 -07:00
parent 61be5baeb0
commit aba87b6669
2 changed files with 19 additions and 3 deletions

View file

@ -2,6 +2,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
@ -12,6 +13,8 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -59,9 +62,10 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
}
GetRecordsResult result = null;
Set<Future<GetRecordsResult>> futures = new HashSet<>();
Callable<GetRecordsResult> retrieverCall = createRetrieverCallable(maxRecords);
while (true) {
try {
futures.add(completionService.submit(() -> dataFetcher.getRecords(maxRecords)));
futures.add(completionService.submit(retrieverCall));
} catch (RejectedExecutionException e) {
log.warn("Out of resources, unable to start additional requests.");
}
@ -90,6 +94,18 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
return result;
}
private Callable<GetRecordsResult> createRetrieverCallable(int maxRecords) {
ThreadSafeMetricsDelegatingScope metricsScope = new ThreadSafeMetricsDelegatingScope(MetricsHelper.getMetricsScope());
return () -> {
try {
MetricsHelper.setMetricsScope(metricsScope);
return dataFetcher.getRecords(maxRecords);
} finally {
MetricsHelper.unsetMetricsScope();
}
};
}
@Override
public void shutdown() {
executorService.shutdownNow();

View file

@ -168,7 +168,7 @@ public class ConsumerStatesTest {
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrivalStrategy", instanceOf(SynchronousGetRecordsRetrievalStrategy.class) ));
assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(SynchronousGetRecordsRetrievalStrategy.class) ));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
@ -199,7 +199,7 @@ public class ConsumerStatesTest {
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrivalStrategy", instanceOf(AsynchronousGetRecordsRetrievalStrategy.class) ));
assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(AsynchronousGetRecordsRetrievalStrategy.class) ));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));