From 24774bc2e3215fbada0b1431bdec07fedbd94650 Mon Sep 17 00:00:00 2001 From: chenylee-aws <122478603+chenylee-aws@users.noreply.github.com> Date: Mon, 1 Apr 2024 14:53:16 -0700 Subject: [PATCH] Clean up streams from currentStreamConfigMap (#1273) --- .../coordinator/PeriodicShardSyncManager.java | 18 ++++- .../amazon/kinesis/coordinator/Scheduler.java | 44 +++++++--- .../PeriodicShardSyncManagerTest.java | 4 +- .../kinesis/coordinator/SchedulerTest.java | 80 +++++++++++++++---- 4 files changed, 115 insertions(+), 31 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 51c0b5e1..30282de4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.coordinator; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ComparisonChain; +import lombok.AccessLevel; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; @@ -26,6 +27,7 @@ import org.apache.commons.lang3.Validate; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; @@ -58,6 +60,7 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.function.Function; import java.util.stream.Collectors; @@ -70,6 +73,7 @@ import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRan @Getter @EqualsAndHashCode @Slf4j +@KinesisClientInternalApi class PeriodicShardSyncManager { private static final long INITIAL_DELAY = 60 * 1000L; @VisibleForTesting @@ -90,6 +94,8 @@ class PeriodicShardSyncManager { private final MetricsFactory metricsFactory; private final long leasesRecoveryAuditorExecutionFrequencyMillis; private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold; + @Getter(AccessLevel.NONE) + private final AtomicBoolean leaderSynced; private boolean isRunning; PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, @@ -98,11 +104,13 @@ class PeriodicShardSyncManager { Map streamToShardSyncTaskManagerMap, boolean isMultiStreamingMode, MetricsFactory metricsFactory, long leasesRecoveryAuditorExecutionFrequencyMillis, - int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { + int leasesRecoveryAuditorInconsistencyConfidenceThreshold, + AtomicBoolean leaderSynced){ this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode, metricsFactory, - leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold); + leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold, + leaderSynced); } PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher, @@ -112,7 +120,8 @@ class PeriodicShardSyncManager { ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode, MetricsFactory metricsFactory, long leasesRecoveryAuditorExecutionFrequencyMillis, - int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { + int 
leasesRecoveryAuditorInconsistencyConfidenceThreshold, + AtomicBoolean leaderSynced) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); this.workerId = workerId; @@ -126,6 +135,7 @@ class PeriodicShardSyncManager { this.metricsFactory = metricsFactory; this.leasesRecoveryAuditorExecutionFrequencyMillis = leasesRecoveryAuditorExecutionFrequencyMillis; this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold; + this.leaderSynced = leaderSynced; } public synchronized TaskResult start() { @@ -173,7 +183,7 @@ class PeriodicShardSyncManager { } private void runShardSync() { - if (leaderDecider.isLeader(workerId)) { + if (leaderDecider.isLeader(workerId) && leaderSynced.get()) { log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId)); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 54c67997..18de8f9d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -41,6 +41,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -184,7 +185,7 @@ public class Scheduler implements Runnable { private boolean leasesSyncedOnAppInit = false; @Getter(AccessLevel.NONE) - private boolean shouldSyncLeases = true; + private 
final AtomicBoolean leaderSynced = new AtomicBoolean(false); /** * Used to ensure that only one requestedShutdown is in progress at a time. @@ -294,7 +295,8 @@ public class Scheduler implements Runnable { leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, isMultiStreamMode, metricsFactory, leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(), - leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold()); + leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold(), + leaderSynced); this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode) .createLeaseCleanupManager(metricsFactory); this.schemaRegistryDecoder = @@ -421,8 +423,9 @@ public class Scheduler implements Runnable { // check for new streams and sync with the scheduler state if (isLeader()) { checkAndSyncStreamShardsAndLeases(); + leaderSynced.set(true); } else { - shouldSyncLeases = true; + leaderSynced.set(false); } logExecutorState(); @@ -461,13 +464,28 @@ public class Scheduler implements Runnable { final Map newStreamConfigMap = streamTracker.streamConfigList() .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); // This is done to ensure that we clean up the stale streams lingering in the lease table. - if (isMultiStreamMode && (shouldSyncLeases || !leasesSyncedOnAppInit)) { - // Skip updating the stream map due to no new stream since last sync - if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) { - syncStreamsFromLeaseTableOnAppInit(fetchMultiStreamLeases()); + if (!leaderSynced.get() || !leasesSyncedOnAppInit) { + // Only sync from lease table again if the currentStreamConfigMap and newStreamConfigMap contain + // different set of streams. 
+ if (!newStreamConfigMap.keySet().equals(currentStreamConfigMap.keySet())) { + log.info("Syncing leases for leader to catch up"); + final List leaseTableLeases = fetchMultiStreamLeases(); + syncStreamsFromLeaseTableOnAppInit(leaseTableLeases); + final Set streamsFromLeaseTable = leaseTableLeases.stream() + .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) + .collect(Collectors.toSet()); + // Remove stream from currentStreamConfigMap if this stream is not in the lease table and newStreamConfigMap. + // This means that the leases have already been deleted by the last leader. + currentStreamConfigMap.keySet().stream() + .filter(streamIdentifier -> !newStreamConfigMap.containsKey(streamIdentifier) + && !streamsFromLeaseTable.contains(streamIdentifier)).forEach(stream -> { + log.info("Removing stream {} from currentStreamConfigMap due to not being active", stream); + currentStreamConfigMap.remove(stream); + staleStreamDeletionMap.remove(stream); + streamsSynced.add(stream); + }); } leasesSyncedOnAppInit = true; - shouldSyncLeases = false; } // For new streams discovered, do a shard sync and update the currentStreamConfigMap @@ -489,7 +507,6 @@ public class Scheduler implements Runnable { staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now()); } }; - if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) { // Now, we are identifying the stale/old streams and enqueuing it for deferred deletion. // It is assumed that all the workers will always have the latest and consistent snapshot of streams @@ -633,10 +650,15 @@ public class Scheduler implements Runnable { final Map> streamIdToShardsMap = leases.stream().collect( Collectors.groupingBy(MultiStreamLease::streamIdentifier, Collectors.toCollection(ArrayList::new))); for (StreamIdentifier streamIdentifier : streamIdentifiers) { + log.warn("Found old/deleted stream: {}. 
Directly deleting leases of this stream.", streamIdentifier); + // Removing streamIdentifier from this map so PSSM doesn't think there is a hole in the stream while + // scheduler attempts to delete the stream if the stream is taking longer to delete. If deletion fails + // it will be retried again since stream will still show up in the staleStreamDeletionMap. + // It is fine for PSSM to detect holes and it should not do shardsync because it takes a few iterations + // to breach the hole confidence interval threshold. + currentStreamConfigMap.remove(streamIdentifier); // Deleting leases will cause the workers to shutdown the record processors for these shards. if (deleteMultiStreamLeases(streamIdToShardsMap.get(streamIdentifier.serialize()))) { - log.warn("Found old/deleted stream: {}. Directly deleting leases of this stream.", streamIdentifier); - currentStreamConfigMap.remove(streamIdentifier); staleStreamDeletionMap.remove(streamIdentifier); streamsSynced.add(streamIdentifier); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java index bda7f53b..355e2c96 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManagerTest.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -77,7 +78,8 @@ public class PeriodicShardSyncManagerTest { public void setup() { streamIdentifier = StreamIdentifier.multiStreamInstance("123456789012:stream:456"); periodicShardSyncManager = new PeriodicShardSyncManager("worker", 
leaderDecider, leaseRefresher, currentStreamConfigMap, - shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, true, new NullMetricsFactory(), 2 * 60 * 1000, 3); + shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, true, new NullMetricsFactory(), 2 * 60 * 1000, 3, + new AtomicBoolean(true)); } @Test diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index fd1380f2..9671bb78 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -568,7 +568,7 @@ public class SchedulerTest { testMultiStreamStaleStreamsAreNotDeletedImmediately(true, false); } - private final void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion, + private void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion, boolean onlyStreamsDeletionNotLeases) throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( @@ -584,7 +584,7 @@ public class SchedulerTest { retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); - + mockListLeases(streamConfigList1); scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); @@ -667,6 +667,8 @@ public class SchedulerTest { scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, 
processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + mockListLeases(streamConfigList1); + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( @@ -741,6 +743,9 @@ public class SchedulerTest { scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + // Mock listLeases to exercise the delete path so scheduler doesn't remove stale streams due to not presenting + // in lease table + mockListLeases(streamConfigList1); Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams; Set expectedPendingStreams = IntStream.range(1, 3) @@ -792,6 +797,7 @@ public class SchedulerTest { // when KCL starts it starts with tracking 5 stream assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); assertEquals(0, scheduler.staleStreamDeletionMap().size()); + mockListLeases(streamConfigList1); // 2 Streams are no longer needed to be consumed Set syncedStreams1 = scheduler.checkAndSyncStreamShardsAndLeases(); @@ -974,39 +980,75 @@ public class SchedulerTest { @Test public void testNoDdbLookUpAsStreamMapContainsAllStreams() throws Exception { final List streamConfigList = createDummyStreamConfigList(1, 6); + when(multiStreamTracker.streamConfigList()).thenReturn(Collections.emptyList()); prepareMultiStreamScheduler(streamConfigList); // Populate currentStreamConfigMap to simulate that the leader has the latest streams. 
- streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); + multiStreamTracker.streamConfigList().forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); scheduler.checkAndSyncStreamShardsAndLeases(); verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); + assertTrue(scheduler.currentStreamConfigMap().size() != 0); } @Test - public void testNoDdbLookUpForNewStreamAsLeaderFlippedTheShardSyncFlags() throws Exception { - prepareMultiStreamScheduler(); - scheduler.checkAndSyncStreamShardsAndLeases(); - verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); + public void testNotRefreshForNewStreamAfterLeaderFlippedTheShouldInitialize(){ + prepareMultiStreamScheduler(createDummyStreamConfigList(1, 6)); + // flip the shouldInitialize flag + scheduler.runProcessLoop(); + verify(scheduler, times(1)).syncStreamsFromLeaseTableOnAppInit(any()); final List streamConfigList = createDummyStreamConfigList(1, 6); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); - scheduler.checkAndSyncStreamShardsAndLeases(); + scheduler.runProcessLoop(); // Since the sync path has been executed once before the DDB sync flags should be flipped // to prevent doing DDB lookups in the subsequent runs. 
- verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); + verify(scheduler, times(1)).syncStreamsFromLeaseTableOnAppInit(any()); assertEquals(0, streamConfigList.stream() .filter(s -> !scheduler.currentStreamConfigMap().containsKey(s.streamIdentifier())).count()); } + @Test + public void testDropStreamsFromMapsWhenStreamIsNotInLeaseTableAndNewStreamConfigMap() throws Exception { + when(multiStreamTracker.streamConfigList()).thenReturn(Collections.emptyList()); + prepareMultiStreamScheduler(); + final List streamConfigList = createDummyStreamConfigList(1, 6); + streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); + scheduler.checkAndSyncStreamShardsAndLeases(); + assertEquals(Collections.emptySet(), scheduler.currentStreamConfigMap().keySet()); + } + + @Test + public void testNotDropStreamsFromMapsWhenStreamIsInLeaseTable() throws Exception { + when(multiStreamTracker.streamConfigList()).thenReturn(Collections.emptyList()); + prepareForStaleDeletedStreamCleanupTests(); + final List streamConfigList = createDummyStreamConfigList(1, 6); + mockListLeases(streamConfigList); + streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); + final Set initialSet = new HashSet<>(scheduler.currentStreamConfigMap().keySet()); + scheduler.checkAndSyncStreamShardsAndLeases(); + assertEquals(initialSet, scheduler.currentStreamConfigMap().keySet()); + assertEquals(streamConfigList.size(), scheduler.currentStreamConfigMap().keySet().size()); + } + + @Test + public void testNotDropStreamsFromMapsWhenStreamIsInNewStreamConfigMap() throws Exception { + final List streamConfigList = createDummyStreamConfigList(1, 6); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); + prepareMultiStreamScheduler(); + streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); + final Set initialSet = new 
HashSet<>(scheduler.currentStreamConfigMap().keySet()); + scheduler.checkAndSyncStreamShardsAndLeases(); + assertEquals(initialSet, scheduler.currentStreamConfigMap().keySet()); + assertEquals(streamConfigList.size(), scheduler.currentStreamConfigMap().keySet().size()); + } + @SafeVarargs private final void prepareMultiStreamScheduler(List... streamConfigs) { retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); - if (streamConfigs.length > 0) { - stubMultiStreamTracker(streamConfigs); - } + stubMultiStreamTracker(streamConfigs); when(scheduler.shouldSyncStreamsNow()).thenReturn(true); } @@ -1024,12 +1066,20 @@ public class SchedulerTest { @SafeVarargs private final void stubMultiStreamTracker(List... streamConfigs) { - OngoingStubbing> stub = when(multiStreamTracker.streamConfigList()); - for (List streamConfig : streamConfigs) { - stub = stub.thenReturn(streamConfig); + if (streamConfigs.length > 0) { + OngoingStubbing> stub = when(multiStreamTracker.streamConfigList()); + for (List streamConfig : streamConfigs) { + stub = stub.thenReturn(streamConfig); + } } } + private void mockListLeases(List configs) throws ProvisionedThroughputException, InvalidStateException, DependencyException { + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(configs.stream() + .map(s -> new MultiStreamLease().streamIdentifier(s.streamIdentifier().toString()) + .shardId("some_random_shard_id")).collect(Collectors.toList())); + } + /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { final int numberOfRecordsPerShard = 10; final String kinesisShardPrefix = "kinesis-0-";