Introduce shouldSyncLeases variable separately

This commit is contained in:
Chenyuan Lee 2023-03-14 12:11:50 -07:00
parent 9816d817b1
commit 103520261f
2 changed files with 44 additions and 37 deletions

View file

@ -180,6 +180,9 @@ public class Scheduler implements Runnable {
private final Object lock = new Object(); private final Object lock = new Object();
private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted(); private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted();
private boolean leasesSyncedOnAppInit = false;
@Getter(AccessLevel.NONE)
private boolean shouldSyncLeases = true; private boolean shouldSyncLeases = true;
/** /**
@ -457,11 +460,12 @@ public class Scheduler implements Runnable {
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList() final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
// This is done to ensure that we clean up the stale streams lingering in the lease table. // This is done to ensure that we clean up the stale streams lingering in the lease table.
if (shouldSyncLeases && isMultiStreamMode) { if (isMultiStreamMode && (shouldSyncLeases || leasesSyncedOnAppInit)) {
// Skip updating the stream map due to no new stream since last sync // Skip updating the stream map due to no new stream since last sync
if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) { if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) {
syncStreamsFromLeaseTable(fetchMultiStreamLeases()); syncStreamsFromLeaseTableOnAppInit(fetchMultiStreamLeases());
} }
leasesSyncedOnAppInit = true;
shouldSyncLeases = false; shouldSyncLeases = false;
} }
@ -590,7 +594,7 @@ public class Scheduler implements Runnable {
} }
@VisibleForTesting @VisibleForTesting
void syncStreamsFromLeaseTable(List<MultiStreamLease> leases) { void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) {
leases.stream() leases.stream()
.map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier()))
.filter(streamIdentifier -> !currentStreamConfigMap.containsKey(streamIdentifier)) .filter(streamIdentifier -> !currentStreamConfigMap.containsKey(streamIdentifier))

View file

@ -65,6 +65,7 @@ import org.mockito.Mock;
import org.mockito.Spy; import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.OngoingStubbing;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@ -470,7 +471,7 @@ public class SchedulerTest {
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig); metricsConfig, processorConfig, retrievalConfig);
scheduler.syncStreamsFromLeaseTable(leasesInTable); scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable);
Map<StreamIdentifier, StreamConfig> expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap( Map<StreamIdentifier, StreamConfig> expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap(
StreamConfig::streamIdentifier, Function.identity())); StreamConfig::streamIdentifier, Function.identity()));
Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap()); Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap());
@ -506,7 +507,7 @@ public class SchedulerTest {
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig); metricsConfig, processorConfig, retrievalConfig);
scheduler.syncStreamsFromLeaseTable(leasesInTable); scheduler.syncStreamsFromLeaseTableOnAppInit(leasesInTable);
Map<StreamIdentifier, StreamConfig> expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap( Map<StreamIdentifier, StreamConfig> expectedConfigMap = expectedConfig.stream().collect(Collectors.toMap(
sc -> sc.streamIdentifier(), sc -> sc)); sc -> sc.streamIdentifier(), sc -> sc));
Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap()); Assert.assertEquals(expectedConfigMap, scheduler.currentStreamConfigMap());
@ -779,9 +780,8 @@ public class SchedulerTest {
public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException { public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException {
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6); List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6);
List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(1,4); List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(1,4);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
prepareForStaleDeletedStreamCleanupTests(); prepareForStaleDeletedStreamCleanupTests(streamConfigList1, streamConfigList2);
// when KCL starts it starts with tracking 5 stream // when KCL starts it starts with tracking 5 stream
assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
@ -816,27 +816,12 @@ public class SchedulerTest {
} }
private void prepareForStaleDeletedStreamCleanupTests() {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofDays(1);
}
});
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
}
// Tests validate that no cleanup of stream is done if its still tracked in multiStreamTracker // Tests validate that no cleanup of stream is done if its still tracked in multiStreamTracker
@Test @Test
public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream() public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream()
throws ProvisionedThroughputException, InvalidStateException, DependencyException { throws ProvisionedThroughputException, InvalidStateException, DependencyException {
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6); List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1); prepareForStaleDeletedStreamCleanupTests(streamConfigList1);
prepareForStaleDeletedStreamCleanupTests();
scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier()); scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier());
@ -975,32 +960,50 @@ public class SchedulerTest {
@Test @Test
public void testUpdateStreamMapIfMissingLatestStream() throws Exception { public void testUpdateStreamMapIfMissingLatestStream() throws Exception {
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6); prepareMultiStreamScheduler(createDummyStreamConfigList(1, 6));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig));
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
scheduler.checkAndSyncStreamShardsAndLeases(); scheduler.checkAndSyncStreamShardsAndLeases();
verify(scheduler).syncStreamsFromLeaseTable(any()); verify(scheduler).syncStreamsFromLeaseTableOnAppInit(any());
} }
@Test @Test
public void testNotUpdatingStreamMapAsItContainsAllStreams() throws Exception { public void testNotUpdatingStreamMapAsItContainsAllStreams() throws Exception {
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6); final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
prepareMultiStreamScheduler(streamConfigList);
// Populate currentStreamConfigMap to simulate that the leader has the latest streams.
streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s));
scheduler.checkAndSyncStreamShardsAndLeases();
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any());
}
@SafeVarargs
private final void prepareMultiStreamScheduler(List<StreamConfig>... streamConfigs) {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory); .retrievalFactory(retrievalFactory);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig)); metricsConfig, processorConfig, retrievalConfig));
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList); if (streamConfigs.length > 0 ) {
stubMultiStreamTracker(streamConfigs);
}
when(scheduler.shouldSyncStreamsNow()).thenReturn(true); when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
// Populate currentStreamConfigMap to simulate that the leader has the latest streams. }
streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s));
scheduler.checkAndSyncStreamShardsAndLeases(); @SafeVarargs
verify(scheduler, times(0)).syncStreamsFromLeaseTable(any()); private final void prepareForStaleDeletedStreamCleanupTests(List<StreamConfig>... streamConfigs) {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofDays(1);
}
});
stubMultiStreamTracker(streamConfigs);
prepareMultiStreamScheduler();
}
@SafeVarargs
private final void stubMultiStreamTracker(List<StreamConfig>... streamConfigs) {
OngoingStubbing<List<StreamConfig>> stub = when(multiStreamTracker.streamConfigList());
for (List<StreamConfig> streamConfig : streamConfigs) {
stub = stub.thenReturn(streamConfig);
}
} }
/*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception {