Merge pull request #868 from QAQJ/consumer-level-metrics
Adding a new metric: Application-level MillisBehindLatest
This commit is contained in:
commit
bedae95db9
1 changed files with 20 additions and 10 deletions
|
|
@ -45,6 +45,7 @@ import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
|
|||
@KinesisClientInternalApi
|
||||
public class ProcessTask implements ConsumerTask {
|
||||
private static final String PROCESS_TASK_OPERATION = "ProcessTask";
|
||||
private static final String APPLICATION_TRACKER_OPERATION = "ApplicationTracker";
|
||||
private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
|
||||
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
|
||||
private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
|
||||
|
|
@ -112,20 +113,28 @@ public class ProcessTask implements ConsumerTask {
|
|||
*/
|
||||
@Override
|
||||
public TaskResult call() {
|
||||
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
|
||||
/**
|
||||
* NOTE: the difference between appScope and shardScope is, appScope doesn't have shardId as a dimension,
|
||||
* therefore all data added to appScope, although from different shard consumer, will be sent to the same metric,
|
||||
* which is the app-level MillsBehindLatest metric.
|
||||
*/
|
||||
final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION);
|
||||
final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
|
||||
shardInfo.streamIdentifierSerOpt()
|
||||
.ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId)));
|
||||
MetricsUtil.addShardId(scope, shardInfo.shardId());
|
||||
.ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId)));
|
||||
MetricsUtil.addShardId(shardScope, shardInfo.shardId());
|
||||
long startTimeMillis = System.currentTimeMillis();
|
||||
boolean success = false;
|
||||
try {
|
||||
scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
||||
scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY);
|
||||
shardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
||||
shardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY);
|
||||
Exception exception = null;
|
||||
|
||||
try {
|
||||
if (processRecordsInput.millisBehindLatest() != null) {
|
||||
scope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
|
||||
shardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
|
||||
StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
|
||||
appScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
|
||||
StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
|
||||
}
|
||||
|
||||
|
|
@ -142,11 +151,11 @@ public class ProcessTask implements ConsumerTask {
|
|||
}
|
||||
|
||||
if (!records.isEmpty()) {
|
||||
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
||||
shardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
|
||||
}
|
||||
|
||||
recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(
|
||||
scope, records, recordProcessorCheckpointer.lastCheckpointValue(),
|
||||
shardScope, records, recordProcessorCheckpointer.lastCheckpointValue(),
|
||||
recordProcessorCheckpointer.largestPermittedCheckpointValue()));
|
||||
|
||||
if (shouldCallProcessRecords(records)) {
|
||||
|
|
@ -165,8 +174,9 @@ public class ProcessTask implements ConsumerTask {
|
|||
}
|
||||
return new TaskResult(exception);
|
||||
} finally {
|
||||
MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY);
|
||||
MetricsUtil.endScope(scope);
|
||||
MetricsUtil.addSuccessAndLatency(shardScope, success, startTimeMillis, MetricsLevel.SUMMARY);
|
||||
MetricsUtil.endScope(shardScope);
|
||||
MetricsUtil.endScope(appScope);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue