From 8e618a02c188567dfa4edcada04e5793dc528d7d Mon Sep 17 00:00:00 2001 From: Chenyuan Lee Date: Tue, 14 Mar 2023 23:32:12 -0700 Subject: [PATCH] Fix the flag bug --- .../amazon/kinesis/coordinator/Scheduler.java | 2 +- .../kinesis/coordinator/SchedulerTest.java | 24 ++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 494bbfaa..49d271b2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -460,7 +460,7 @@ public class Scheduler implements Runnable { final Map newStreamConfigMap = streamTracker.streamConfigList() .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); // This is done to ensure that we clean up the stale streams lingering in the lease table. - if (isMultiStreamMode && (shouldSyncLeases || leasesSyncedOnAppInit)) { + if (isMultiStreamMode && (shouldSyncLeases || !leasesSyncedOnAppInit)) { // Skip updating the stream map due to no new stream since last sync if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) { syncStreamsFromLeaseTableOnAppInit(fetchMultiStreamLeases()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 4f181942..0336ac6a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -966,7 +966,7 @@ public class SchedulerTest { } @Test - public void testNotUpdatingStreamMapAsItContainsAllStreams() throws Exception { + public void testNoDdbLookUpAsStreamMapContainsAllStreams() throws Exception { final List streamConfigList = createDummyStreamConfigList(1, 6); prepareMultiStreamScheduler(streamConfigList); // Populate currentStreamConfigMap to simulate that the leader has the latest streams. @@ -975,13 +975,30 @@ public class SchedulerTest { verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); } + @Test + public void testNoDdbLookUpForNewStreamAsLeaderFlippedTheShardSyncFlags() throws Exception { + prepareMultiStreamScheduler(); + scheduler.checkAndSyncStreamShardsAndLeases(); + verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); + + final List streamConfigList = createDummyStreamConfigList(1, 6); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); + scheduler.checkAndSyncStreamShardsAndLeases(); + + // Since the sync path has been executed once before the DDB sync flags should be flipped + // to prevent doing DDB lookups in the subsequent runs. + verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); + assertEquals(0, streamConfigList.stream() + .filter(s -> !scheduler.currentStreamConfigMap().containsKey(s.streamIdentifier())).count()); + } + @SafeVarargs private final void prepareMultiStreamScheduler(List... streamConfigs) { retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); - if (streamConfigs.length > 0 ) { + if (streamConfigs.length > 0) { stubMultiStreamTracker(streamConfigs); } when(scheduler.shouldSyncStreamsNow()).thenReturn(true); @@ -990,7 +1007,8 @@ public class SchedulerTest { @SafeVarargs private final void prepareForStaleDeletedStreamCleanupTests(List... streamConfigs) { when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { - @Override public Duration waitPeriodToDeleteFormerStreams() { + @Override + public Duration waitPeriodToDeleteFormerStreams() { return Duration.ofDays(1); } });