Copy streamConfigMap to avoid data race with Scheduler
This commit is contained in:
parent
88246e717e
commit
4cc1754e16
1 changed files with 10 additions and 2 deletions
|
|
@ -178,12 +178,20 @@ class PeriodicShardSyncManager {
|
||||||
final long runStartMillis = System.currentTimeMillis();
|
final long runStartMillis = System.currentTimeMillis();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Create a shallow copy of the map to avoid data race with Scheduler.
|
||||||
|
final Map<StreamIdentifier, StreamConfig> streamConfigMap = currentStreamConfigMap.entrySet().stream()
|
||||||
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||||
|
|
||||||
// Construct the stream to leases map to be used in the lease sync
|
// Construct the stream to leases map to be used in the lease sync
|
||||||
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(
|
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(
|
||||||
currentStreamConfigMap.keySet());
|
streamConfigMap.keySet());
|
||||||
|
|
||||||
// For each of the stream, check if shard sync needs to be done based on the leases state.
|
// For each of the stream, check if shard sync needs to be done based on the leases state.
|
||||||
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
|
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : streamConfigMap.entrySet()) {
|
||||||
|
// Skip shardSync if CSCM has purged the stream.
|
||||||
|
if (!currentStreamConfigMap.containsKey(streamConfigEntry.getKey())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(),
|
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(),
|
||||||
streamToLeasesMap.get(streamConfigEntry.getKey()));
|
streamToLeasesMap.get(streamConfigEntry.getKey()));
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue