Fix the flag bug
This commit is contained in:
parent
103520261f
commit
8e618a02c1
2 changed files with 22 additions and 4 deletions
|
|
@ -460,7 +460,7 @@ public class Scheduler implements Runnable {
|
||||||
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
|
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
|
||||||
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
|
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
|
||||||
// This is done to ensure that we clean up the stale streams lingering in the lease table.
|
// This is done to ensure that we clean up the stale streams lingering in the lease table.
|
||||||
if (isMultiStreamMode && (shouldSyncLeases || leasesSyncedOnAppInit)) {
|
if (isMultiStreamMode && (shouldSyncLeases || !leasesSyncedOnAppInit)) {
|
||||||
// Skip updating the stream map due to no new stream since last sync
|
// Skip updating the stream map due to no new stream since last sync
|
||||||
if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) {
|
if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) {
|
||||||
syncStreamsFromLeaseTableOnAppInit(fetchMultiStreamLeases());
|
syncStreamsFromLeaseTableOnAppInit(fetchMultiStreamLeases());
|
||||||
|
|
|
||||||
|
|
@ -966,7 +966,7 @@ public class SchedulerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNotUpdatingStreamMapAsItContainsAllStreams() throws Exception {
|
public void testNoDdbLookUpAsStreamMapContainsAllStreams() throws Exception {
|
||||||
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
|
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
|
||||||
prepareMultiStreamScheduler(streamConfigList);
|
prepareMultiStreamScheduler(streamConfigList);
|
||||||
// Populate currentStreamConfigMap to simulate that the leader has the latest streams.
|
// Populate currentStreamConfigMap to simulate that the leader has the latest streams.
|
||||||
|
|
@ -975,13 +975,30 @@ public class SchedulerTest {
|
||||||
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any());
|
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoDdbLookUpForNewStreamAsLeaderFlippedTheShardSyncFlags() throws Exception {
|
||||||
|
prepareMultiStreamScheduler();
|
||||||
|
scheduler.checkAndSyncStreamShardsAndLeases();
|
||||||
|
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any());
|
||||||
|
|
||||||
|
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
|
||||||
|
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
|
||||||
|
scheduler.checkAndSyncStreamShardsAndLeases();
|
||||||
|
|
||||||
|
// Since the sync path has been executed once before the DDB sync flags should be flipped
|
||||||
|
// to prevent doing DDB lookups in the subsequent runs.
|
||||||
|
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any());
|
||||||
|
assertEquals(0, streamConfigList.stream()
|
||||||
|
.filter(s -> !scheduler.currentStreamConfigMap().containsKey(s.streamIdentifier())).count());
|
||||||
|
}
|
||||||
|
|
||||||
@SafeVarargs
|
@SafeVarargs
|
||||||
private final void prepareMultiStreamScheduler(List<StreamConfig>... streamConfigs) {
|
private final void prepareMultiStreamScheduler(List<StreamConfig>... streamConfigs) {
|
||||||
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
|
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
|
||||||
.retrievalFactory(retrievalFactory);
|
.retrievalFactory(retrievalFactory);
|
||||||
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
|
||||||
metricsConfig, processorConfig, retrievalConfig));
|
metricsConfig, processorConfig, retrievalConfig));
|
||||||
if (streamConfigs.length > 0 ) {
|
if (streamConfigs.length > 0) {
|
||||||
stubMultiStreamTracker(streamConfigs);
|
stubMultiStreamTracker(streamConfigs);
|
||||||
}
|
}
|
||||||
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
|
||||||
|
|
@ -990,7 +1007,8 @@ public class SchedulerTest {
|
||||||
@SafeVarargs
|
@SafeVarargs
|
||||||
private final void prepareForStaleDeletedStreamCleanupTests(List<StreamConfig>... streamConfigs) {
|
private final void prepareForStaleDeletedStreamCleanupTests(List<StreamConfig>... streamConfigs) {
|
||||||
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
|
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
|
||||||
@Override public Duration waitPeriodToDeleteFormerStreams() {
|
@Override
|
||||||
|
public Duration waitPeriodToDeleteFormerStreams() {
|
||||||
return Duration.ofDays(1);
|
return Duration.ofDays(1);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue