diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index dacb7ba1..2da2483b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -180,7 +180,7 @@ public class Scheduler implements Runnable { private final Object lock = new Object(); private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted(); - private boolean leasesSyncedOnAppInit = false; + private boolean shouldSyncLeases = true; /** * Used to ensure that only one requestedShutdown is in progress at a time. @@ -279,8 +279,6 @@ public class Scheduler implements Runnable { PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); -// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds(); -// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector(); @@ -419,6 +417,8 @@ public class Scheduler implements Runnable { // check for new streams and sync with the scheduler state if (isLeader()) { checkAndSyncStreamShardsAndLeases(); + } else { + shouldSyncLeases = true; } logExecutorState(); @@ -426,7 +426,7 @@ public class Scheduler implements Runnable { Thread.sleep(shardConsumerDispatchPollIntervalMillis); } catch (Exception e) { log.error("Worker.run caught exception, sleeping for {} milli seconds!", - String.valueOf(shardConsumerDispatchPollIntervalMillis), e); + shardConsumerDispatchPollIntervalMillis, e); try { Thread.sleep(shardConsumerDispatchPollIntervalMillis); } catch (InterruptedException ex) { @@ -454,15 +454,17 @@ public class Scheduler implements Runnable { final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER); try { - // This is done to ensure that we clean up the stale streams lingering in the lease table. - if (!leasesSyncedOnAppInit && isMultiStreamMode) { - final List leases = fetchMultiStreamLeases(); - syncStreamsFromLeaseTableOnAppInit(leases); - leasesSyncedOnAppInit = true; - } - final Map newStreamConfigMap = streamTracker.streamConfigList() .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); + // This is done to ensure that we clean up the stale streams lingering in the lease table. + if (shouldSyncLeases && isMultiStreamMode) { + // Skip updating the stream map due to no new stream since last sync + if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) { + syncStreamsFromLeaseTable(fetchMultiStreamLeases()); + } + shouldSyncLeases = false; + } + // For new streams discovered, do a shard sync and update the currentStreamConfigMap for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { @@ -581,12 +583,14 @@ public class Scheduler implements Runnable { return streamsSynced; } - @VisibleForTesting boolean shouldSyncStreamsNow() { + @VisibleForTesting + boolean shouldSyncStreamsNow() { return isMultiStreamMode && (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); } - @VisibleForTesting void syncStreamsFromLeaseTableOnAppInit(List leases) { + @VisibleForTesting + void syncStreamsFromLeaseTable(List leases) { leases.stream() .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) .filter(streamIdentifier -> !currentStreamConfigMap.containsKey(streamIdentifier)) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index af0755a3..f6735b47 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -39,7 +39,6 @@ import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrat import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -471,7 +470,7 @@ public class SchedulerTest { .retrievalFactory(retrievalFactory); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); - scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable); + scheduler.syncStreamsFromLeaseTable(leasesInTable); Map expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap( StreamConfig::streamIdentifier, Function.identity())); Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap()); @@ -507,7 +506,7 @@ public class SchedulerTest { .retrievalFactory(retrievalFactory); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); - scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable); + scheduler.syncStreamsFromLeaseTable(leasesInTable); Map expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap( sc -> sc.streamIdentifier(), sc -> sc)); Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap()); @@ -974,6 +973,36 @@ public class SchedulerTest { verify(rejectedTaskEvent, times(1)).accept(any()); } + @Test + public void testUpdateStreamMapIfMissingLatestStream() throws Exception { + final List streamConfigList = createDummyStreamConfigList(1, 6); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + + scheduler.checkAndSyncStreamShardsAndLeases(); + verify(scheduler).syncStreamsFromLeaseTable(any()); + } + + @Test + public void testNotUpdatingStreamMapAsItContainsAllStreams() throws Exception { + final List streamConfigList = createDummyStreamConfigList(1, 6); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + // Populate currentStreamConfigMap to simulate that the leader has the latest streams. + streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s)); + + scheduler.checkAndSyncStreamShardsAndLeases(); + verify(scheduler, times(0)).syncStreamsFromLeaseTable(any()); + } + /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { final int numberOfRecordsPerShard = 10; final String kinesisShardPrefix = "kinesis-0-";