Adding metric for ShardSyncTaskManager

This commit is contained in:
Chunxue Yang 2020-03-20 11:54:40 -07:00
parent 384fe5266c
commit bb495e4f60
2 changed files with 6 additions and 1 deletions

View file

@ -109,7 +109,8 @@ class PeriodicShardSyncManager {
for (Map.Entry<StreamIdentifier, ShardSyncTaskManager> mapEntry : streamToShardSyncTaskManagerMap.entrySet()) {
final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue();
if (!shardSyncTaskManager.syncShardAndLeaseInfo()) {
throw new KinesisClientLibIOException("Failed to submit shard sync task for stream " + shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
log.warn("Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
}
}
} else {

View file

@ -23,6 +23,7 @@ import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.lifecycle.TaskType;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
@ -62,6 +63,7 @@ public class ShardSyncTask implements ConsumerTask {
public TaskResult call() {
Exception exception = null;
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION);
boolean shardSyncSuccess = true;
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
@ -72,7 +74,9 @@ public class ShardSyncTask implements ConsumerTask {
} catch (Exception e) {
log.error("Caught exception while sync'ing Kinesis shards and leases", e);
exception = e;
shardSyncSuccess = false;
} finally {
MetricsUtil.addSuccess(scope, "SyncShards", shardSyncSuccess, MetricsLevel.DETAILED);
MetricsUtil.endScope(scope);
}