Allow leader to learn new leases upon re-election to avoid unnecessary shardSyncs (#1063)

This commit is contained in:
chenylee-aws 2023-03-15 12:34:32 -07:00 committed by GitHub
parent 04a121a811
commit 0cbd74f6e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 31 deletions

View file

@ -180,7 +180,10 @@ public class Scheduler implements Runnable {
private final Object lock = new Object(); private final Object lock = new Object();
private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted(); private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted();
private boolean leasesSyncedOnAppInit = false; private boolean leasesSyncedOnAppInit = false;
@Getter(AccessLevel.NONE)
private boolean shouldSyncLeases = true;
/** /**
* Used to ensure that only one requestedShutdown is in progress at a time. * Used to ensure that only one requestedShutdown is in progress at a time.
@ -279,8 +282,6 @@ public class Scheduler implements Runnable {
PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds();
// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool();
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis(); this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts(); this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector(); this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector();
@ -419,6 +420,8 @@ public class Scheduler implements Runnable {
// check for new streams and sync with the scheduler state // check for new streams and sync with the scheduler state
if (isLeader()) { if (isLeader()) {
checkAndSyncStreamShardsAndLeases(); checkAndSyncStreamShardsAndLeases();
} else {
shouldSyncLeases = true;
} }
logExecutorState(); logExecutorState();
@ -426,7 +429,7 @@ public class Scheduler implements Runnable {
Thread.sleep(shardConsumerDispatchPollIntervalMillis); Thread.sleep(shardConsumerDispatchPollIntervalMillis);
} catch (Exception e) { } catch (Exception e) {
log.error("Worker.run caught exception, sleeping for {} milli seconds!", log.error("Worker.run caught exception, sleeping for {} milli seconds!",
String.valueOf(shardConsumerDispatchPollIntervalMillis), e); shardConsumerDispatchPollIntervalMillis, e);
try { try {
Thread.sleep(shardConsumerDispatchPollIntervalMillis); Thread.sleep(shardConsumerDispatchPollIntervalMillis);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
@ -454,15 +457,18 @@ public class Scheduler implements Runnable {
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER); final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER);
try { try {
// This is done to ensure that we clean up the stale streams lingering in the lease table.
if (!leasesSyncedOnAppInit && isMultiStreamMode) {
final List<MultiStreamLease> leases = fetchMultiStreamLeases();
syncStreamsFromLeaseTableOnAppInit(leases);
leasesSyncedOnAppInit = true;
}
final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList() final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = streamTracker.streamConfigList()
.stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity())); .stream().collect(Collectors.toMap(StreamConfig::streamIdentifier, Function.identity()));
// This is done to ensure that we clean up the stale streams lingering in the lease table.
if (isMultiStreamMode && (shouldSyncLeases || !leasesSyncedOnAppInit)) {
// Skip updating the stream map due to no new stream since last sync
if (newStreamConfigMap.keySet().stream().anyMatch(s -> !currentStreamConfigMap.containsKey(s))) {
syncStreamsFromLeaseTableOnAppInit(fetchMultiStreamLeases());
}
leasesSyncedOnAppInit = true;
shouldSyncLeases = false;
}
// For new streams discovered, do a shard sync and update the currentStreamConfigMap // For new streams discovered, do a shard sync and update the currentStreamConfigMap
for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) {
if (!currentStreamConfigMap.containsKey(streamIdentifier)) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) {
@ -581,12 +587,14 @@ public class Scheduler implements Runnable {
return streamsSynced; return streamsSynced;
} }
@VisibleForTesting boolean shouldSyncStreamsNow() { @VisibleForTesting
boolean shouldSyncStreamsNow() {
return isMultiStreamMode && return isMultiStreamMode &&
(streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS); (streamSyncWatch.elapsed(TimeUnit.MILLISECONDS) > NEW_STREAM_CHECK_INTERVAL_MILLIS);
} }
@VisibleForTesting void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) { @VisibleForTesting
void syncStreamsFromLeaseTableOnAppInit(List<MultiStreamLease> leases) {
leases.stream() leases.stream()
.map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier())) .map(lease -> StreamIdentifier.multiStreamInstance(lease.streamIdentifier()))
.filter(streamIdentifier -> !currentStreamConfigMap.containsKey(streamIdentifier)) .filter(streamIdentifier -> !currentStreamConfigMap.containsKey(streamIdentifier))

View file

@ -39,7 +39,6 @@ import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrat
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
@ -66,6 +65,7 @@ import org.mockito.Mock;
import org.mockito.Spy; import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.OngoingStubbing;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@ -780,9 +780,8 @@ public class SchedulerTest {
public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException { public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException {
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6); List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6);
List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(1,4); List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(1,4);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
prepareForStaleDeletedStreamCleanupTests(); prepareForStaleDeletedStreamCleanupTests(streamConfigList1, streamConfigList2);
// when KCL starts it starts with tracking 5 stream // when KCL starts it starts with tracking 5 stream
assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
@ -817,27 +816,12 @@ public class SchedulerTest {
} }
private void prepareForStaleDeletedStreamCleanupTests() {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofDays(1);
}
});
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
}
// Tests validate that no cleanup of stream is done if its still tracked in multiStreamTracker // Tests validate that no cleanup of stream is done if its still tracked in multiStreamTracker
@Test @Test
public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream() public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream()
throws ProvisionedThroughputException, InvalidStateException, DependencyException { throws ProvisionedThroughputException, InvalidStateException, DependencyException {
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6); List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1); prepareForStaleDeletedStreamCleanupTests(streamConfigList1);
prepareForStaleDeletedStreamCleanupTests();
scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier()); scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier());
@ -974,6 +958,72 @@ public class SchedulerTest {
verify(rejectedTaskEvent, times(1)).accept(any()); verify(rejectedTaskEvent, times(1)).accept(any());
} }
@Test
public void testUpdateStreamMapIfMissingLatestStream() throws Exception {
prepareMultiStreamScheduler(createDummyStreamConfigList(1, 6));
scheduler.checkAndSyncStreamShardsAndLeases();
verify(scheduler).syncStreamsFromLeaseTableOnAppInit(any());
}
@Test
public void testNoDdbLookUpAsStreamMapContainsAllStreams() throws Exception {
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
prepareMultiStreamScheduler(streamConfigList);
// Populate currentStreamConfigMap to simulate that the leader has the latest streams.
streamConfigList.forEach(s -> scheduler.currentStreamConfigMap().put(s.streamIdentifier(), s));
scheduler.checkAndSyncStreamShardsAndLeases();
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any());
}
@Test
public void testNoDdbLookUpForNewStreamAsLeaderFlippedTheShardSyncFlags() throws Exception {
prepareMultiStreamScheduler();
scheduler.checkAndSyncStreamShardsAndLeases();
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any());
final List<StreamConfig> streamConfigList = createDummyStreamConfigList(1, 6);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList);
scheduler.checkAndSyncStreamShardsAndLeases();
// Since the sync path has been executed once before the DDB sync flags should be flipped
// to prevent doing DDB lookups in the subsequent runs.
verify(scheduler, never()).syncStreamsFromLeaseTableOnAppInit(any());
assertEquals(0, streamConfigList.stream()
.filter(s -> !scheduler.currentStreamConfigMap().containsKey(s.streamIdentifier())).count());
}
@SafeVarargs
private final void prepareMultiStreamScheduler(List<StreamConfig>... streamConfigs) {
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig));
if (streamConfigs.length > 0) {
stubMultiStreamTracker(streamConfigs);
}
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
}
@SafeVarargs
private final void prepareForStaleDeletedStreamCleanupTests(List<StreamConfig>... streamConfigs) {
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override
public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofDays(1);
}
});
stubMultiStreamTracker(streamConfigs);
prepareMultiStreamScheduler();
}
@SafeVarargs
private final void stubMultiStreamTracker(List<StreamConfig>... streamConfigs) {
OngoingStubbing<List<StreamConfig>> stub = when(multiStreamTracker.streamConfigList());
for (List<StreamConfig> streamConfig : streamConfigs) {
stub = stub.thenReturn(streamConfig);
}
}
/*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception {
final int numberOfRecordsPerShard = 10; final int numberOfRecordsPerShard = 10;
final String kinesisShardPrefix = "kinesis-0-"; final String kinesisShardPrefix = "kinesis-0-";