diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index dacb7ba1..49d271b2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -180,7 +180,10 @@ public class Scheduler implements Runnable { private final Object lock = new Object(); private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted(); + private boolean leasesSyncedOnAppInit = false; + @Getter(AccessLevel.NONE) + private boolean shouldSyncLeases = true; /** * Used to ensure that only one requestedShutdown is in progress at a time. @@ -279,8 +282,6 @@ public class Scheduler implements Runnable { PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); -// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds(); -// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector(); @@ -419,6 +420,8 @@ public class Scheduler implements Runnable { // check for new streams and sync with the scheduler state if (isLeader()) { checkAndSyncStreamShardsAndLeases(); + } else { + shouldSyncLeases = true; } logExecutorState(); @@ -426,7 +429,7 @@ public class Scheduler implements Runnable { Thread.sleep(shardConsumerDispatchPollIntervalMillis); } catch (Exception e) { log.error("Worker.run caught exception, sleeping for {} milli seconds!", - String.valueOf(shardConsumerDispatchPollIntervalMillis), e); + shardConsumerDispatchPollIntervalMillis, e); try { Thread.sleep(shardConsumerDispatchPollIntervalMillis); } catch (InterruptedException ex) { @@ -454,15 +457,18 @@ public class Scheduler implements Runnable { final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER); try { - // This is done to ensure that we clean up the stale streams lingering in the lease table. - if (!leasesSyncedOnAppInit && isMultiStreamMode) { - final List leases = fetchMultiStreamLeases(); - syncStreamsFromLeaseTableOnAppInit(leases); - leasesSyncedOnAppInit = true; - } - final Map newStreamConfigMap = streamTracker.streamConfigList() .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); + // This is done to ensure that we clean up the stale streams lingering in the lease table. + if (isMultiStreamMode && (shouldSyncLeases || !leasesSyncedOnAppInit)) { + // Skip updating the stream map due to no new stream since last sync + if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) { + syncStreamsFromLeaseTableOnAppInit(fetchMultiStreamLeases()); + } + leasesSyncedOnAppInit = true; + shouldSyncLeases = false; + } + // For new streams discovered, do a shard sync and update the currentStreamConfigMap for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { @@ -581,12 +587,14 @@ public class Scheduler implements Runnable { return streamsSynced; } - @VisibleForTesting boolean shouldSyncStreamsNow() { + @VisibleForTesting + boolean shouldSyncStreamsNow() { return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); } - @VisibleForTesting void syncStreamsFromLeaseTableOnAppInit(List leases) { + @VisibleForTesting + void syncStreamsFromLeaseTableOnAppInit(List leases) { leases.stream() .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) .filter(streamIdentifier -> !currentStreamConfigMap.containsKey(streamIdentifier)) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index af0755a3..0336ac6a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -39,7 +39,6 @@ import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrat import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -66,6 +65,7 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.OngoingStubbing; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -780,9 +780,8 @@ public class SchedulerTest { public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException { List streamConfigList1 = createDummyStreamConfigList(1,6); List streamConfigList2 = createDummyStreamConfigList(1,4); - when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); - prepareForStaleDeletedStreamCleanupTests(); + prepareForStaleDeletedStreamCleanupTests(streamConfigList1, streamConfigList2); // when KCL starts it starts with tracking 5 stream assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); @@ -817,27 +816,12 @@ public class SchedulerTest { } - private void prepareForStaleDeletedStreamCleanupTests() { - - when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { - @Override public Duration waitPeriodToDeleteFormerStreams() { - return Duration.ofDays(1); - } - }); - - retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) - .retrievalFactory(retrievalFactory); - scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, - metricsConfig, processorConfig, retrievalConfig)); - when(scheduler.shouldSyncStreamsNow()).thenReturn(true); - } // Tests validate that no cleanup of stream is done if its still tracked in multiStreamTracker @Test public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream() throws ProvisionedThroughputException, InvalidStateException, DependencyException { List streamConfigList1 = createDummyStreamConfigList(1,6); - when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1); - prepareForStaleDeletedStreamCleanupTests(); + prepareForStaleDeletedStreamCleanupTests(streamConfigList1); scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier()); @@ -974,6 +958,72 @@ public class SchedulerTest { verify(rejectedTaskEvent, times(1)).accept(any()); } + @Test + public void testUpdateStreamMapIfMissingLatestStream() throws Exception { + prepareMultiStreamScheduler(createDummyStreamConfigList(1, 6)); + scheduler.checkAndSyncStreamShardsAndLeases(); + verify(scheduler).syncStreamsFromLeaseTableOnAppInit(any()); + } + + @Test + public void testNoDdbLookUpAsStreamMapContainsAllStreams() throws Exception { + final List streamConfigList = createDummyStreamConfigList(1, 6); + prepareMultiStreamScheduler(streamConfigList); + // Populate currentStreamConfigMap to simulate that the leader has the latest streams. + streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); + scheduler.checkAndSyncStreamShardsAndLeases(); + verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); + } + + @Test + public void testNoDdbLookUpForNewStreamAsLeaderFlippedTheShardSyncFlags() throws Exception { + prepareMultiStreamScheduler(); + scheduler.checkAndSyncStreamShardsAndLeases(); + verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); + + final List streamConfigList = createDummyStreamConfigList(1, 6); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); + scheduler.checkAndSyncStreamShardsAndLeases(); + + // Since the sync path has been executed once before the DDB sync flags should be flipped + // to prevent doing DDB lookups in the subsequent runs. + verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any()); + assertEquals(0, streamConfigList.stream() + .filter(s -> !scheduler.currentStreamConfigMap().containsKey(s.streamIdentifier())).count()); + } + + @SafeVarargs + private final void prepareMultiStreamScheduler(List... streamConfigs) { + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + if (streamConfigs.length > 0) { + stubMultiStreamTracker(streamConfigs); + } + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + } + + @SafeVarargs + private final void prepareForStaleDeletedStreamCleanupTests(List... streamConfigs) { + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { + @Override + public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofDays(1); + } + }); + stubMultiStreamTracker(streamConfigs); + prepareMultiStreamScheduler(); + } + + @SafeVarargs + private final void stubMultiStreamTracker(List... streamConfigs) { + OngoingStubbing> stub = when(multiStreamTracker.streamConfigList()); + for (List streamConfig : streamConfigs) { + stub = stub.thenReturn(streamConfig); + } + } + /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { final int numberOfRecordsPerShard = 10; final String kinesisShardPrefix = "kinesis-0-";