diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index b3ba8a7d..bd3b9721 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -45,6 +45,7 @@ import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; @KinesisClientInternalApi public class ProcessTask implements ConsumerTask { private static final String PROCESS_TASK_OPERATION = "ProcessTask"; + private static final String APPLICATION_LEVEL_METRICS = "ApplicationLevelMetrics"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; @@ -112,20 +113,23 @@ public class ProcessTask implements ConsumerTask { */ @Override public TaskResult call() { - final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); + final MetricsScope scope_app = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); + final MetricsScope scope_shard = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); shardInfo.streamIdentifierSerOpt() - .ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId))); - MetricsUtil.addShardId(scope, shardInfo.shardId()); + .ifPresent(streamId -> MetricsUtil.addStreamId(scope_shard, StreamIdentifier.multiStreamInstance(streamId))); + MetricsUtil.addShardId(scope_shard, shardInfo.shardId()); long startTimeMillis = System.currentTimeMillis(); boolean success = false; try { - scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); - scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); + scope_shard.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope_shard.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); Exception exception = null; try { if (processRecordsInput.millisBehindLatest() != null) { - scope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + scope_shard.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); + scope_app.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); } @@ -142,11 +146,11 @@ public class ProcessTask implements ConsumerTask { } if (!records.isEmpty()) { - scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope_shard.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); } recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber( - scope, records, recordProcessorCheckpointer.lastCheckpointValue(), + scope_shard, records, recordProcessorCheckpointer.lastCheckpointValue(), recordProcessorCheckpointer.largestPermittedCheckpointValue())); if (shouldCallProcessRecords(records)) { @@ -165,8 +169,9 @@ public class ProcessTask implements ConsumerTask { } return new TaskResult(exception); } finally { - MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY); - MetricsUtil.endScope(scope); + MetricsUtil.addSuccessAndLatency(scope_shard, success, startTimeMillis, MetricsLevel.SUMMARY); + MetricsUtil.endScope(scope_shard); + MetricsUtil.endScope(scope_app); } }