Allow leader to learn about new leases upon leader change to avoid extra shard syncs

This commit is contained in:
Chenyuan Lee 2023-03-13 13:55:23 -07:00
parent 27b166c5aa
commit 9816d817b1
2 changed files with 49 additions and 16 deletions

View file

@ -180,7 +180,7 @@ public class Scheduler implements Runnable {
private final Object lock = new Object();
private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted();
private boolean leasesSyncedOnAppInit = false;
private boolean shouldSyncLeases = true;
/**
* Used to ensure that only one requestedShutdown is in progress at a time.
@ -279,8 +279,6 @@ public class Scheduler implements Runnable {
PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds();
// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool();
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector();
@ -419,6 +417,8 @@ public class Scheduler implements Runnable {
// check for new streams and sync with the scheduler state
if (isLeader()) {
checkAndSyncStreamShardsAndLeases();
} else {
shouldSyncLeases = true;
}
logExecutorState();
@ -426,7 +426,7 @@ public class Scheduler implements Runnable {
Thread.sleep(shardConsumerDispatchPollIntervalMillis);
} catch (Exception e) {
log.error("Worker.run caught exception, sleeping for {} milli seconds!",
String.valueOf(shardConsumerDispatchPollIntervalMillis), e);
shardConsumerDispatchPollIntervalMillis, e);
try {
Thread.sleep(shardConsumerDispatchPollIntervalMillis);
} catch (InterruptedException ex) {
@ -454,15 +454,17 @@ public class Scheduler implements Runnable {
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER);
try {
// This is done to ensure that we clean up the stale streams lingering in the lease table.
if (!leasesSyncedOnAppInit && isMultiStreamMode) {
final List<MultiStreamLease> leases = fetchMultiStreamLeases();
syncStreamsFromLeaseTableOnAppInit(leases);
leasesSyncedOnAppInit = true;
}
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
// This is done to ensure that we clean up the stale streams lingering in the lease table.
if (shouldSyncLeases && isMultiStreamMode) {
// Sync from the lease table only when the tracker reports a stream that is missing from
// currentStreamConfigMap; otherwise skip the lease-table fetch.
if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) {
syncStreamsFromLeaseTable(fetchMultiStreamLeases());
}
shouldSyncLeases = false;
}
// For new streams discovered, do a shard sync and update the currentStreamConfigMap
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
@ -581,12 +583,14 @@ public class Scheduler implements Runnable {
return streamsSynced;
}
@VisibleForTesting boolean shouldSyncStreamsNow() {
/**
 * Tells whether a stream sync is due.
 *
 * @return {@code true} only when the scheduler is in multi-stream mode and the time measured by
 *         {@code streamSyncWatch} exceeds {@code NEW_STREAM_CHECK_INTERVAL_MILLIS} milliseconds
 */
@VisibleForTesting
boolean shouldSyncStreamsNow() {
return isMultiStreamMode &&
(streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS);
}
@VisibleForTesting void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) {
@VisibleForTesting
void syncStreamsFromLeaseTable(List<MultiStreamLease> leases) {
leases.stream()
.map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier()))
.filter(streamIdentifier -> !currentStreamConfigMap.containsKey(streamIdentifier))

View file

@ -39,7 +39,6 @@ import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrat
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -471,7 +470,7 @@ public class SchedulerTest {
.retrievalFactory(retrievalFactory);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable);
scheduler.syncStreamsFromLeaseTable(leasesInTable);
Map<StreamIdentifier, StreamConfig> expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap(
StreamConfig::streamIdentifier, Function.identity()));
Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap());
@ -507,7 +506,7 @@ public class SchedulerTest {
.retrievalFactory(retrievalFactory);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable);
scheduler.syncStreamsFromLeaseTable(leasesInTable);
Map<StreamIdentifier, StreamConfig> expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap(
sc -> sc.streamIdentifier(), sc -> sc));
Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap());
@ -974,6 +973,36 @@ public class SchedulerTest {
verify(rejectedTaskEvent, times(1)).accept(any());
}
@Test
public void testUpdateStreamMapIfMissingLatestStream() throws Exception {
    // Arrange: build a spied scheduler, then have the tracker report a list of streams.
    // NOTE(review): the expectation below presumably relies on these streams not yet being in
    // currentStreamConfigMap - confirm against the test fixture setup.
    final List<StreamConfig> trackedStreams = createDummyStreamConfigList(1, 6);
    retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
            .retrievalFactory(retrievalFactory);
    final Scheduler realScheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig,
            lifecycleConfig, metricsConfig, processorConfig, retrievalConfig);
    scheduler = spy(realScheduler);
    when(multiStreamTracker.streamConfigList()).thenReturn(trackedStreams);
    when(scheduler.shouldSyncStreamsNow()).thenReturn(true);

    // Act
    scheduler.checkAndSyncStreamShardsAndLeases();

    // Assert: the lease-table sync ran exactly once.
    verify(scheduler, times(1)).syncStreamsFromLeaseTable(any());
}
@Test
public void testNotUpdatingStreamMapAsItContainsAllStreams() throws Exception {
    // Arrange: build a spied scheduler, then have the tracker report a list of streams.
    final List<StreamConfig> trackedStreams = createDummyStreamConfigList(1, 6);
    retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
            .retrievalFactory(retrievalFactory);
    final Scheduler realScheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig,
            lifecycleConfig, metricsConfig, processorConfig, retrievalConfig);
    scheduler = spy(realScheduler);
    when(multiStreamTracker.streamConfigList()).thenReturn(trackedStreams);
    when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
    // Seed currentStreamConfigMap with every tracked stream, simulating a leader that is already
    // up to date.
    for (StreamConfig streamConfig : trackedStreams) {
        scheduler.currentStreamConfigMap().put(streamConfig.streamIdentifier(), streamConfig);
    }

    // Act
    scheduler.checkAndSyncStreamShardsAndLeases();

    // Assert: with nothing missing, the lease-table sync must not run.
    verify(scheduler, times(0)).syncStreamsFromLeaseTable(any());
}
/*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception {
final int numberOfRecordsPerShard = 10;
final String kinesisShardPrefix = "kinesis-0-";