From 4cc1754e16a8fe91a71a3440f014e34aebebfda8 Mon Sep 17 00:00:00 2001 From: Chenyuan Lee Date: Sun, 9 Apr 2023 23:32:37 -0700 Subject: [PATCH] Copy streamConfigMap to avoid data race with Scheduler --- .../coordinator/PeriodicShardSyncManager.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index a885c4d9..8cad8e31 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -178,12 +178,20 @@ class PeriodicShardSyncManager { final long runStartMillis = System.currentTimeMillis(); try { + // Create a shallow copy of the map to avoid data race with Scheduler. + final Map streamConfigMap = currentStreamConfigMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + // Construct the stream to leases map to be used in the lease sync final Map> streamToLeasesMap = getStreamToLeasesMap( - currentStreamConfigMap.keySet()); + streamConfigMap.keySet()); // For each of the stream, check if shard sync needs to be done based on the leases state. - for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + for (Map.Entry streamConfigEntry : streamConfigMap.entrySet()) { + // Skip shardSync if CSCM has purged the stream. + if (!currentStreamConfigMap.containsKey(streamConfigEntry.getKey())) { + continue; + } final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), streamToLeasesMap.get(streamConfigEntry.getKey()));