From 63911b53a007941a7a81490db0f0f383d506bb1c Mon Sep 17 00:00:00 2001 From: ehasah-aws Date: Thu, 17 Apr 2025 10:16:12 -0700 Subject: [PATCH] Make LAM run at configurable interval (#1464) * Make LAM run at configurable interval, tie LeaseDiscoverer with LAM run and tie load balancing with leaseDuration * remove unwanted SampleApp class * remove unwanted RecordProcessor and added new parameter class * Updated comment removed one variable for consistency * Updated test logic to use Supplier to provide time * updated logic to count based variance balancing * Changed variance based balancing to 12 * Changed variance based balancing to 3 * Change logic to balance tied to lease duration * Change logic to balance tied to LAM run * Code for backward compatibility * Code for backward compatibility check * Code for backward compatibility check * Best practice to deprecate old constructor * Best practice to deprecate old constructor * Best practice to deprecate old constructor * removed backward compatibility code/constructors * Formating and remove unused variable * added formating to avoid build failure --- .../amazon/kinesis/coordinator/Scheduler.java | 3 +- .../assignment/LeaseAssignmentManager.java | 12 +--- .../kinesis/leases/LeaseManagementConfig.java | 13 +++- .../dynamodb/DynamoDBLeaseCoordinator.java | 9 ++- .../DynamoDBLeaseManagementFactory.java | 9 ++- .../LeaseAssignmentManagerTest.java | 65 ++++++++++++++++++- .../leases/LeaseCoordinatorExerciser.java | 3 +- ...namoDBLeaseCoordinatorIntegrationTest.java | 3 +- 8 files changed, 96 insertions(+), 21 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 73e1ed32..e382f037 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -421,7 +421,8 @@ public class Scheduler implements Runnable { lamThreadPool, System::nanoTime, leaseManagementConfig.maxLeasesForWorker(), - leaseManagementConfig.gracefulLeaseHandoffConfig())) + leaseManagementConfig.gracefulLeaseHandoffConfig(), + leaseManagementConfig.leaseAssignmentIntervalMillis())) .adaptiveLeaderDeciderCreator(() -> new MigrationAdaptiveLeaderDecider(metricsFactory)) .deterministicLeaderDeciderCreator(() -> new DeterministicShuffleShardSyncLeaderDecider( leaseRefresher, Executors.newSingleThreadScheduledExecutor(), 1, metricsFactory)) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index f5ea5470..b9fdb853 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -80,12 +80,6 @@ public final class LeaseAssignmentManager { */ private static final int DEFAULT_FAILURE_COUNT_TO_SWITCH_LEADER = 3; - /** - * Default multiplier for LAM frequency with respect to leaseDurationMillis (lease failover millis). - * If leaseDurationMillis is 10000 millis, default LAM frequency is 20000 millis. - */ - private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2; - private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease"; /** @@ -117,6 +111,7 @@ public final class LeaseAssignmentManager { private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig; private boolean tookOverLeadershipInThisRun = false; private final Map prevRunLeasesState = new HashMap<>(); + private final long leaseAssignmentIntervalMillis; private Future managerFuture; @@ -129,10 +124,7 @@ public final class LeaseAssignmentManager { // so reset the flag to refresh the state before processing during a restart of LAM. tookOverLeadershipInThisRun = false; managerFuture = executorService.scheduleWithFixedDelay( - this::performAssignment, - 0L, - leaseDurationMillis * DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER, - TimeUnit.MILLISECONDS); + this::performAssignment, 0L, (int) (leaseAssignmentIntervalMillis), TimeUnit.MILLISECONDS); log.info("Started LeaseAssignmentManager"); return; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 1839b494..06104a9c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -114,6 +114,16 @@ public class LeaseManagementConfig { */ private long failoverTimeMillis = 10000L; + /** + * Lease assignment interval in milliseconds - e.g. wait for this long between Lease assignment run. + * + *

Default value: 2 * {@link LeaseManagementConfig#failoverTimeMillis}

+ */ + private Long leaseAssignmentIntervalMillis; + + public long leaseAssignmentIntervalMillis() { + return leaseAssignmentIntervalMillis != null ? leaseAssignmentIntervalMillis : 2 * failoverTimeMillis; + } /** * Whether workers should take very expired leases at priority. A very expired lease is when a worker does not * renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at @@ -489,7 +499,8 @@ public class LeaseManagementConfig { isMultiStreamingMode, leaseCleanupConfig(), workerUtilizationAwareAssignmentConfig(), - gracefulLeaseHandoffConfig); + gracefulLeaseHandoffConfig, + leaseAssignmentIntervalMillis()); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 4f4d7886..ce85b956 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -128,6 +128,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { * Initial dynamodb lease table write iops if creating the lease table * @param metricsFactory * Used to publish metrics about lease operations + * @param leaseAssignmentIntervalMillis + * Interval at which Lease assignment manager runs */ public DynamoDBLeaseCoordinator( final LeaseRefresher leaseRefresher, @@ -143,7 +145,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final MetricsFactory metricsFactory, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, - final ConcurrentMap shardInfoShardConsumerMap) { + final ConcurrentMap shardInfoShardConsumerMap, + final long leaseAssignmentIntervalMillis) { this.leaseRefresher = leaseRefresher; this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY); this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory) @@ -152,8 +155,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { .withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; - // Should run once every leaseDurationMillis to identify new leases before expiry. - this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis; + // Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry. + this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis / 2) - epsilonMillis; this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis); this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create( gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 7d902afd..4351755d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -108,6 +108,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final boolean isMultiStreamMode; private final LeaseCleanupConfig leaseCleanupConfig; private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig; + private long leaseAssignmentIntervalMillis; /** * Constructor. @@ -144,6 +145,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param leaseCleanupConfig * @param workerUtilizationAwareAssignmentConfig * @param gracefulLeaseHandoffConfig + * @param leaseAssignmentIntervalMillis */ public DynamoDBLeaseManagementFactory( final @NotNull KinesisAsyncClient kinesisClient, @@ -179,7 +181,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { boolean isMultiStreamMode, final LeaseCleanupConfig leaseCleanupConfig, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, - final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) { + final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, + final long leaseAssignmentIntervalMillis) { this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.tableName = tableName; @@ -214,6 +217,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.tags = tags; this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig; this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; + this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis; } @Override @@ -239,7 +243,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { metricsFactory, workerUtilizationAwareAssignmentConfig, gracefulLeaseHandoffConfig, - shardInfoShardConsumerMap); + shardInfoShardConsumerMap, + leaseAssignmentIntervalMillis); } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index 6bc3a581..2280195d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -17,6 +17,8 @@ import lombok.var; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mockito; import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList; import software.amazon.awssdk.enhanced.dynamodb.TableSchema; @@ -49,6 +51,7 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -746,7 +749,8 @@ class LeaseAssignmentManagerTest { Integer.MAX_VALUE, LeaseManagementConfig.GracefulLeaseHandoffConfig.builder() .isGracefulLeaseHandoffEnabled(false) - .build()); + .build(), + 2 * 100L); leaseAssignmentManager.start(); @@ -1134,6 +1138,62 @@ class LeaseAssignmentManagerTest { dynamoDbAsyncClient.putItem(putItemRequest); } + @Test + void testLeaseAssignmentSchedulingWithDefaultInterval() { + long failoverTimeMillis = 1000L; + ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); + + LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager( + leaseRefresher, + workerMetricsDAO, + mockLeaderDecider, + getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20), + TEST_LEADER_WORKER_ID, + failoverTimeMillis, + new NullMetricsFactory(), + mockExecutor, + System::nanoTime, + Integer.MAX_VALUE, + gracefulLeaseHandoffConfig, + 2 * failoverTimeMillis); + + leaseAssignmentManager.start(); + + verify(mockExecutor) + .scheduleWithFixedDelay( + any(Runnable.class), eq(0L), eq(2 * failoverTimeMillis), eq(TimeUnit.MILLISECONDS)); + } + + @ParameterizedTest + @CsvSource({ + "1000, 500", // leaseAssignmentInterval smaller than failover + "1000, 1000", // leaseAssignmentInterval equal to failover + "1000, 2000", // leaseAssignmentInterval larger than failover + }) + void testLeaseAssignmentWithDifferentIntervals(long failoverTimeMillis, long leaseAssignmentIntervalMillis) { + ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); + + LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager( + leaseRefresher, + workerMetricsDAO, + mockLeaderDecider, + getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20), + TEST_LEADER_WORKER_ID, + failoverTimeMillis, + new NullMetricsFactory(), + mockExecutor, + System::nanoTime, + Integer.MAX_VALUE, + gracefulLeaseHandoffConfig, + leaseAssignmentIntervalMillis); + + leaseAssignmentManager.start(); + + verify(mockExecutor) + .scheduleWithFixedDelay( + any(Runnable.class), eq(0L), eq(leaseAssignmentIntervalMillis), eq(TimeUnit.MILLISECONDS)); + } + private LeaseAssignmentManager createLeaseAssignmentManager( final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config, final Long leaseDurationMillis, @@ -1151,7 +1211,8 @@ class LeaseAssignmentManagerTest { scheduledExecutorService, nanoTimeProvider, maxLeasesPerWorker, - gracefulLeaseHandoffConfig); + gracefulLeaseHandoffConfig, + 2 * leaseDurationMillis); leaseAssignmentManager.start(); return leaseAssignmentManager; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java index 26eb0d3f..e5e70d71 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java @@ -132,7 +132,8 @@ public class LeaseCoordinatorExerciser { metricsFactory, new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(), LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(), - new ConcurrentHashMap<>()); + new ConcurrentHashMap<>(), + 2 * leaseDurationMillis); coordinators.add(coord); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index 5694b03a..b90f3a5d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -128,7 +128,8 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { metricsFactory, new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(), LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(), - new ConcurrentHashMap<>()); + new ConcurrentHashMap<>(), + 2 * LEASE_DURATION_MILLIS); dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher); dynamoDBCheckpointer.operation(OPERATION);