diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index a885c4d9..dac77351 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -50,6 +50,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -178,23 +179,34 @@ class PeriodicShardSyncManager { final long runStartMillis = System.currentTimeMillis(); try { + // Create a copy of the streams to be considered for this run to avoid data race with Scheduler. + final Set streamConfigMap = new HashSet<>(currentStreamConfigMap.keySet()); + // Construct the stream to leases map to be used in the lease sync - final Map> streamToLeasesMap = getStreamToLeasesMap( - currentStreamConfigMap.keySet()); + final Map> streamToLeasesMap = getStreamToLeasesMap(streamConfigMap); // For each of the stream, check if shard sync needs to be done based on the leases state. - for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { - final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), - streamToLeasesMap.get(streamConfigEntry.getKey())); + for (StreamIdentifier streamIdentifier : streamConfigMap) { + if (!currentStreamConfigMap.containsKey(streamIdentifier)) { + log.info("Skipping shard sync task for {} as stream is purged", streamIdentifier); + continue; + } + final ShardSyncResponse shardSyncResponse = checkForShardSync(streamIdentifier, + streamToLeasesMap.get(streamIdentifier)); numStreamsWithPartialLeases += shardSyncResponse.isHoleDetected() ? 1 : 0; numStreamsToSync += shardSyncResponse.shouldDoShardSync ? 1 : 0; if (shardSyncResponse.shouldDoShardSync()) { log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ", - streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision()); + streamIdentifier, shardSyncResponse.reasonForDecision()); + final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); + if (streamConfig == null) { + log.info("Skipping shard sync task for {} as stream is purged", streamIdentifier); + continue; + } final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider - .apply(streamConfigEntry.getValue()); + .apply(streamConfig); if (!shardSyncTaskManager.submitShardSyncTask()) { log.warn( "Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.", @@ -205,7 +217,7 @@ class PeriodicShardSyncManager { shardSyncResponse.reasonForDecision()); } } else { - log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(), + log.info("Skipping shard sync for {} due to the reason - {}", streamIdentifier, shardSyncResponse.reasonForDecision()); } }