From 69cf5996c544566a99f07ccee728f2a85a880d60 Mon Sep 17 00:00:00 2001 From: chenylee-aws <122478603+chenylee-aws@users.noreply.github.com> Date: Thu, 2 May 2024 14:43:03 -0700 Subject: [PATCH] Honor lease sync on app bootstrap (#1325) --- .../amazon/kinesis/coordinator/Scheduler.java | 44 +++++++++---------- .../kinesis/coordinator/SchedulerTest.java | 6 +-- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index f9ddc0ba..e4e63078 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -476,29 +476,29 @@ public class Scheduler implements Runnable { final Map newStreamConfigMap = streamTracker.streamConfigList() .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); // This is done to ensure that we clean up the stale streams lingering in the lease table. - if (!leaderSynced.get() || !leasesSyncedOnAppInit) { - // Only sync from lease table again if the currentStreamConfigMap and newStreamConfigMap contain - // different set of streams. - if (!newStreamConfigMap.keySet().equals(currentStreamConfigMap.keySet())) { - log.info("Syncing leases for leader to catch up"); - final List leaseTableLeases = fetchMultiStreamLeases(); - syncStreamsFromLeaseTableOnAppInit(leaseTableLeases); - final Set streamsFromLeaseTable = leaseTableLeases.stream() - .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) - .collect(Collectors.toSet()); - // Remove stream from currentStreamConfigMap if this stream in not in the lease table and newStreamConfigMap. - // This means that the leases have already been deleted by the last leader. - currentStreamConfigMap.keySet().stream() - .filter(streamIdentifier -> !newStreamConfigMap.containsKey(streamIdentifier) - && !streamsFromLeaseTable.contains(streamIdentifier)).forEach(stream -> { - log.info("Removing stream {} from currentStreamConfigMap due to not being active", stream); - currentStreamConfigMap.remove(stream); - staleStreamDeletionMap.remove(stream); - streamsSynced.add(stream); - }); - } - leasesSyncedOnAppInit = true; + // Only sync from lease table again if the currentStreamConfigMap and newStreamConfigMap contain + // different set of streams and Leader has not synced the leases yet + // or this is the first app bootstrap. + if ((!leaderSynced.get() && !newStreamConfigMap.keySet().equals(currentStreamConfigMap.keySet())) + || !leasesSyncedOnAppInit) { + log.info("Syncing leases for leader to catch up"); + final List leaseTableLeases = fetchMultiStreamLeases(); + syncStreamsFromLeaseTableOnAppInit(leaseTableLeases); + final Set streamsFromLeaseTable = leaseTableLeases.stream() + .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) + .collect(Collectors.toSet()); + // Remove stream from currentStreamConfigMap if this stream in not in the lease table and newStreamConfigMap. + // This means that the leases have already been deleted by the last leader. + currentStreamConfigMap.keySet().stream() + .filter(streamIdentifier -> !newStreamConfigMap.containsKey(streamIdentifier) + && !streamsFromLeaseTable.contains(streamIdentifier)).forEach(stream -> { + log.info("Removing stream {} from currentStreamConfigMap due to not being active", stream); + currentStreamConfigMap.remove(stream); + staleStreamDeletionMap.remove(stream); + streamsSynced.add(stream); + }); } + leasesSyncedOnAppInit = true; // For new streams discovered, do a shard sync and update the currentStreamConfigMap for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 1820f2dd..f29c2341 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -995,14 +995,14 @@ public class SchedulerTest { } @Test - public void testNoDdbLookUpAsStreamMapContainsAllStreams() throws Exception { + public void testSyncLeaseAsThisIsInitialAppBootstrapEvenThoughStreamMapContainsAllStreams() { final List streamConfigList = createDummyStreamConfigList(1, 6); when(multiStreamTracker.streamConfigList()).thenReturn(Collections.emptyList()); prepareMultiStreamScheduler(streamConfigList); // Populate currentStreamConfigMap to simulate that the leader has the latest streams. multiStreamTracker.streamConfigList().forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); - scheduler.checkAndSyncStreamShardsAndLeases(); - verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); + scheduler.runProcessLoop(); + verify(scheduler).syncStreamsFromLeaseTableOnAppInit(any()); assertTrue(scheduler.currentStreamConfigMap().size() != 0); }