Reuse 'ShardSyncTaskManager' instance for existing stream to avoid duplicate enqueue of 'ShardSyncTask' (#1277)

Co-authored-by: Tongqing Zhang <tqzhang@amazon.com>
This commit is contained in:
laxeo 2024-03-26 13:13:39 -07:00 committed by GitHub
parent f205673fee
commit e9b810ba07
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 31 additions and 27 deletions

View file

@ -84,6 +84,7 @@ class PeriodicShardSyncManager {
private final LeaseRefresher leaseRefresher;
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap;
private final ScheduledExecutorService shardSyncThreadPool;
private final boolean isMultiStreamingMode;
private final MetricsFactory metricsFactory;
@ -93,11 +94,13 @@ class PeriodicShardSyncManager {
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, boolean isMultiStreamingMode,
MetricsFactory metricsFactory,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider,
Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap,
boolean isMultiStreamingMode, MetricsFactory metricsFactory,
long leasesRecoveryAuditorExecutionFrequencyMillis,
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider,
streamToShardSyncTaskManagerMap,
Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode, metricsFactory,
leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold);
}
@ -105,6 +108,7 @@ class PeriodicShardSyncManager {
PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider,
Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap,
ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode,
MetricsFactory metricsFactory,
long leasesRecoveryAuditorExecutionFrequencyMillis,
@ -116,6 +120,7 @@ class PeriodicShardSyncManager {
this.leaseRefresher = leaseRefresher;
this.currentStreamConfigMap = currentStreamConfigMap;
this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider;
this.streamToShardSyncTaskManagerMap = streamToShardSyncTaskManagerMap;
this.shardSyncThreadPool = shardSyncThreadPool;
this.isMultiStreamingMode = isMultiStreamingMode;
this.metricsFactory = metricsFactory;
@ -175,6 +180,7 @@ class PeriodicShardSyncManager {
PERIODIC_SHARD_SYNC_MANAGER);
int numStreamsWithPartialLeases = 0;
int numStreamsToSync = 0;
int numSkippedShardSyncTask = 0;
boolean isRunSuccess = false;
final long runStartMillis = System.currentTimeMillis();
@ -205,12 +211,26 @@ class PeriodicShardSyncManager {
log.info("Skipping shard sync task for {} as stream is purged", streamIdentifier);
continue;
}
final ShardSyncTaskManager shardSyncTaskManager = shardSyncTaskManagerProvider
.apply(streamConfig);
final ShardSyncTaskManager shardSyncTaskManager;
if (streamToShardSyncTaskManagerMap.containsKey(streamConfig)) {
log.info("shardSyncTaskManager for stream {} already exists",
streamIdentifier.streamName());
shardSyncTaskManager = streamToShardSyncTaskManagerMap.get(streamConfig);
}
else {
// If streamConfig of a stream has already been added to currentStreamConfigMap but
// Scheduler failed to create shardSyncTaskManager for it, then Scheduler will not try
// to create one later. So enable PeriodicShardSyncManager to do it for such cases
log.info("Failed to get shardSyncTaskManager so creating one for stream {}.",
streamIdentifier.streamName());
shardSyncTaskManager = streamToShardSyncTaskManagerMap.computeIfAbsent(
streamConfig, s -> shardSyncTaskManagerProvider.apply(s));
}
if (!shardSyncTaskManager.submitShardSyncTask()) {
log.warn(
"Failed to submit shard sync task for stream {}. This could be due to the previous pending shard sync task.",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName());
numSkippedShardSyncTask += 1;
} else {
log.info("Submitted shard sync task for stream {} because of reason {}",
shardSyncTaskManager.shardDetector().streamIdentifier().streamName(),
@ -227,6 +247,7 @@ class PeriodicShardSyncManager {
} finally {
scope.addData("NumStreamsWithPartialLeases", numStreamsWithPartialLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("NumStreamsToSync", numStreamsToSync, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("NumSkippedShardSyncTask", numSkippedShardSyncTask, StandardUnit.COUNT, MetricsLevel.SUMMARY);
MetricsUtil.addSuccessAndLatency(scope, isRunSuccess, runStartMillis, MetricsLevel.SUMMARY);
scope.end();
}

View file

@ -141,7 +141,7 @@ public class Scheduler implements Runnable {
private final DiagnosticEventHandler diagnosticEventHandler;
private final LeaseCoordinator leaseCoordinator;
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new HashMap<>();
private final Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap = new ConcurrentHashMap<>();
private final PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
private final ShardPrioritization shardPrioritization;
private final boolean cleanupLeasesUponShardCompletion;
@ -292,7 +292,7 @@ public class Scheduler implements Runnable {
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, isMultiStreamMode, metricsFactory,
shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, isMultiStreamMode, metricsFactory,
leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(),
leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold());
this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode)

View file

@ -196,7 +196,7 @@ public class ShardSyncTaskManager {
} else {
if (log.isDebugEnabled()) {
log.debug("Previous {} task still pending. Not submitting new task. "
+ "Enqueued a request that will be executed when the current request completes.", currentTask.taskType());
+ "Triggered a pending request but will not be executed until the current request completes.", currentTask.taskType());
}
shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/);
}
@ -208,25 +208,6 @@ public class ShardSyncTaskManager {
log.error("Caught exception running {} task: {}", currentTask.taskType(),
exception != null ? exception : taskResult.getException());
}
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// submitShardSyncTask is invoked, before completion stage exits (future completes)
// but right after the value of shardSyncRequestPending is checked, it will result in
// shardSyncRequestPending being set to true, but no pending futures to trigger the next
// ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the
// previous task is in this completion stage, checkAndSubmitNextTask is not invoked
// until this completionStage exits.
try {
lock.lock();
if (shardSyncRequestPending.get()) {
shardSyncRequestPending.set(false);
// reset future to null, so next call creates a new one
// without trying to get results from the old future.
future = null;
checkAndSubmitNextTask();
}
} finally {
lock.unlock();
}
}
}

View file

@ -70,12 +70,14 @@ public class PeriodicShardSyncManagerTest {
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap;
@Mock
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
@Mock
Map<StreamConfig, ShardSyncTaskManager> streamToShardSyncTaskManagerMap;
@Before
public void setup() {
streamIdentifier = StreamIdentifier.multiStreamInstance("123456789012:stream:456");
periodicShardSyncManager = new PeriodicShardSyncManager("worker", leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, true, new NullMetricsFactory(), 2 * 60 * 1000, 3);
shardSyncTaskManagerProvider, streamToShardSyncTaskManagerMap, true, new NullMetricsFactory(), 2 * 60 * 1000, 3);
}
@Test