Make LAM run at configurable interval (#1464)

* Make LAM run at configurable interval, tie LeaseDiscoverer with LAM run and tie load balancing with leaseDuration

* remove unwanted SampleApp class

* remove unwanted RecordProcessor and added new parameter class

* Updated comment removed one variable for consistency

* Updated test logic to use Supplier to provide time

* updated logic to count based variance balancing

* Changed variance based balancing to 12

* Changed variance based balancing to 3

* Change logic to balance tied to lease duration

* Change logic to balance tied to LAM run

* Code for backward compatibility

* Code for backward compatibility check

* Code for backward compatibility check

* Best practice to deprecate old constructor

* Best practice to deprecate old constructor

* Best practice to deprecate old constructor

* removed backward compatibility code/constructors

* Formating and remove unused variable

* added formating to avoid build failure
This commit is contained in:
ehasah-aws 2025-04-17 10:16:12 -07:00 committed by GitHub
parent d7dd21beca
commit 63911b53a0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 96 additions and 21 deletions

View file

@ -421,7 +421,8 @@ public class Scheduler implements Runnable {
lamThreadPool, lamThreadPool,
System::nanoTime, System::nanoTime,
leaseManagementConfig.maxLeasesForWorker(), leaseManagementConfig.maxLeasesForWorker(),
leaseManagementConfig.gracefulLeaseHandoffConfig())) leaseManagementConfig.gracefulLeaseHandoffConfig(),
leaseManagementConfig.leaseAssignmentIntervalMillis()))
.adaptiveLeaderDeciderCreator(() -> new MigrationAdaptiveLeaderDecider(metricsFactory)) .adaptiveLeaderDeciderCreator(() -> new MigrationAdaptiveLeaderDecider(metricsFactory))
.deterministicLeaderDeciderCreator(() -> new DeterministicShuffleShardSyncLeaderDecider( .deterministicLeaderDeciderCreator(() -> new DeterministicShuffleShardSyncLeaderDecider(
leaseRefresher, Executors.newSingleThreadScheduledExecutor(), 1, metricsFactory)) leaseRefresher, Executors.newSingleThreadScheduledExecutor(), 1, metricsFactory))

View file

@ -80,12 +80,6 @@ public final class LeaseAssignmentManager {
*/ */
private static final int DEFAULT_FAILURE_COUNT_TO_SWITCH_LEADER = 3; private static final int DEFAULT_FAILURE_COUNT_TO_SWITCH_LEADER = 3;
/**
* Default multiplier for LAM frequency with respect to leaseDurationMillis (lease failover millis).
* If leaseDurationMillis is 10000 millis, default LAM frequency is 20000 millis.
*/
private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2;
private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease"; private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease";
/** /**
@ -117,6 +111,7 @@ public final class LeaseAssignmentManager {
private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig; private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
private boolean tookOverLeadershipInThisRun = false; private boolean tookOverLeadershipInThisRun = false;
private final Map<String, Lease> prevRunLeasesState = new HashMap<>(); private final Map<String, Lease> prevRunLeasesState = new HashMap<>();
private final long leaseAssignmentIntervalMillis;
private Future<?> managerFuture; private Future<?> managerFuture;
@ -129,10 +124,7 @@ public final class LeaseAssignmentManager {
// so reset the flag to refresh the state before processing during a restart of LAM. // so reset the flag to refresh the state before processing during a restart of LAM.
tookOverLeadershipInThisRun = false; tookOverLeadershipInThisRun = false;
managerFuture = executorService.scheduleWithFixedDelay( managerFuture = executorService.scheduleWithFixedDelay(
this::performAssignment, this::performAssignment, 0L, (int) (leaseAssignmentIntervalMillis), TimeUnit.MILLISECONDS);
0L,
leaseDurationMillis * DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER,
TimeUnit.MILLISECONDS);
log.info("Started LeaseAssignmentManager"); log.info("Started LeaseAssignmentManager");
return; return;
} }

View file

@ -114,6 +114,16 @@ public class LeaseManagementConfig {
*/ */
private long failoverTimeMillis = 10000L; private long failoverTimeMillis = 10000L;
/**
* Lease assignment interval in milliseconds - e.g. wait for this long between Lease assignment run.
*
* <p>Default value: 2 * {@link LeaseManagementConfig#failoverTimeMillis}</p>
*/
private Long leaseAssignmentIntervalMillis;
public long leaseAssignmentIntervalMillis() {
return leaseAssignmentIntervalMillis != null ? leaseAssignmentIntervalMillis : 2 * failoverTimeMillis;
}
/** /**
* Whether workers should take very expired leases at priority. A very expired lease is when a worker does not * Whether workers should take very expired leases at priority. A very expired lease is when a worker does not
* renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at * renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at
@ -489,7 +499,8 @@ public class LeaseManagementConfig {
isMultiStreamingMode, isMultiStreamingMode,
leaseCleanupConfig(), leaseCleanupConfig(),
workerUtilizationAwareAssignmentConfig(), workerUtilizationAwareAssignmentConfig(),
gracefulLeaseHandoffConfig); gracefulLeaseHandoffConfig,
leaseAssignmentIntervalMillis());
} }
return leaseManagementFactory; return leaseManagementFactory;
} }

View file

@ -128,6 +128,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
* Initial dynamodb lease table write iops if creating the lease table * Initial dynamodb lease table write iops if creating the lease table
* @param metricsFactory * @param metricsFactory
* Used to publish metrics about lease operations * Used to publish metrics about lease operations
* @param leaseAssignmentIntervalMillis
* Interval at which Lease assignment manager runs
*/ */
public DynamoDBLeaseCoordinator( public DynamoDBLeaseCoordinator(
final LeaseRefresher leaseRefresher, final LeaseRefresher leaseRefresher,
@ -143,7 +145,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final MetricsFactory metricsFactory, final MetricsFactory metricsFactory,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap) { final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap,
final long leaseAssignmentIntervalMillis) {
this.leaseRefresher = leaseRefresher; this.leaseRefresher = leaseRefresher;
this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY); this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY);
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory) this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
@ -152,8 +155,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
.withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment); .withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
// Should run once every leaseDurationMillis to identify new leases before expiry. // Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis; this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis / 2) - epsilonMillis;
this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis); this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis);
this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create( this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create(
gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this); gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this);

View file

@ -108,6 +108,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final boolean isMultiStreamMode; private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig; private final LeaseCleanupConfig leaseCleanupConfig;
private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig; private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
private long leaseAssignmentIntervalMillis;
/** /**
* Constructor. * Constructor.
@ -144,6 +145,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param leaseCleanupConfig * @param leaseCleanupConfig
* @param workerUtilizationAwareAssignmentConfig * @param workerUtilizationAwareAssignmentConfig
* @param gracefulLeaseHandoffConfig * @param gracefulLeaseHandoffConfig
* @param leaseAssignmentIntervalMillis
*/ */
public DynamoDBLeaseManagementFactory( public DynamoDBLeaseManagementFactory(
final @NotNull KinesisAsyncClient kinesisClient, final @NotNull KinesisAsyncClient kinesisClient,
@ -179,7 +181,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
boolean isMultiStreamMode, boolean isMultiStreamMode,
final LeaseCleanupConfig leaseCleanupConfig, final LeaseCleanupConfig leaseCleanupConfig,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) { final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
final long leaseAssignmentIntervalMillis) {
this.kinesisClient = kinesisClient; this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient; this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName; this.tableName = tableName;
@ -214,6 +217,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.tags = tags; this.tags = tags;
this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig; this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis;
} }
@Override @Override
@ -239,7 +243,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
metricsFactory, metricsFactory,
workerUtilizationAwareAssignmentConfig, workerUtilizationAwareAssignmentConfig,
gracefulLeaseHandoffConfig, gracefulLeaseHandoffConfig,
shardInfoShardConsumerMap); shardInfoShardConsumerMap,
leaseAssignmentIntervalMillis);
} }
/** /**

View file

@ -17,6 +17,8 @@ import lombok.var;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito; import org.mockito.Mockito;
import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList; import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.enhanced.dynamodb.TableSchema; import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
@ -49,6 +51,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -746,7 +749,8 @@ class LeaseAssignmentManagerTest {
Integer.MAX_VALUE, Integer.MAX_VALUE,
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder() LeaseManagementConfig.GracefulLeaseHandoffConfig.builder()
.isGracefulLeaseHandoffEnabled(false) .isGracefulLeaseHandoffEnabled(false)
.build()); .build(),
2 * 100L);
leaseAssignmentManager.start(); leaseAssignmentManager.start();
@ -1134,6 +1138,62 @@ class LeaseAssignmentManagerTest {
dynamoDbAsyncClient.putItem(putItemRequest); dynamoDbAsyncClient.putItem(putItemRequest);
} }
@Test
void testLeaseAssignmentSchedulingWithDefaultInterval() {
long failoverTimeMillis = 1000L;
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager(
leaseRefresher,
workerMetricsDAO,
mockLeaderDecider,
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20),
TEST_LEADER_WORKER_ID,
failoverTimeMillis,
new NullMetricsFactory(),
mockExecutor,
System::nanoTime,
Integer.MAX_VALUE,
gracefulLeaseHandoffConfig,
2 * failoverTimeMillis);
leaseAssignmentManager.start();
verify(mockExecutor)
.scheduleWithFixedDelay(
any(Runnable.class), eq(0L), eq(2 * failoverTimeMillis), eq(TimeUnit.MILLISECONDS));
}
@ParameterizedTest
@CsvSource({
"1000, 500", // leaseAssignmentInterval smaller than failover
"1000, 1000", // leaseAssignmentInterval equal to failover
"1000, 2000", // leaseAssignmentInterval larger than failover
})
void testLeaseAssignmentWithDifferentIntervals(long failoverTimeMillis, long leaseAssignmentIntervalMillis) {
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager(
leaseRefresher,
workerMetricsDAO,
mockLeaderDecider,
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20),
TEST_LEADER_WORKER_ID,
failoverTimeMillis,
new NullMetricsFactory(),
mockExecutor,
System::nanoTime,
Integer.MAX_VALUE,
gracefulLeaseHandoffConfig,
leaseAssignmentIntervalMillis);
leaseAssignmentManager.start();
verify(mockExecutor)
.scheduleWithFixedDelay(
any(Runnable.class), eq(0L), eq(leaseAssignmentIntervalMillis), eq(TimeUnit.MILLISECONDS));
}
private LeaseAssignmentManager createLeaseAssignmentManager( private LeaseAssignmentManager createLeaseAssignmentManager(
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config,
final Long leaseDurationMillis, final Long leaseDurationMillis,
@ -1151,7 +1211,8 @@ class LeaseAssignmentManagerTest {
scheduledExecutorService, scheduledExecutorService,
nanoTimeProvider, nanoTimeProvider,
maxLeasesPerWorker, maxLeasesPerWorker,
gracefulLeaseHandoffConfig); gracefulLeaseHandoffConfig,
2 * leaseDurationMillis);
leaseAssignmentManager.start(); leaseAssignmentManager.start();
return leaseAssignmentManager; return leaseAssignmentManager;
} }

View file

@ -132,7 +132,8 @@ public class LeaseCoordinatorExerciser {
metricsFactory, metricsFactory,
new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(), new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(),
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(), LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(),
new ConcurrentHashMap<>()); new ConcurrentHashMap<>(),
2 * leaseDurationMillis);
coordinators.add(coord); coordinators.add(coord);
} }

View file

@ -128,7 +128,8 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
metricsFactory, metricsFactory,
new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(), new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(),
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(), LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(),
new ConcurrentHashMap<>()); new ConcurrentHashMap<>(),
2 * LEASE_DURATION_MILLIS);
dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher); dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher);
dynamoDBCheckpointer.operation(OPERATION); dynamoDBCheckpointer.operation(OPERATION);