diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 5de09c37..1d02f47c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -109,7 +109,8 @@ class PeriodicShardSyncManager { for (Map.Entry mapEntry : streamToShardSyncTaskManagerMap.entrySet()) { final ShardSyncTaskManager shardSyncTaskManager = mapEntry.getValue(); if (!shardSyncTaskManager.syncShardAndLeaseInfo()) { - throw new KinesisClientLibIOException("Failed to submit shard sync task for stream " + shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); + log.warn("Failed to submit shard sync task for stream {}. This could be due to the previous shard sync task not finished.", + shardSyncTaskManager.shardDetector().streamIdentifier().streamName()); } } } else { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index c59608b2..b8f581c6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -23,6 +23,7 @@ import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskType; import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.metrics.MetricsLevel; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; @@ -62,6 +63,7 @@ public class ShardSyncTask implements ConsumerTask { public TaskResult call() { Exception exception = null; final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION); + boolean shardSyncSuccess = true; try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, @@ -72,7 +74,9 @@ public class ShardSyncTask implements ConsumerTask { } catch (Exception e) { log.error("Caught exception while sync'ing Kinesis shards and leases", e); exception = e; + shardSyncSuccess = false; } finally { + MetricsUtil.addSuccess(scope, "SyncShards", shardSyncSuccess, MetricsLevel.DETAILED); MetricsUtil.endScope(scope); }