From d01ed3e5275434229904e9b22e07887e0173c7c5 Mon Sep 17 00:00:00 2001 From: eha sah Date: Fri, 11 Apr 2025 14:18:21 -0700 Subject: [PATCH] Code for backward compatibility --- .../amazon/kinesis/coordinator/Scheduler.java | 2 +- .../assignment/LeaseAssignmentManager.java | 65 ++++++- .../dynamodb/DynamoDBLeaseCoordinator.java | 5 +- .../DynamoDBLeaseManagementFactory.java | 178 ++++++++++++++---- .../LeaseAssignmentManagerTest.java | 9 +- 5 files changed, 206 insertions(+), 53 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 539d114f..9730793c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -419,7 +419,7 @@ public class Scheduler implements Runnable { leaseManagementConfig.failoverTimeMillis(), metricsFactory, lamThreadPool, - System::nanoTime, + () -> System.nanoTime(), leaseManagementConfig.maxLeasesForWorker(), leaseManagementConfig.gracefulLeaseHandoffConfig(), leaseManagementConfig.leaseAssignmentIntervalMillis())) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index 162bda31..1535788d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -41,7 +41,6 @@ import java.util.stream.Collectors; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; @@ -71,21 +70,14 @@ import static java.util.Objects.nonNull; * In the end, performs actual assignment by writing to storage. */ @Slf4j -@RequiredArgsConstructor @KinesisClientInternalApi -public final class LeaseAssignmentManager { +public class LeaseAssignmentManager { /** * Default number of continuous failure execution after which leadership is released. */ private static final int DEFAULT_FAILURE_COUNT_TO_SWITCH_LEADER = 3; - /** - * Default multiplier for LAM frequency with respect to leaseDurationMillis (lease failover millis). - * If leaseDurationMillis is 10000 millis, default LAM frequency is 20000 millis. - */ - private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2; - private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease"; /** @@ -123,7 +115,60 @@ public final class LeaseAssignmentManager { private int noOfContinuousFailedAttempts = 0; private int lamRunCounter = 0; - private long varianceBasedBalancingLastRunTime; + + @Deprecated + public LeaseAssignmentManager( + LeaseRefresher leaseRefresher, + WorkerMetricStatsDAO workerMetricsDAO, + LeaderDecider leaderDecider, + LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config, + String workerIdentifier, + Long leaseDurationMillis, + MetricsFactory metricsFactory, + ScheduledExecutorService executorService, + Supplier nanoTimeProvider, + int maxLeasesForWorker, + LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) { + this.leaseRefresher = leaseRefresher; + this.workerMetricsDAO = workerMetricsDAO; + this.leaderDecider = leaderDecider; + this.config = config; + this.currentWorkerId = workerIdentifier; + this.leaseDurationMillis = leaseDurationMillis; + this.metricsFactory = metricsFactory; + this.executorService = executorService; + this.nanoTimeProvider = nanoTimeProvider; + this.maxLeasesForWorker = maxLeasesForWorker; + this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; + this.leaseAssignmentIntervalMillis = 2 * leaseDurationMillis; + } + + public LeaseAssignmentManager( + LeaseRefresher leaseRefresher, + WorkerMetricStatsDAO workerMetricsDAO, + LeaderDecider leaderDecider, + LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config, + String workerIdentifier, + Long leaseDurationMillis, + MetricsFactory metricsFactory, + ScheduledExecutorService executorService, + Supplier nanoTimeProvider, + int maxLeasesForWorker, + LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, + long leaseAssignmentIntervalMillis) { + this.leaseRefresher = leaseRefresher; + this.workerMetricsDAO = workerMetricsDAO; + this.leaderDecider = leaderDecider; + this.config = config; + this.currentWorkerId = workerIdentifier; + this.leaseDurationMillis = leaseDurationMillis; + this.metricsFactory = metricsFactory; + this.executorService = executorService; + this.nanoTimeProvider = nanoTimeProvider; + this.maxLeasesForWorker = maxLeasesForWorker; + this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; + this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis; + } public synchronized void start() { if (isNull(managerFuture)) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index cec84e86..18b6f927 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -129,6 +129,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { * @param metricsFactory * Used to publish metrics about lease operations */ + @Deprecated public DynamoDBLeaseCoordinator( final LeaseRefresher leaseRefresher, final String workerIdentifier, @@ -152,7 +153,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { .withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; - // Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry. + // Should run once every leaseDurationMillis to identify new leases before expiry. this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis; this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis); this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create( @@ -223,6 +224,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { workerUtilizationAwareAssignmentConfig, gracefulLeaseHandoffConfig, shardInfoShardConsumerMap); + + // Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry. this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis - epsilonMillis) / 2; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 45a766e7..46d038ba 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -108,7 +108,114 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final boolean isMultiStreamMode; private final LeaseCleanupConfig leaseCleanupConfig; private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig; - private final long leaseAssignmentIntervalMillis; + private long leaseAssignmentIntervalMillis; + + /** + * Constructor. + * @param kinesisClient + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param failoverTimeMillis + * @param enablePriorityLeaseAssignment + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + * @param leaseTableDeletionProtectionEnabled + * @param leaseTablePitrEnabled + * @param leaseSerializer + * @param customShardDetectorProvider + * @param isMultiStreamMode + * @param leaseCleanupConfig + * @param workerUtilizationAwareAssignmentConfig + * @param gracefulLeaseHandoffConfig + */ + public DynamoDBLeaseManagementFactory( + final @NotNull KinesisAsyncClient kinesisClient, + final @NotNull DynamoDbAsyncClient dynamoDBClient, + final @NotNull String tableName, + final @NotNull String workerIdentifier, + final @NotNull ExecutorService executorService, + final long failoverTimeMillis, + final boolean enablePriorityLeaseAssignment, + final long epsilonMillis, + final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, + final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, + final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, + final boolean consistentReads, + final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, + final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, + final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, + final long initialLeaseTableWriteCapacity, + final TableCreatorCallback tableCreatorCallback, + final Duration dynamoDbRequestTimeout, + final BillingMode billingMode, + final boolean leaseTableDeletionProtectionEnabled, + final boolean leaseTablePitrEnabled, + final Collection tags, + final @NotNull LeaseSerializer leaseSerializer, + final Function customShardDetectorProvider, + boolean isMultiStreamMode, + final LeaseCleanupConfig leaseCleanupConfig, + final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, + final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) { + this.kinesisClient = kinesisClient; + this.dynamoDBClient = dynamoDBClient; + this.tableName = tableName; + this.workerIdentifier = workerIdentifier; + this.executorService = executorService; + this.failoverTimeMillis = failoverTimeMillis; + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; + this.epsilonMillis = epsilonMillis; + this.maxLeasesForWorker = maxLeasesForWorker; + this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; + this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; + this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; + this.shardSyncIntervalMillis = shardSyncIntervalMillis; + this.consistentReads = consistentReads; + this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis; + this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; + this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload; + this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds; + this.cacheMissWarningModulus = cacheMissWarningModulus; + this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; + this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; + this.tableCreatorCallback = tableCreatorCallback; + this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; + this.billingMode = billingMode; + this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled; + this.leaseTablePitrEnabled = leaseTablePitrEnabled; + this.leaseSerializer = leaseSerializer; + this.customShardDetectorProvider = customShardDetectorProvider; + this.isMultiStreamMode = isMultiStreamMode; + this.leaseCleanupConfig = leaseCleanupConfig; + this.tags = tags; + this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig; + this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; + } /** * Constructor. @@ -183,40 +290,41 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, long leaseAssignmentIntervalMillis) { - this.kinesisClient = kinesisClient; - this.dynamoDBClient = dynamoDBClient; - this.tableName = tableName; - this.workerIdentifier = workerIdentifier; - this.executorService = executorService; - this.failoverTimeMillis = failoverTimeMillis; - this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; - this.epsilonMillis = epsilonMillis; - this.maxLeasesForWorker = maxLeasesForWorker; - this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; - this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; - this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; - this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; - this.shardSyncIntervalMillis = shardSyncIntervalMillis; - this.consistentReads = consistentReads; - this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis; - this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; - this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload; - this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds; - this.cacheMissWarningModulus = cacheMissWarningModulus; - this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; - this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; - this.tableCreatorCallback = tableCreatorCallback; - this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; - this.billingMode = billingMode; - this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled; - this.leaseTablePitrEnabled = leaseTablePitrEnabled; - this.leaseSerializer = leaseSerializer; - this.customShardDetectorProvider = customShardDetectorProvider; - this.isMultiStreamMode = isMultiStreamMode; - this.leaseCleanupConfig = leaseCleanupConfig; - this.tags = tags; - this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig; - this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; + this( + kinesisClient, + dynamoDBClient, + tableName, + workerIdentifier, + executorService, + failoverTimeMillis, + enablePriorityLeaseAssignment, + epsilonMillis, + maxLeasesForWorker, + maxLeasesToStealAtOneTime, + maxLeaseRenewalThreads, + cleanupLeasesUponShardCompletion, + ignoreUnexpectedChildShards, + shardSyncIntervalMillis, + consistentReads, + listShardsBackoffTimeMillis, + maxListShardsRetryAttempts, + maxCacheMissesBeforeReload, + listShardsCacheAllowedAgeInSeconds, + cacheMissWarningModulus, + initialLeaseTableReadCapacity, + initialLeaseTableWriteCapacity, + tableCreatorCallback, + dynamoDbRequestTimeout, + billingMode, + leaseTableDeletionProtectionEnabled, + leaseTablePitrEnabled, + tags, + leaseSerializer, + customShardDetectorProvider, + isMultiStreamMode, + leaseCleanupConfig, + workerUtilizationAwareAssignmentConfig, + gracefulLeaseHandoffConfig); this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index 7769dd36..bd3f00bb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -741,8 +741,7 @@ class LeaseAssignmentManagerTest { Integer.MAX_VALUE, LeaseManagementConfig.GracefulLeaseHandoffConfig.builder() .isGracefulLeaseHandoffEnabled(false) - .build(), - 200L); + .build()); leaseAssignmentManager.start(); @@ -1146,8 +1145,7 @@ class LeaseAssignmentManagerTest { mockExecutor, System::nanoTime, Integer.MAX_VALUE, - gracefulLeaseHandoffConfig, - 2 * failoverTimeMillis); + gracefulLeaseHandoffConfig); leaseAssignmentManager.start(); @@ -1203,8 +1201,7 @@ class LeaseAssignmentManagerTest { scheduledExecutorService, nanoTimeProvider, maxLeasesPerWorker, - gracefulLeaseHandoffConfig, - 2 * leaseDurationMillis); + gracefulLeaseHandoffConfig); leaseAssignmentManager.start(); return leaseAssignmentManager; }