Minimize race in PSSM to avoid unnecessary shard sync calls (#1088)

This commit is contained in:
chenylee-aws 2023-04-12 10:33:50 -07:00 committed by GitHub
parent 7cd7c27a80
commit 7b23ae9b3c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -50,6 +50,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -178,23 +179,34 @@ class PeriodicShardSyncManager {
final long runStartMillis = System.currentTimeMillis();
try {
// Create a copy of the streams to be considered for this run to avoid data race with Scheduler.
final Set<StreamIdentifier> streamConfigMap = new HashSet<>(currentStreamConfigMap.keySet());
// Construct the stream to leases map to be used in the lease sync
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(
currentStreamConfigMap.keySet());
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(streamConfigMap);
// For each of the stream, check if shard sync needs to be done based on the leases state.
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(),
streamToLeasesMap.get(streamConfigEntry.getKey()));
for (StreamIdentifier streamIdentifier : streamConfigMap) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
log.info("Skipping shard sync task for {} as stream is purged", streamIdentifier);
continue;
}
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamIdentifier,
streamToLeasesMap.get(streamIdentifier));
numStreamsWithPartialLeases += shardSyncResponse.isHoleDetected() ? 1 : 0;
numStreamsToSync += shardSyncResponse.shouldDoShardSync ? 1 : 0;
if (shardSyncResponse.shouldDoShardSync()) {
log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ",
streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision());
streamIdentifier, shardSyncResponse.reasonForDecision());
final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
if (streamConfig == null) {
log.info("Skipping shard sync task for {} as stream is purged", streamIdentifier);
continue;
}
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider
.apply(streamConfigEntry.getValue());
.apply(streamConfig);
if (!shardSyncTaskManager.submitShardSyncTask()) {
log.warn(
"Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.",
@ -205,7 +217,7 @@ class PeriodicShardSyncManager {
shardSyncResponse.reasonForDecision());
}
} else {
log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(),
log.info("Skipping shard sync for {} due to the reason - {}", streamIdentifier,
shardSyncResponse.reasonForDecision());
}
}