diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index cbd692bf..68bd2f2f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -239,7 +239,6 @@ public final class LeaseAssignmentManager { if (shouldRunVarianceBalancing()) { final long balanceWorkerVarianceStartTime = System.currentTimeMillis(); - this.varianceBasedBalancingLastRunTime = balanceWorkerVarianceStartTime; final int totalNewAssignmentBeforeWorkerVarianceBalancing = inMemoryStorageView.leaseToNewAssignedWorkerMap.size(); leaseAssignmentDecider.balanceWorkerVariance(); @@ -280,12 +279,13 @@ public final class LeaseAssignmentManager { } private boolean shouldRunVarianceBalancing() { + final long nowNanos = nanoTimeProvider.get(); + final long intervalMillis = leaseDurationMillis * config.varianceBalancingFrequency(); - final long now = System.currentTimeMillis(); - final long varianceBalancingInterval = leaseDurationMillis * config.varianceBalancingFrequency(); + final long elapsedMillis = Math.abs(nowNanos - varianceBasedBalancingLastRunTime) / 1_000_000; - if (now - this.varianceBasedBalancingLastRunTime >= varianceBalancingInterval) { - this.varianceBasedBalancingLastRunTime = now; + if (elapsedMillis >= intervalMillis) { + varianceBasedBalancingLastRunTime = nowNanos; return true; } return false; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index cc417dc9..4835ca40 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -490,51 +490,48 @@ class LeaseAssignmentManagerTest { @Test void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration() throws Exception { + + long multiplier = 1000_000; final int varianceBalancingFrequency = 3; - final long leaseDuration = Duration.ofMillis(1000).toMillis(); + final long leaseDurationMillis = Duration.ofMillis(1000).toMillis(); + + final IntervalTimeSupplier lamTimeSupplier = new IntervalTimeSupplier(3000 * multiplier); final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config = getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10); config.varianceBalancingFrequency(varianceBalancingFrequency); - createLeaseAssignmentManager(config, leaseDuration, System::nanoTime, Integer.MAX_VALUE); + createLeaseAssignmentManager(config, leaseDurationMillis, lamTimeSupplier, Integer.MAX_VALUE); - long balancingInterval = leaseDuration * varianceBalancingFrequency; - int varianceBalancingOccurred = 0; - - // Initial run at time 0 + // Run at time 0, variance balance must be done setupConditionForVarianceBalancing(); - long startTime = System.currentTimeMillis(); leaseAssignmentManagerRunnable.run(); + assertEquals( + 3L, + leaseRefresher.listLeases().stream() + .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) + .count()); - // Check initial balancing at time 0 - long leasesOwnedByWorker = leaseRefresher.listLeases().stream() - .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) - .count(); - if (leasesOwnedByWorker == 3L) { - varianceBalancingOccurred++; - } + lamTimeSupplier.incrementCurrentTime(1000 * multiplier); - // Run until we see the next balancing since LAM run is not tied to variance-based load balancing - long nextBalancingTime = startTime + balancingInterval; + // Run at time 1, variance balance must not be done + setupConditionForVarianceBalancing(); + leaseAssignmentManagerRunnable.run(); + assertEquals( + 1L, + leaseRefresher.listLeases().stream() + .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) + .count()); - while (System.currentTimeMillis() < (startTime + balancingInterval + 1000)) { - setupConditionForVarianceBalancing(); - leaseAssignmentManagerRunnable.run(); + lamTimeSupplier.incrementCurrentTime(2 * 1000 * multiplier); - leasesOwnedByWorker = leaseRefresher.listLeases().stream() - .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) - .count(); - - if (leasesOwnedByWorker == 3L && System.currentTimeMillis() >= nextBalancingTime) { - varianceBalancingOccurred++; - } - - Thread.sleep(100); - } - - assertTrue( - varianceBalancingOccurred == 2, - "Expected varianceBalancingOccurred to be greater than 1, but was: " + varianceBalancingOccurred); + // Run at time 3, variance balance must be done + setupConditionForVarianceBalancing(); + leaseAssignmentManagerRunnable.run(); + assertEquals( + 3L, + leaseRefresher.listLeases().stream() + .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) + .count()); } private void setupConditionForVarianceBalancing() throws Exception { @@ -1385,4 +1382,21 @@ class LeaseAssignmentManagerTest { .build()) .join(); } + + class IntervalTimeSupplier implements Supplier { + private long currentTime; + + public IntervalTimeSupplier(long currentTime) { + this.currentTime = currentTime; + } + + @Override + public Long get() { + return currentTime; + } + + public void incrementCurrentTime(long currentTime) { + this.currentTime += currentTime; + } + } }