Passed the operation by caller

This commit is contained in:
Wei 2017-10-06 10:51:10 -07:00
parent 81c13d2a35
commit dddcd64bd7
4 changed files with 18 additions and 2 deletions

View file

@ -49,6 +49,11 @@ public class BlockingGetRecordsCache implements GetRecordsCache {
// //
} }
@Override
public void setMetricOperation(String operation){
}
@Override @Override
public ProcessRecordsInput getNextResult() { public ProcessRecordsInput getNextResult() {
sleepBeforeNextCall(); sleepBeforeNextCall();

View file

@ -16,6 +16,7 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
/** /**
* This class is used as a cache for Prefetching data from Kinesis. * This class is used as a cache for Prefetching data from Kinesis.
@ -40,4 +41,6 @@ public interface GetRecordsCache {
* This method calls the shutdown behavior on the cache, if available. * This method calls the shutdown behavior on the cache, if available.
*/ */
void shutdown(); void shutdown();
void setMetricOperation(String operation);
} }

View file

@ -25,6 +25,7 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory; import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetRecordsResult;
import lombok.NonNull; import lombok.NonNull;
@ -56,6 +57,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private boolean started = false; private boolean started = false;
private String operation;
/** /**
* Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a
* LinkedBlockingQueue. * LinkedBlockingQueue.
@ -127,6 +130,10 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
return getRecordsRetrievalStrategy; return getRecordsRetrievalStrategy;
} }
@Override
public void setMetricOperation(String operation) {
this.operation = operation;
}
@Override @Override
public void shutdown() { public void shutdown() {
defaultGetRecordsCacheDaemon.isShutdown = true; defaultGetRecordsCacheDaemon.isShutdown = true;
@ -144,7 +151,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
log.warn("Prefetch thread was interrupted."); log.warn("Prefetch thread was interrupted.");
break; break;
} }
MetricsHelper.startScope(metricsFactory, "ProcessTask"); MetricsHelper.startScope(metricsFactory, operation);
if (prefetchCounters.shouldGetNewRecords()) { if (prefetchCounters.shouldGetNewRecords()) {
try { try {
sleepBeforeNextCall(); sleepBeforeNextCall();

View file

@ -148,7 +148,8 @@ class ProcessTask implements ITask {
scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId()); scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId());
scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY); scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY);
scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY); scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY);
getRecordsCache.setMetricOperation(this.getClass().getSimpleName());
System.out.println(this.getClass().getSimpleName());
Exception exception = null; Exception exception = null;
try { try {