Honor lease sync on app bootstrap (#1325)
This commit is contained in:
parent
35fc72b2c8
commit
69cf5996c5
2 changed files with 25 additions and 25 deletions
|
|
@ -476,10 +476,11 @@ public class Scheduler implements Runnable {
|
||||||
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
|
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
|
||||||
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
|
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
|
||||||
// This is done to ensure that we clean up the stale streams lingering in the lease table.
|
// This is done to ensure that we clean up the stale streams lingering in the lease table.
|
||||||
if (!leaderSynced.get() || !leasesSyncedOnAppInit) {
|
|
||||||
// Only sync from lease table again if the currentStreamConfigMap and newStreamConfigMap contain
|
// Only sync from lease table again if the currentStreamConfigMap and newStreamConfigMap contain
|
||||||
// different set of streams.
|
// different set of streams and Leader has not synced the leases yet
|
||||||
if (!newStreamConfigMap.keySet().equals(currentStreamConfigMap.keySet())) {
|
// or this is the first app bootstrap.
|
||||||
|
if ((!leaderSynced.get() && !newStreamConfigMap.keySet().equals(currentStreamConfigMap.keySet()))
|
||||||
|
|| !leasesSyncedOnAppInit) {
|
||||||
log.info("Syncing leases for leader to catch up");
|
log.info("Syncing leases for leader to catch up");
|
||||||
final List<MultiStreamLease> leaseTableLeases = fetchMultiStreamLeases();
|
final List<MultiStreamLease> leaseTableLeases = fetchMultiStreamLeases();
|
||||||
syncStreamsFromLeaseTableOnAppInit(leaseTableLeases);
|
syncStreamsFromLeaseTableOnAppInit(leaseTableLeases);
|
||||||
|
|
@ -498,7 +499,6 @@ public class Scheduler implements Runnable {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
leasesSyncedOnAppInit = true;
|
leasesSyncedOnAppInit = true;
|
||||||
}
|
|
||||||
|
|
||||||
// For new streams discovered, do a shard sync and update the currentStreamConfigMap
|
// For new streams discovered, do a shard sync and update the currentStreamConfigMap
|
||||||
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
|
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
|
||||||
|
|
|
||||||
|
|
@ -995,14 +995,14 @@ public class SchedulerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNoDdbLookUpAsStreamMapContainsAllStreams() throws Exception {
|
public void testSyncLeaseAsThisIsInitialAppBootstrapEvenThoughStreamMapContainsAllStreams() {
|
||||||
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
|
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
|
||||||
when(multiStreamTracker.streamConfigList()).thenReturn(Collections.emptyList());
|
when(multiStreamTracker.streamConfigList()).thenReturn(Collections.emptyList());
|
||||||
prepareMultiStreamScheduler(streamConfigList);
|
prepareMultiStreamScheduler(streamConfigList);
|
||||||
// Populate currentStreamConfigMap to simulate that the leader has the latest streams.
|
// Populate currentStreamConfigMap to simulate that the leader has the latest streams.
|
||||||
multiStreamTracker.streamConfigList().forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s));
|
multiStreamTracker.streamConfigList().forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s));
|
||||||
scheduler.checkAndSyncStreamShardsAndLeases();
|
scheduler.runProcessLoop();
|
||||||
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any());
|
verify(scheduler).syncStreamsFromLeaseTableOnAppInit(any());
|
||||||
assertTrue(scheduler.currentStreamConfigMap().size() != 0);
|
assertTrue(scheduler.currentStreamConfigMap().size() != 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue