diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 8cad8e31..dac77351 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -50,6 +50,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -178,31 +179,34 @@ class PeriodicShardSyncManager { final long runStartMillis = System.currentTimeMillis(); try { - // Create a shallow copy of the map to avoid data race with Scheduler. - final Map streamConfigMap = currentStreamConfigMap.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + // Create a copy of the streams to be considered for this run to avoid data race with Scheduler. + final Set streamConfigMap = new HashSet<>(currentStreamConfigMap.keySet()); // Construct the stream to leases map to be used in the lease sync - final Map> streamToLeasesMap = getStreamToLeasesMap( - streamConfigMap.keySet()); + final Map> streamToLeasesMap = getStreamToLeasesMap(streamConfigMap); // For each of the stream, check if shard sync needs to be done based on the leases state. - for (Map.Entry streamConfigEntry : streamConfigMap.entrySet()) { - // Skip shardSync if CSCM has purged the stream. - if (!currentStreamConfigMap.containsKey(streamConfigEntry.getKey())) { + for (StreamIdentifier streamIdentifier : streamConfigMap) { + if (!currentStreamConfigMap.containsKey(streamIdentifier)) { + log.info("Skipping shard sync task for {} as stream is purged", streamIdentifier); continue; } - final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(), - streamToLeasesMap.get(streamConfigEntry.getKey())); + final ShardSyncResponse shardSyncResponse = checkForShardSync(streamIdentifier, + streamToLeasesMap.get(streamIdentifier)); numStreamsWithPartialLeases += shardSyncResponse.isHoleDetected() ? 1 : 0; numStreamsToSync += shardSyncResponse.shouldDoShardSync ? 1 : 0; if (shardSyncResponse.shouldDoShardSync()) { log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ", - streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision()); + streamIdentifier, shardSyncResponse.reasonForDecision()); + final StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); + if (streamConfig == null) { + log.info("Skipping shard sync task for {} as stream is purged", streamIdentifier); + continue; + } final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider - .apply(streamConfigEntry.getValue()); + .apply(streamConfig); if (!shardSyncTaskManager.submitShardSyncTask()) { log.warn( "Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.", @@ -213,7 +217,7 @@ class PeriodicShardSyncManager { shardSyncResponse.reasonForDecision()); } } else { - log.info("Skipping shard sync for {} due to the reason - {}", streamConfigEntry.getKey(), + log.info("Skipping shard sync for {} due to the reason - {}", streamIdentifier, shardSyncResponse.reasonForDecision()); } }