Optimize check

This commit is contained in:
Chenyuan Lee 2023-04-11 13:55:33 -07:00
parent 4cc1754e16
commit c8ca3958e8

View file

@ -50,6 +50,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -178,31 +179,34 @@ class PeriodicShardSyncManager {
final long runStartMillis = System.currentTimeMillis();
try {
// Create a shallow copy of the map to avoid data race with Scheduler.
final Map<StreamIdentifier, StreamConfig> streamConfigMap = currentStreamConfigMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Create a copy of the streams to be considered for this run to avoid data race with Scheduler.
final Set<StreamIdentifier> streamConfigMap = new HashSet<>(currentStreamConfigMap.keySet());
// Construct the stream to leases map to be used in the lease sync
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(
streamConfigMap.keySet());
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(streamConfigMap);
// For each of the stream, check if shard sync needs to be done based on the leases state.
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : streamConfigMap.entrySet()) {
// Skip shardSync if CSCM has purged the stream.
if (!currentStreamConfigMap.containsKey(streamConfigEntry.getKey())) {
for (StreamIdentifier streamIdentifier : streamConfigMap) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
log.info("Skipping shard sync task for {} as stream is purged", streamIdentifier);
continue;
}
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(),
streamToLeasesMap.get(streamConfigEntry.getKey()));
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamIdentifier,
streamToLeasesMap.get(streamIdentifier));
numStreamsWithPartialLeases += shardSyncResponse.isHoleDetected() ? 1 : 0;
numStreamsToSync += shardSyncResponse.shouldDoShardSync ? 1 : 0;
if (shardSyncResponse.shouldDoShardSync()) {
log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ",
streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision());
streamIdentifier, shardSyncResponse.reasonForDecision());
final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
if (streamConfig == null) {
log.info("Skipping shard sync task for {} as stream is purged", streamIdentifier);
continue;
}
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider
.apply(streamConfigEntry.getValue());
.apply(streamConfig);
if (!shardSyncTaskManager.submitShardSyncTask()) {
log.warn(
"Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.",
@ -213,7 +217,7 @@ class PeriodicShardSyncManager {
shardSyncResponse.reasonForDecision());
}
} else {
log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(),
log.info("Skipping shard sync for {} due to the reason - {}", streamIdentifier,
shardSyncResponse.reasonForDecision());
}
}