diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index f01acd89..cd460e3e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -113,23 +113,28 @@ public class ProcessTask implements ConsumerTask { */ @Override public TaskResult call() { - final MetricsScope AppScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); - final MetricsScope ShardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); + /** + * NOTE: the difference between appScope and shardScope is, appScope doesn't have shardId as a dimension, + * therefore all data added to appScope, although from different shard consumer, will be sent to the same metric, + * which is the app-level MillsBehindLatest metric. + */ + final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); + final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); shardInfo.streamIdentifierSerOpt() - .ifPresent(streamId -> MetricsUtil.addStreamId(ShardScope, StreamIdentifier.multiStreamInstance(streamId))); - MetricsUtil.addShardId(ShardScope, shardInfo.shardId()); + .ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId))); + MetricsUtil.addShardId(shardScope, shardInfo.shardId()); long startTimeMillis = System.currentTimeMillis(); boolean success = false; try { - ShardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); - ShardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); + shardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); + shardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); Exception exception = null; try { if (processRecordsInput.millisBehindLatest() != null) { - ShardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + shardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); - AppScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + appScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); } @@ -146,11 +151,11 @@ public class ProcessTask implements ConsumerTask { } if (!records.isEmpty()) { - ShardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); + shardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); } recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber( - ShardScope, records, recordProcessorCheckpointer.lastCheckpointValue(), + shardScope, records, recordProcessorCheckpointer.lastCheckpointValue(), recordProcessorCheckpointer.largestPermittedCheckpointValue())); if (shouldCallProcessRecords(records)) { @@ -169,9 +174,9 @@ public class ProcessTask implements ConsumerTask { } return new TaskResult(exception); } finally { - MetricsUtil.addSuccessAndLatency(ShardScope, success, startTimeMillis, MetricsLevel.SUMMARY); - MetricsUtil.endScope(ShardScope); - MetricsUtil.endScope(AppScope); + MetricsUtil.addSuccessAndLatency(shardScope, success, startTimeMillis, MetricsLevel.SUMMARY); + MetricsUtil.endScope(shardScope); + MetricsUtil.endScope(appScope); } }