Avoiding ShardSync Task sleep when we skip the shard sync due to no shards found

This commit is contained in:
Ashwin Giridharan 2020-07-22 10:36:22 -07:00
parent ff703459e1
commit 8aec062e64
2 changed files with 8 additions and 6 deletions

View file

@ -97,6 +97,7 @@ public class HierarchicalShardSyncer {
/** /**
* Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards * Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards
* (e.g. at startup, or when we reach end of a shard). * (e.g. at startup, or when we reach end of a shard).
* Return true, if shard sync was performed. Return false, if shard sync is skipped.
* *
* @param shardDetector * @param shardDetector
* @param leaseRefresher * @param leaseRefresher
@ -109,18 +110,18 @@ public class HierarchicalShardSyncer {
* @throws KinesisClientLibIOException * @throws KinesisClientLibIOException
*/ */
// CHECKSTYLE:OFF CyclomaticComplexity // CHECKSTYLE:OFF CyclomaticComplexity
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final MetricsScope scope, final boolean ignoreUnexpectedChildShards, final boolean isLeaseTableEmpty) final MetricsScope scope, final boolean ignoreUnexpectedChildShards, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException {
final List<Shard> latestShards = isLeaseTableEmpty ? final List<Shard> latestShards = isLeaseTableEmpty ?
getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, ignoreUnexpectedChildShards, scope, return checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, ignoreUnexpectedChildShards, scope,
isLeaseTableEmpty); isLeaseTableEmpty);
} }
//Provide a pre-collcted list of shards to avoid calling ListShards API //Provide a pre-collcted list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
List<Shard> latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) List<Shard> latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
@ -131,7 +132,7 @@ public class HierarchicalShardSyncer {
log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size()); log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size());
} else { } else {
log.warn("Skipping shard sync for {} as no shards found from service.", streamIdentifier); log.warn("Skipping shard sync for {} as no shards found from service.", streamIdentifier);
return; return false;
} }
final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(latestShards); final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(latestShards);
@ -161,6 +162,7 @@ public class HierarchicalShardSyncer {
} }
final List<Lease> trackedLeases = new ArrayList<>(currentLeases); final List<Lease> trackedLeases = new ArrayList<>(currentLeases);
trackedLeases.addAll(newLeasesToCreate); trackedLeases.addAll(newLeasesToCreate);
return true;
} }
/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls

View file

@ -67,11 +67,11 @@ public class ShardSyncTask implements ConsumerTask {
boolean shardSyncSuccess = true; boolean shardSyncSuccess = true;
try { try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, boolean didPerformShardSync = hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher,
initialPosition, scope, ignoreUnexpectedChildShards, initialPosition, scope, ignoreUnexpectedChildShards,
leaseRefresher.isLeaseTableEmpty()); leaseRefresher.isLeaseTableEmpty());
if (shardSyncTaskIdleTimeMillis > 0) { if (didPerformShardSync && shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis); Thread.sleep(shardSyncTaskIdleTimeMillis);
} }
} catch (Exception e) { } catch (Exception e) {