diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index a885c4d9..8cad8e31 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -178,12 +178,20 @@ class PeriodicShardSyncManager { final long runStartMillis = System.currentTimeMillis(); try { + // Create a shallow copy of the map to avoid data race with Scheduler. + final Map streamConfigMap = currentStreamConfigMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + // Construct the stream to leases map to be used in the lease sync final Map> streamToLeasesMap = getStreamToLeasesMap( - currentStreamConfigMap.keySet()); + streamConfigMap.keySet()); // For each of the stream, check if shard sync needs to be done based on the leases state. - for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { + for (Map.Entry streamConfigEntry : streamConfigMap.entrySet()) { + // Skip shardSync if CSCM has purged the stream. + if (!currentStreamConfigMap.containsKey(streamConfigEntry.getKey())) { + continue; + } final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), streamToLeasesMap.get(streamConfigEntry.getKey()));