diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index b3cfdb56..4f677524 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -97,6 +97,7 @@ public class HierarchicalShardSyncer { /** * Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards * (e.g. at startup, or when we reach end of a shard). + * Return true, if shard sync was performed. Return false, if shard sync is skipped. * * @param shardDetector * @param leaseRefresher @@ -109,18 +110,18 @@ public class HierarchicalShardSyncer { * @throws KinesisClientLibIOException */ // CHECKSTYLE:OFF CyclomaticComplexity - public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, + public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final MetricsScope scope, final boolean ignoreUnexpectedChildShards, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException { final List latestShards = isLeaseTableEmpty ? getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); - checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, ignoreUnexpectedChildShards, scope, + return checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, ignoreUnexpectedChildShards, scope, isLeaseTableEmpty); } //Provide a pre-collcted list of shards to avoid calling ListShards API - public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, + public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, List latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { @@ -131,7 +132,7 @@ public class HierarchicalShardSyncer { log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size()); } else { log.warn("Skipping shard sync for {} as no shards found from service.", streamIdentifier); - return; + return false; } final Map shardIdToShardMap = constructShardIdToShardMap(latestShards); @@ -161,6 +162,7 @@ public class HierarchicalShardSyncer { } final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); + return true; } /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index 820d4528..dd576114 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -67,11 +67,11 @@ public class ShardSyncTask implements ConsumerTask { boolean shardSyncSuccess = true; try { - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, + boolean didPerformShardSync = hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, scope, ignoreUnexpectedChildShards, leaseRefresher.isLeaseTableEmpty()); - if (shardSyncTaskIdleTimeMillis > 0) { + if (didPerformShardSync && shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); } } catch (Exception e) {