add some comments to explain the metric scopes

This commit is contained in:
Qilin Jin 2021-12-01 11:58:19 -08:00
parent 66e5dfecb5
commit a2e0269826

View file

@ -113,23 +113,28 @@ public class ProcessTask implements ConsumerTask {
*/ */
@Override @Override
public TaskResult call() { public TaskResult call() {
final MetricsScope AppScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); /**
final MetricsScope ShardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); * NOTE: the difference between appScope and shardScope is, appScope doesn't have shardId as a dimension,
* therefore all data added to appScope, although from different shard consumer, will be sent to the same metric,
* which is the app-level MillsBehindLatest metric.
*/
final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS);
final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
shardInfo.streamIdentifierSerOpt() shardInfo.streamIdentifierSerOpt()
.ifPresent(streamId -> MetricsUtil.addStreamId(ShardScope, StreamIdentifier.multiStreamInstance(streamId))); .ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId)));
MetricsUtil.addShardId(ShardScope, shardInfo.shardId()); MetricsUtil.addShardId(shardScope, shardInfo.shardId());
long startTimeMillis = System.currentTimeMillis(); long startTimeMillis = System.currentTimeMillis();
boolean success = false; boolean success = false;
try { try {
ShardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); shardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
ShardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); shardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY);
Exception exception = null; Exception exception = null;
try { try {
if (processRecordsInput.millisBehindLatest() != null) { if (processRecordsInput.millisBehindLatest() != null) {
ShardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), shardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
AppScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), appScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
} }
@ -146,11 +151,11 @@ public class ProcessTask implements ConsumerTask {
} }
if (!records.isEmpty()) { if (!records.isEmpty()) {
ShardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); shardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
} }
recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber( recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(
ShardScope, records, recordProcessorCheckpointer.lastCheckpointValue(), shardScope, records, recordProcessorCheckpointer.lastCheckpointValue(),
recordProcessorCheckpointer.largestPermittedCheckpointValue())); recordProcessorCheckpointer.largestPermittedCheckpointValue()));
if (shouldCallProcessRecords(records)) { if (shouldCallProcessRecords(records)) {
@ -169,9 +174,9 @@ public class ProcessTask implements ConsumerTask {
} }
return new TaskResult(exception); return new TaskResult(exception);
} finally { } finally {
MetricsUtil.addSuccessAndLatency(ShardScope, success, startTimeMillis, MetricsLevel.SUMMARY); MetricsUtil.addSuccessAndLatency(shardScope, success, startTimeMillis, MetricsLevel.SUMMARY);
MetricsUtil.endScope(ShardScope); MetricsUtil.endScope(shardScope);
MetricsUtil.endScope(AppScope); MetricsUtil.endScope(appScope);
} }
} }