Updated test logic to use Supplier to provide time
This commit is contained in:
parent
57868a28e1
commit
6b19b6e996
2 changed files with 52 additions and 38 deletions
|
|
@ -239,7 +239,6 @@ public final class LeaseAssignmentManager {
|
||||||
|
|
||||||
if (shouldRunVarianceBalancing()) {
|
if (shouldRunVarianceBalancing()) {
|
||||||
final long balanceWorkerVarianceStartTime = System.currentTimeMillis();
|
final long balanceWorkerVarianceStartTime = System.currentTimeMillis();
|
||||||
this.varianceBasedBalancingLastRunTime = balanceWorkerVarianceStartTime;
|
|
||||||
final int totalNewAssignmentBeforeWorkerVarianceBalancing =
|
final int totalNewAssignmentBeforeWorkerVarianceBalancing =
|
||||||
inMemoryStorageView.leaseToNewAssignedWorkerMap.size();
|
inMemoryStorageView.leaseToNewAssignedWorkerMap.size();
|
||||||
leaseAssignmentDecider.balanceWorkerVariance();
|
leaseAssignmentDecider.balanceWorkerVariance();
|
||||||
|
|
@ -280,12 +279,13 @@ public final class LeaseAssignmentManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean shouldRunVarianceBalancing() {
|
private boolean shouldRunVarianceBalancing() {
|
||||||
|
final long nowNanos = nanoTimeProvider.get();
|
||||||
|
final long intervalMillis = leaseDurationMillis * config.varianceBalancingFrequency();
|
||||||
|
|
||||||
final long now = System.currentTimeMillis();
|
final long elapsedMillis = Math.abs(nowNanos - varianceBasedBalancingLastRunTime) / 1_000_000;
|
||||||
final long varianceBalancingInterval = leaseDurationMillis * config.varianceBalancingFrequency();
|
|
||||||
|
|
||||||
if (now - this.varianceBasedBalancingLastRunTime >= varianceBalancingInterval) {
|
if (elapsedMillis >= intervalMillis) {
|
||||||
this.varianceBasedBalancingLastRunTime = now;
|
varianceBasedBalancingLastRunTime = nowNanos;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
|
|
||||||
|
|
@ -490,51 +490,48 @@ class LeaseAssignmentManagerTest {
|
||||||
@Test
|
@Test
|
||||||
void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration()
|
void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
|
long multiplier = 1000_000;
|
||||||
final int varianceBalancingFrequency = 3;
|
final int varianceBalancingFrequency = 3;
|
||||||
final long leaseDuration = Duration.ofMillis(1000).toMillis();
|
final long leaseDurationMillis = Duration.ofMillis(1000).toMillis();
|
||||||
|
|
||||||
|
final IntervalTimeSupplier lamTimeSupplier = new IntervalTimeSupplier(3000 * multiplier);
|
||||||
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config =
|
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config =
|
||||||
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10);
|
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10);
|
||||||
config.varianceBalancingFrequency(varianceBalancingFrequency);
|
config.varianceBalancingFrequency(varianceBalancingFrequency);
|
||||||
|
|
||||||
createLeaseAssignmentManager(config, leaseDuration, System::nanoTime, Integer.MAX_VALUE);
|
createLeaseAssignmentManager(config, leaseDurationMillis, lamTimeSupplier, Integer.MAX_VALUE);
|
||||||
|
|
||||||
long balancingInterval = leaseDuration * varianceBalancingFrequency;
|
// Run at time 0, variance balance must be done
|
||||||
int varianceBalancingOccurred = 0;
|
|
||||||
|
|
||||||
// Initial run at time 0
|
|
||||||
setupConditionForVarianceBalancing();
|
setupConditionForVarianceBalancing();
|
||||||
long startTime = System.currentTimeMillis();
|
|
||||||
leaseAssignmentManagerRunnable.run();
|
leaseAssignmentManagerRunnable.run();
|
||||||
|
assertEquals(
|
||||||
|
3L,
|
||||||
|
leaseRefresher.listLeases().stream()
|
||||||
|
.filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID))
|
||||||
|
.count());
|
||||||
|
|
||||||
// Check initial balancing at time 0
|
lamTimeSupplier.incrementCurrentTime(1000 * multiplier);
|
||||||
long leasesOwnedByWorker = leaseRefresher.listLeases().stream()
|
|
||||||
.filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID))
|
|
||||||
.count();
|
|
||||||
if (leasesOwnedByWorker == 3L) {
|
|
||||||
varianceBalancingOccurred++;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run until we see the next balancing since LAM run is not tied to variance-based load balancing
|
// Run at time 1, variance balance must not be done
|
||||||
long nextBalancingTime = startTime + balancingInterval;
|
setupConditionForVarianceBalancing();
|
||||||
|
leaseAssignmentManagerRunnable.run();
|
||||||
|
assertEquals(
|
||||||
|
1L,
|
||||||
|
leaseRefresher.listLeases().stream()
|
||||||
|
.filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID))
|
||||||
|
.count());
|
||||||
|
|
||||||
while (System.currentTimeMillis() < (startTime + balancingInterval + 1000)) {
|
lamTimeSupplier.incrementCurrentTime(2 * 1000 * multiplier);
|
||||||
setupConditionForVarianceBalancing();
|
|
||||||
leaseAssignmentManagerRunnable.run();
|
|
||||||
|
|
||||||
leasesOwnedByWorker = leaseRefresher.listLeases().stream()
|
// Run at time 3, variance balance must be done
|
||||||
.filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID))
|
setupConditionForVarianceBalancing();
|
||||||
.count();
|
leaseAssignmentManagerRunnable.run();
|
||||||
|
assertEquals(
|
||||||
if (leasesOwnedByWorker == 3L && System.currentTimeMillis() >= nextBalancingTime) {
|
3L,
|
||||||
varianceBalancingOccurred++;
|
leaseRefresher.listLeases().stream()
|
||||||
}
|
.filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID))
|
||||||
|
.count());
|
||||||
Thread.sleep(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
assertTrue(
|
|
||||||
varianceBalancingOccurred == 2,
|
|
||||||
"Expected varianceBalancingOccurred to be greater than 1, but was: " + varianceBalancingOccurred);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupConditionForVarianceBalancing() throws Exception {
|
private void setupConditionForVarianceBalancing() throws Exception {
|
||||||
|
|
@ -1385,4 +1382,21 @@ class LeaseAssignmentManagerTest {
|
||||||
.build())
|
.build())
|
||||||
.join();
|
.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class IntervalTimeSupplier implements Supplier<Long> {
|
||||||
|
private long currentTime;
|
||||||
|
|
||||||
|
public IntervalTimeSupplier(long currentTime) {
|
||||||
|
this.currentTime = currentTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return currentTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrementCurrentTime(long currentTime) {
|
||||||
|
this.currentTime += currentTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue