diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java index 8a5c7c34..070757ec 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java @@ -249,7 +249,7 @@ public class PropertiesMappingE2ETest { .workerUtilizationAwareAssignmentConfig() .staleWorkerMetricsEntryCleanupDuration()); assertEquals( - 3, + 6, kclV3Config .leaseManagementConfig .workerUtilizationAwareAssignmentConfig() diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index 162bda31..5ace4160 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -279,14 +279,16 @@ public final class LeaseAssignmentManager { } private boolean shouldRunVarianceBalancing() { - final boolean response = this.lamRunCounter == 0; - /* - To avoid lamRunCounter grow large, keep it within [0,varianceBalancingFrequency). - If varianceBalancingFrequency is 5 lamRunCounter value will be within 0 to 4 and method return true when - lamRunCounter is 0. - */ - this.lamRunCounter = (this.lamRunCounter + 1) % config.varianceBalancingFrequency(); - return response; + final long currentNanosTime = nanoTimeProvider.get(); + final long intervalMillis = leaseDurationMillis * config.varianceBalancingFrequency(); + + final long elapsedMillis = Math.abs(currentNanosTime - varianceBasedBalancingLastRunTime) / 1_000_000; + + if (elapsedMillis >= intervalMillis) { + varianceBasedBalancingLastRunTime = currentNanosTime; + return true; + } + return false; } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 06104a9c..d31f44ae 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -585,7 +585,7 @@ public class LeaseManagementConfig { * and so on. * NOTE: LAM frequency = failoverTimeMillis */ - private int varianceBalancingFrequency = 3; + private int varianceBalancingFrequency = 6; /** * Alpha value used for calculating exponential moving average of worker's metricStats. Selecting diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 7090f81e..cec84e86 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -87,7 +87,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { private final LeaseDiscoverer leaseDiscoverer; private final long renewerIntervalMillis; private final long takerIntervalMillis; - private final long leaseDiscovererIntervalMillis; + private long leaseDiscovererIntervalMillis; private final ExecutorService leaseRenewalThreadpool; private final ExecutorService leaseDiscoveryThreadPool; private final LeaseRefresher leaseRefresher; @@ -143,8 +143,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final MetricsFactory metricsFactory, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, - final ConcurrentMap shardInfoShardConsumerMap, - final long leaseAssignmentIntervalMillis) { + final ConcurrentMap shardInfoShardConsumerMap) { this.leaseRefresher = leaseRefresher; this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY); this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory) @@ -154,7 +153,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; // Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry. - this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis - epsilonMillis) / 2; + this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis; this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis); this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create( gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this); @@ -192,6 +191,41 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { maxLeasesToStealAtOneTime); } + public DynamoDBLeaseCoordinator( + final LeaseRefresher leaseRefresher, + final String workerIdentifier, + final long leaseDurationMillis, + final boolean enablePriorityLeaseAssignment, + final long epsilonMillis, + final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, + final int maxLeaseRenewerThreadCount, + final long initialLeaseTableReadCapacity, + final long initialLeaseTableWriteCapacity, + final MetricsFactory metricsFactory, + final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, + final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, + final ConcurrentMap shardInfoShardConsumerMap, + final long leaseAssignmentIntervalMillis) { + + this( + leaseRefresher, + workerIdentifier, + leaseDurationMillis, + enablePriorityLeaseAssignment, + epsilonMillis, + maxLeasesForWorker, + maxLeasesToStealAtOneTime, + maxLeaseRenewerThreadCount, + initialLeaseTableReadCapacity, + initialLeaseTableWriteCapacity, + metricsFactory, + workerUtilizationAwareAssignmentConfig, + gracefulLeaseHandoffConfig, + shardInfoShardConsumerMap); + this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis - epsilonMillis) / 2; + } + @RequiredArgsConstructor private class LeaseDiscoveryRunnable implements Runnable { private final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index 5ebd33d8..5fe4158e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -434,7 +434,7 @@ class LeaseAssignmentManagerTest { } // no needed since variance based load balancing is no longer tied to LAM run - @Test + // @Test void performAssignment_varianceBalanceFreq3_asserLoadBalancingEvery3Iteration() throws Exception { final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config = getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10); @@ -478,7 +478,7 @@ class LeaseAssignmentManagerTest { .count()); } - // @Test + @Test void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration() throws Exception {