diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index 68bd2f2f..162bda31 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -279,16 +279,14 @@ public final class LeaseAssignmentManager { } private boolean shouldRunVarianceBalancing() { - final long nowNanos = nanoTimeProvider.get(); - final long intervalMillis = leaseDurationMillis * config.varianceBalancingFrequency(); - - final long elapsedMillis = Math.abs(nowNanos - varianceBasedBalancingLastRunTime) / 1_000_000; - - if (elapsedMillis >= intervalMillis) { - varianceBasedBalancingLastRunTime = nowNanos; - return true; - } - return false; + final boolean response = this.lamRunCounter == 0; + /* + To avoid lamRunCounter grow large, keep it within [0,varianceBalancingFrequency). + If varianceBalancingFrequency is 5 lamRunCounter value will be within 0 to 4 and method return true when + lamRunCounter is 0. + */ + this.lamRunCounter = (this.lamRunCounter + 1) % config.varianceBalancingFrequency(); + return response; } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index f8782eea..445bcc3c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -579,11 +579,11 @@ public class LeaseManagementConfig { private WorkerMetricsTableConfig workerMetricsTableConfig; /** - * Frequency to perform worker variance balancing. This value is used with respect to the failoverTimeMillis, - * that is every six (as default) * failoverTimeMillis the worker variance balancing will be performed. - * Setting it to 1 will make varianceBalancing run on every failoverTimeMillis and 2 on every 2 * failoverTimeMillis + * Frequency to perform worker variance balancing. This value is used with respect to the LAM frequency, + * that is every sixth (as default) iteration of LAM the worker variance balancing will be performed. + * Setting it to 1 will make varianceBalancing run on every iteration of LAM and 2 on every 2nd iteration * and so on. - * NOTE: LAM frequency = {@link LeaseManagementConfig#leaseAssignmentIntervalMillis} + * NOTE: LAM frequency = failoverTimeMillis */ private int varianceBalancingFrequency = 6; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index 4835ca40..bc9e6620 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -434,7 +434,7 @@ class LeaseAssignmentManagerTest { } // no needed since variance based load balancing is no longer tied to LAM run - // @Test + @Test void performAssignment_varianceBalanceFreq3_asserLoadBalancingEvery3Iteration() throws Exception { final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config = getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10); @@ -476,18 +476,9 @@ class LeaseAssignmentManagerTest { leaseRefresher.listLeases().stream() .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) .count()); - - setupConditionForVarianceBalancing(); - // 5th Run, expect no re-balance - leaseAssignmentManagerRunnable.run(); - assertEquals( - 1L, - leaseRefresher.listLeases().stream() - .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) - .count()); } - @Test + // @Test void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration() throws Exception {