diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 2da2483b..494bbfaa 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -180,6 +180,9 @@ public class Scheduler implements Runnable { private final Object lock = new Object(); private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted(); + + private boolean leasesSyncedOnAppInit = false; + @Getter(AccessLevel.NONE) private boolean shouldSyncLeases = true; /** @@ -457,11 +460,12 @@ public class Scheduler implements Runnable { final Map newStreamConfigMap = streamTracker.streamConfigList() .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); // This is done to ensure that we clean up the stale streams lingering in the lease table. - if (shouldSyncLeases && isMultiStreamMode) { + if (isMultiStreamMode && (shouldSyncLeases || leasesSyncedOnAppInit)) { // Skip updating the stream map due to no new stream since last sync if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) { - syncStreamsFromLeaseTable(fetchMultiStreamLeases()); + syncStreamsFromLeaseTableOnAppInit(fetchMultiStreamLeases()); } + leasesSyncedOnAppInit = true; shouldSyncLeases = false; } @@ -590,7 +594,7 @@ public class Scheduler implements Runnable { } @VisibleForTesting - void syncStreamsFromLeaseTable(List leases) { + void syncStreamsFromLeaseTableOnAppInit(List leases) { leases.stream() .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) .filter(streamIdentifier -> !currentStreamConfigMap.containsKey(streamIdentifier)) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index f6735b47..4f181942 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -65,6 +65,7 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.OngoingStubbing; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -470,7 +471,7 @@ public class SchedulerTest { .retrievalFactory(retrievalFactory); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); - scheduler.syncStreamsFromLeaseTable(leasesInTable); + scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable); Map expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap( StreamConfig::streamIdentifier, Function.identity())); Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap()); @@ -506,7 +507,7 @@ public class SchedulerTest { .retrievalFactory(retrievalFactory); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); - scheduler.syncStreamsFromLeaseTable(leasesInTable); + scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable); Map expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap( sc -> sc.streamIdentifier(), sc -> sc)); Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap()); @@ -779,9 +780,8 @@ public class SchedulerTest { public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException { List streamConfigList1 = createDummyStreamConfigList(1,6); List streamConfigList2 = createDummyStreamConfigList(1,4); - when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); - prepareForStaleDeletedStreamCleanupTests(); + prepareForStaleDeletedStreamCleanupTests(streamConfigList1, streamConfigList2); // when KCL starts it starts with tracking 5 stream assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); @@ -816,27 +816,12 @@ public class SchedulerTest { } - private void prepareForStaleDeletedStreamCleanupTests() { - - when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { - @Override public Duration waitPeriodToDeleteFormerStreams() { - return Duration.ofDays(1); - } - }); - - retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) - .retrievalFactory(retrievalFactory); - scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, - metricsConfig, processorConfig, retrievalConfig)); - when(scheduler.shouldSyncStreamsNow()).thenReturn(true); - } // Tests validate that no cleanup of stream is done if its still tracked in multiStreamTracker @Test public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream() throws ProvisionedThroughputException, InvalidStateException, DependencyException { List streamConfigList1 = createDummyStreamConfigList(1,6); - when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1); - prepareForStaleDeletedStreamCleanupTests(); + prepareForStaleDeletedStreamCleanupTests(streamConfigList1); scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier()); @@ -975,32 +960,50 @@ public class SchedulerTest { @Test public void testUpdateStreamMapIfMissingLatestStream() throws Exception { - final List streamConfigList = createDummyStreamConfigList(1, 6); - retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) - .retrievalFactory(retrievalFactory); - scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, - metricsConfig, processorConfig, retrievalConfig)); - when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); - when(scheduler.shouldSyncStreamsNow()).thenReturn(true); - + prepareMultiStreamScheduler(createDummyStreamConfigList(1, 6)); scheduler.checkAndSyncStreamShardsAndLeases(); - verify(scheduler).syncStreamsFromLeaseTable(any()); + verify(scheduler).syncStreamsFromLeaseTableOnAppInit(any()); } @Test public void testNotUpdatingStreamMapAsItContainsAllStreams() throws Exception { final List streamConfigList = createDummyStreamConfigList(1, 6); + prepareMultiStreamScheduler(streamConfigList); + // Populate currentStreamConfigMap to simulate that the leader has the latest streams. + streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); + scheduler.checkAndSyncStreamShardsAndLeases(); + verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); + } + + @SafeVarargs + private final void prepareMultiStreamScheduler(List... streamConfigs) { retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); - when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); + if (streamConfigs.length > 0 ) { + stubMultiStreamTracker(streamConfigs); + } when(scheduler.shouldSyncStreamsNow()).thenReturn(true); - // Populate currentStreamConfigMap to simulate that the leader has the latest streams. - streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); + } - scheduler.checkAndSyncStreamShardsAndLeases(); - verify(scheduler, times(0)).syncStreamsFromLeaseTable(any()); + @SafeVarargs + private final void prepareForStaleDeletedStreamCleanupTests(List... streamConfigs) { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofDays(1); + } + }); + stubMultiStreamTracker(streamConfigs); + prepareMultiStreamScheduler(); + } + + @SafeVarargs + private final void stubMultiStreamTracker(List... streamConfigs) { + OngoingStubbing> stub = when(multiStreamTracker.streamConfigList()); + for (List streamConfig : streamConfigs) { + stub = stub.thenReturn(streamConfig); + } } /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception {