diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java index 070757ec..8a5c7c34 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java @@ -249,7 +249,7 @@ public class PropertiesMappingE2ETest { .workerUtilizationAwareAssignmentConfig() .staleWorkerMetricsEntryCleanupDuration()); assertEquals( - 6, + 3, kclV3Config .leaseManagementConfig .workerUtilizationAwareAssignmentConfig() diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index 5ace4160..162bda31 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -279,16 +279,14 @@ public final class LeaseAssignmentManager { } private boolean shouldRunVarianceBalancing() { - final long currentNanosTime = nanoTimeProvider.get(); - final long intervalMillis = leaseDurationMillis * config.varianceBalancingFrequency(); - - final long elapsedMillis = Math.abs(currentNanosTime - varianceBasedBalancingLastRunTime) / 1_000_000; - - if (elapsedMillis >= intervalMillis) { - varianceBasedBalancingLastRunTime = currentNanosTime; - return true; - } - return false; + final boolean response = this.lamRunCounter == 0; + /* + To avoid lamRunCounter grow large, keep it within [0,varianceBalancingFrequency). + If varianceBalancingFrequency is 5 lamRunCounter value will be within 0 to 4 and method return true when + lamRunCounter is 0. + */ + this.lamRunCounter = (this.lamRunCounter + 1) % config.varianceBalancingFrequency(); + return response; } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index d31f44ae..06104a9c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -585,7 +585,7 @@ public class LeaseManagementConfig { * and so on. * NOTE: LAM frequency = failoverTimeMillis */ - private int varianceBalancingFrequency = 6; + private int varianceBalancingFrequency = 3; /** * Alpha value used for calculating exponential moving average of worker's metricStats. Selecting diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index 5fe4158e..7769dd36 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -434,7 +434,7 @@ class LeaseAssignmentManagerTest { } // no needed since variance based load balancing is no longer tied to LAM run - // @Test + @Test void performAssignment_varianceBalanceFreq3_asserLoadBalancingEvery3Iteration() throws Exception { final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config = getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10); @@ -478,53 +478,6 @@ class LeaseAssignmentManagerTest { .count()); } - @Test - void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration() - throws Exception { - - long multiplier = 1000_000; - final int varianceBalancingFrequency = 3; - final long leaseDurationMillis = Duration.ofMillis(1000).toMillis(); - - final IntervalTimeSupplier lamTimeSupplier = new IntervalTimeSupplier(3000 * multiplier); - final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config = - getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10); - config.varianceBalancingFrequency(varianceBalancingFrequency); - - createLeaseAssignmentManager(config, leaseDurationMillis, lamTimeSupplier, Integer.MAX_VALUE); - - // Run at time 0, variance balance must be done - setupConditionForVarianceBalancing(); - leaseAssignmentManagerRunnable.run(); - assertEquals( - 3L, - leaseRefresher.listLeases().stream() - .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) - .count()); - - lamTimeSupplier.incrementCurrentTime(1000 * multiplier); - - // Run at time 1, variance balance must not be done - setupConditionForVarianceBalancing(); - leaseAssignmentManagerRunnable.run(); - assertEquals( - 1L, - leaseRefresher.listLeases().stream() - .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) - .count()); - - lamTimeSupplier.incrementCurrentTime(2 * 1000 * multiplier); - - // Run at time 3, variance balance must be done - setupConditionForVarianceBalancing(); - leaseAssignmentManagerRunnable.run(); - assertEquals( - 3L, - leaseRefresher.listLeases().stream() - .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) - .count()); - } - private void setupConditionForVarianceBalancing() throws Exception { workerMetricsDAO.updateMetrics(createDummyYieldWorkerMetrics(TEST_YIELD_WORKER_ID + "1")); @@ -1373,21 +1326,4 @@ class LeaseAssignmentManagerTest { .build()) .join(); } - - class IntervalTimeSupplier implements Supplier { - private long currentTime; - - public IntervalTimeSupplier(long currentTime) { - this.currentTime = currentTime; - } - - @Override - public Long get() { - return currentTime; - } - - public void incrementCurrentTime(long currentTime) { - this.currentTime += currentTime; - } - } }