diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index 1535788d..5708ac13 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -80,6 +80,12 @@ public class LeaseAssignmentManager { private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease"; + /** + * Default multiplier for LAM frequency with respect to leaseDurationMillis (lease failover millis). + * If leaseDurationMillis is 10000 millis, default LAM frequency is 20000 millis. + */ + private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2; + /** * Default retry attempt for loading leases and workers before giving up. */ @@ -140,7 +146,7 @@ public class LeaseAssignmentManager { this.nanoTimeProvider = nanoTimeProvider; this.maxLeasesForWorker = maxLeasesForWorker; this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; - this.leaseAssignmentIntervalMillis = 2 * leaseDurationMillis; + this.leaseAssignmentIntervalMillis = leaseDurationMillis * DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER; } public LeaseAssignmentManager( diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 46d038ba..30f38f36 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -146,6 +146,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param workerUtilizationAwareAssignmentConfig * @param gracefulLeaseHandoffConfig */ + @Deprecated public DynamoDBLeaseManagementFactory( final @NotNull KinesisAsyncClient kinesisClient, final @NotNull DynamoDbAsyncClient dynamoDBClient, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index bd3f00bb..7a2afa27 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -433,7 +433,6 @@ class LeaseAssignmentManagerTest { .anyMatch(lease -> lease.leaseOwner().equals(TEST_YIELD_WORKER_ID + "2"))); } - // no needed since variance based load balancing is no longer tied to LAM run @Test void performAssignment_varianceBalanceFreq3_asserLoadBalancingEvery3Iteration() throws Exception { final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config = @@ -476,6 +475,15 @@ class LeaseAssignmentManagerTest { leaseRefresher.listLeases().stream() .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) .count()); + + setupConditionForVarianceBalancing(); + // 5th Run, expect no re-balance + leaseAssignmentManagerRunnable.run(); + assertEquals( + 1L, + leaseRefresher.listLeases().stream() + .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) + .count()); } private void setupConditionForVarianceBalancing() throws Exception {