first commit for app-level mills_behind_latest metric

This commit is contained in:
Qilin Jin 2021-11-18 10:48:23 -08:00
parent 7503ec7105
commit b43c14476b

View file

@ -45,6 +45,7 @@ import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder;
@KinesisClientInternalApi @KinesisClientInternalApi
public class ProcessTask implements ConsumerTask { public class ProcessTask implements ConsumerTask {
private static final String PROCESS_TASK_OPERATION = "ProcessTask"; private static final String PROCESS_TASK_OPERATION = "ProcessTask";
private static final String APPLICATION_LEVEL_METRICS = "ApplicationLevelMetrics";
private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
@ -112,20 +113,23 @@ public class ProcessTask implements ConsumerTask {
*/ */
@Override @Override
public TaskResult call() { public TaskResult call() {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); final MetricsScope scope_app = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS);
final MetricsScope scope_shard = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
shardInfo.streamIdentifierSerOpt() shardInfo.streamIdentifierSerOpt()
.ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId))); .ifPresent(streamId -> MetricsUtil.addStreamId(scope_shard, StreamIdentifier.multiStreamInstance(streamId)));
MetricsUtil.addShardId(scope, shardInfo.shardId()); MetricsUtil.addShardId(scope_shard, shardInfo.shardId());
long startTimeMillis = System.currentTimeMillis(); long startTimeMillis = System.currentTimeMillis();
boolean success = false; boolean success = false;
try { try {
scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); scope_shard.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); scope_shard.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY);
Exception exception = null; Exception exception = null;
try { try {
if (processRecordsInput.millisBehindLatest() != null) { if (processRecordsInput.millisBehindLatest() != null) {
scope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), scope_shard.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
scope_app.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
} }
@ -142,11 +146,11 @@ public class ProcessTask implements ConsumerTask {
} }
if (!records.isEmpty()) { if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); scope_shard.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
} }
recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber( recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(
scope, records, recordProcessorCheckpointer.lastCheckpointValue(), scope_shard, records, recordProcessorCheckpointer.lastCheckpointValue(),
recordProcessorCheckpointer.largestPermittedCheckpointValue())); recordProcessorCheckpointer.largestPermittedCheckpointValue()));
if (shouldCallProcessRecords(records)) { if (shouldCallProcessRecords(records)) {
@ -165,8 +169,9 @@ public class ProcessTask implements ConsumerTask {
} }
return new TaskResult(exception); return new TaskResult(exception);
} finally { } finally {
MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY); MetricsUtil.addSuccessAndLatency(scope_shard, success, startTimeMillis, MetricsLevel.SUMMARY);
MetricsUtil.endScope(scope); MetricsUtil.endScope(scope_shard);
MetricsUtil.endScope(scope_app);
} }
} }