Change logic to balance tied to LAM run

This commit is contained in:
eha sah 2025-04-10 13:58:57 -07:00
parent 6d8396d3f4
commit 6520d3e739
4 changed files with 11 additions and 77 deletions

View file

@ -249,7 +249,7 @@ public class PropertiesMappingE2ETest {
.workerUtilizationAwareAssignmentConfig()
.staleWorkerMetricsEntryCleanupDuration());
assertEquals(
6,
3,
kclV3Config
.leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()

View file

@ -279,16 +279,14 @@ public final class LeaseAssignmentManager {
}
private boolean shouldRunVarianceBalancing() {
final long currentNanosTime = nanoTimeProvider.get();
final long intervalMillis = leaseDurationMillis * config.varianceBalancingFrequency();
final long elapsedMillis = Math.abs(currentNanosTime - varianceBasedBalancingLastRunTime) / 1_000_000;
if (elapsedMillis >= intervalMillis) {
varianceBasedBalancingLastRunTime = currentNanosTime;
return true;
}
return false;
final boolean response = this.lamRunCounter == 0;
/*
To avoid lamRunCounter grow large, keep it within [0,varianceBalancingFrequency).
If varianceBalancingFrequency is 5 lamRunCounter value will be within 0 to 4 and method return true when
lamRunCounter is 0.
*/
this.lamRunCounter = (this.lamRunCounter + 1) % config.varianceBalancingFrequency();
return response;
}
/**

View file

@ -585,7 +585,7 @@ public class LeaseManagementConfig {
* and so on.
* NOTE: LAM frequency = failoverTimeMillis
*/
private int varianceBalancingFrequency = 6;
private int varianceBalancingFrequency = 3;
/**
* Alpha value used for calculating exponential moving average of worker's metricStats. Selecting

View file

@ -434,7 +434,7 @@ class LeaseAssignmentManagerTest {
}
// no needed since variance based load balancing is no longer tied to LAM run
// @Test
@Test
void performAssignment_varianceBalanceFreq3_asserLoadBalancingEvery3Iteration() throws Exception {
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config =
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10);
@ -478,53 +478,6 @@ class LeaseAssignmentManagerTest {
.count());
}
@Test
void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration()
throws Exception {
long multiplier = 1000_000;
final int varianceBalancingFrequency = 3;
final long leaseDurationMillis = Duration.ofMillis(1000).toMillis();
final IntervalTimeSupplier lamTimeSupplier = new IntervalTimeSupplier(3000 * multiplier);
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config =
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10);
config.varianceBalancingFrequency(varianceBalancingFrequency);
createLeaseAssignmentManager(config, leaseDurationMillis, lamTimeSupplier, Integer.MAX_VALUE);
// Run at time 0, variance balance must be done
setupConditionForVarianceBalancing();
leaseAssignmentManagerRunnable.run();
assertEquals(
3L,
leaseRefresher.listLeases().stream()
.filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID))
.count());
lamTimeSupplier.incrementCurrentTime(1000 * multiplier);
// Run at time 1, variance balance must not be done
setupConditionForVarianceBalancing();
leaseAssignmentManagerRunnable.run();
assertEquals(
1L,
leaseRefresher.listLeases().stream()
.filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID))
.count());
lamTimeSupplier.incrementCurrentTime(2 * 1000 * multiplier);
// Run at time 3, variance balance must be done
setupConditionForVarianceBalancing();
leaseAssignmentManagerRunnable.run();
assertEquals(
3L,
leaseRefresher.listLeases().stream()
.filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID))
.count());
}
private void setupConditionForVarianceBalancing() throws Exception {
workerMetricsDAO.updateMetrics(createDummyYieldWorkerMetrics(TEST_YIELD_WORKER_ID + "1"));
@ -1373,21 +1326,4 @@ class LeaseAssignmentManagerTest {
.build())
.join();
}
class IntervalTimeSupplier implements Supplier<Long> {
private long currentTime;
public IntervalTimeSupplier(long currentTime) {
this.currentTime = currentTime;
}
@Override
public Long get() {
return currentTime;
}
public void incrementCurrentTime(long currentTime) {
this.currentTime += currentTime;
}
}
}