From dddcd64bd778dc2114556471607ccf74281e58f6 Mon Sep 17 00:00:00 2001 From: Wei Date: Fri, 6 Oct 2017 10:51:10 -0700 Subject: [PATCH] Passed the operation by caller --- .../lib/worker/BlockingGetRecordsCache.java | 5 +++++ .../clientlibrary/lib/worker/GetRecordsCache.java | 3 +++ .../lib/worker/PrefetchGetRecordsCache.java | 9 ++++++++- .../kinesis/clientlibrary/lib/worker/ProcessTask.java | 3 ++- 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index d9fc011e..84ddadb1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -49,6 +49,11 @@ public class BlockingGetRecordsCache implements GetRecordsCache { // } + @Override + public void setMetricOperation(String operation){ + + } + @Override public ProcessRecordsInput getNextResult() { sleepBeforeNextCall(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index dba24f8d..e85a9f8c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; /** * This class is used as a cache for Prefetching data from Kinesis. @@ -40,4 +41,6 @@ public interface GetRecordsCache { * This method calls the shutdown behavior on the cache, if available. */ void shutdown(); + + void setMetricOperation(String operation); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 710b4b5c..235ee885 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -25,6 +25,7 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.NonNull; @@ -56,6 +57,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private boolean started = false; + private String operation; + /** * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a * LinkedBlockingQueue. @@ -127,6 +130,10 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { return getRecordsRetrievalStrategy; } + @Override + public void setMetricOperation(String operation) { + this.operation = operation; + } @Override public void shutdown() { defaultGetRecordsCacheDaemon.isShutdown = true; @@ -144,7 +151,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { log.warn("Prefetch thread was interrupted."); break; } - MetricsHelper.startScope(metricsFactory, "ProcessTask"); + MetricsHelper.startScope(metricsFactory, operation); if (prefetchCounters.shouldGetNewRecords()) { try { sleepBeforeNextCall(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 20be71b4..aac36350 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -148,7 +148,8 @@ class ProcessTask implements ITask { scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId()); scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY); scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY); - + getRecordsCache.setMetricOperation(this.getClass().getSimpleName()); + System.out.println(this.getClass().getSimpleName()); Exception exception = null; try {