Change logic to balance tied to lease duration

This commit is contained in:
eha sah 2025-04-10 13:26:16 -07:00
parent 90b1282d16
commit 6d8396d3f4
5 changed files with 52 additions and 16 deletions

View file

@ -249,7 +249,7 @@ public class PropertiesMappingE2ETest {
.workerUtilizationAwareAssignmentConfig() .workerUtilizationAwareAssignmentConfig()
.staleWorkerMetricsEntryCleanupDuration()); .staleWorkerMetricsEntryCleanupDuration());
assertEquals( assertEquals(
3, 6,
kclV3Config kclV3Config
.leaseManagementConfig .leaseManagementConfig
.workerUtilizationAwareAssignmentConfig() .workerUtilizationAwareAssignmentConfig()

View file

@ -279,14 +279,16 @@ public final class LeaseAssignmentManager {
} }
private boolean shouldRunVarianceBalancing() { private boolean shouldRunVarianceBalancing() {
final boolean response = this.lamRunCounter == 0; final long currentNanosTime = nanoTimeProvider.get();
/* final long intervalMillis = leaseDurationMillis * config.varianceBalancingFrequency();
To avoid lamRunCounter grow large, keep it within [0,varianceBalancingFrequency).
If varianceBalancingFrequency is 5 lamRunCounter value will be within 0 to 4 and method return true when final long elapsedMillis = Math.abs(currentNanosTime - varianceBasedBalancingLastRunTime) / 1_000_000;
lamRunCounter is 0.
*/ if (elapsedMillis >= intervalMillis) {
this.lamRunCounter = (this.lamRunCounter + 1) % config.varianceBalancingFrequency(); varianceBasedBalancingLastRunTime = currentNanosTime;
return response; return true;
}
return false;
} }
/** /**

View file

@ -585,7 +585,7 @@ public class LeaseManagementConfig {
* and so on. * and so on.
* NOTE: LAM frequency = failoverTimeMillis * NOTE: LAM frequency = failoverTimeMillis
*/ */
private int varianceBalancingFrequency = 3; private int varianceBalancingFrequency = 6;
/** /**
* Alpha value used for calculating exponential moving average of worker's metricStats. Selecting * Alpha value used for calculating exponential moving average of worker's metricStats. Selecting

View file

@ -87,7 +87,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
private final LeaseDiscoverer leaseDiscoverer; private final LeaseDiscoverer leaseDiscoverer;
private final long renewerIntervalMillis; private final long renewerIntervalMillis;
private final long takerIntervalMillis; private final long takerIntervalMillis;
private final long leaseDiscovererIntervalMillis; private long leaseDiscovererIntervalMillis;
private final ExecutorService leaseRenewalThreadpool; private final ExecutorService leaseRenewalThreadpool;
private final ExecutorService leaseDiscoveryThreadPool; private final ExecutorService leaseDiscoveryThreadPool;
private final LeaseRefresher leaseRefresher; private final LeaseRefresher leaseRefresher;
@ -143,8 +143,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final MetricsFactory metricsFactory, final MetricsFactory metricsFactory,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap, final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap) {
final long leaseAssignmentIntervalMillis) {
this.leaseRefresher = leaseRefresher; this.leaseRefresher = leaseRefresher;
this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY); this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY);
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory) this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
@ -154,7 +153,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
// Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry. // Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis - epsilonMillis) / 2; this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis;
this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis); this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis);
this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create( this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create(
gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this); gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this);
@ -192,6 +191,41 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
maxLeasesToStealAtOneTime); maxLeasesToStealAtOneTime);
} }
public DynamoDBLeaseCoordinator(
final LeaseRefresher leaseRefresher,
final String workerIdentifier,
final long leaseDurationMillis,
final boolean enablePriorityLeaseAssignment,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewerThreadCount,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final MetricsFactory metricsFactory,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap,
final long leaseAssignmentIntervalMillis) {
this(
leaseRefresher,
workerIdentifier,
leaseDurationMillis,
enablePriorityLeaseAssignment,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
maxLeaseRenewerThreadCount,
initialLeaseTableReadCapacity,
initialLeaseTableWriteCapacity,
metricsFactory,
workerUtilizationAwareAssignmentConfig,
gracefulLeaseHandoffConfig,
shardInfoShardConsumerMap);
this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis - epsilonMillis) / 2;
}
@RequiredArgsConstructor @RequiredArgsConstructor
private class LeaseDiscoveryRunnable implements Runnable { private class LeaseDiscoveryRunnable implements Runnable {
private final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider; private final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider;

View file

@ -434,7 +434,7 @@ class LeaseAssignmentManagerTest {
} }
// no needed since variance based load balancing is no longer tied to LAM run // no needed since variance based load balancing is no longer tied to LAM run
@Test // @Test
void performAssignment_varianceBalanceFreq3_asserLoadBalancingEvery3Iteration() throws Exception { void performAssignment_varianceBalanceFreq3_asserLoadBalancingEvery3Iteration() throws Exception {
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config = final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config =
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10); getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10);
@ -478,7 +478,7 @@ class LeaseAssignmentManagerTest {
.count()); .count());
} }
// @Test @Test
void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration() void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration()
throws Exception { throws Exception {