Code for backward compatibility

This commit is contained in:
eha sah 2025-04-11 14:18:21 -07:00
parent 6520d3e739
commit d01ed3e527
5 changed files with 206 additions and 53 deletions

View file

@ -419,7 +419,7 @@ public class Scheduler implements Runnable {
leaseManagementConfig.failoverTimeMillis(),
metricsFactory,
lamThreadPool,
System::nanoTime,
() -> System.nanoTime(),
leaseManagementConfig.maxLeasesForWorker(),
leaseManagementConfig.gracefulLeaseHandoffConfig(),
leaseManagementConfig.leaseAssignmentIntervalMillis()))

View file

@ -41,7 +41,6 @@ import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
@ -71,21 +70,14 @@ import static java.util.Objects.nonNull;
* In the end, performs actual assignment by writing to storage.
*/
@Slf4j
@RequiredArgsConstructor
@KinesisClientInternalApi
public final class LeaseAssignmentManager {
public class LeaseAssignmentManager {
/**
* Default number of continuous failure execution after which leadership is released.
*/
private static final int DEFAULT_FAILURE_COUNT_TO_SWITCH_LEADER = 3;
/**
* Default multiplier for LAM frequency with respect to leaseDurationMillis (lease failover millis).
* If leaseDurationMillis is 10000 millis, default LAM frequency is 20000 millis.
*/
private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2;
private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease";
/**
@ -123,7 +115,60 @@ public final class LeaseAssignmentManager {
private int noOfContinuousFailedAttempts = 0;
private int lamRunCounter = 0;
private long varianceBasedBalancingLastRunTime;
@Deprecated
public LeaseAssignmentManager(
LeaseRefresher leaseRefresher,
WorkerMetricStatsDAO workerMetricsDAO,
LeaderDecider leaderDecider,
LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config,
String workerIdentifier,
Long leaseDurationMillis,
MetricsFactory metricsFactory,
ScheduledExecutorService executorService,
Supplier<Long> nanoTimeProvider,
int maxLeasesForWorker,
LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) {
this.leaseRefresher = leaseRefresher;
this.workerMetricsDAO = workerMetricsDAO;
this.leaderDecider = leaderDecider;
this.config = config;
this.currentWorkerId = workerIdentifier;
this.leaseDurationMillis = leaseDurationMillis;
this.metricsFactory = metricsFactory;
this.executorService = executorService;
this.nanoTimeProvider = nanoTimeProvider;
this.maxLeasesForWorker = maxLeasesForWorker;
this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
this.leaseAssignmentIntervalMillis = 2 * leaseDurationMillis;
}
public LeaseAssignmentManager(
LeaseRefresher leaseRefresher,
WorkerMetricStatsDAO workerMetricsDAO,
LeaderDecider leaderDecider,
LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config,
String workerIdentifier,
Long leaseDurationMillis,
MetricsFactory metricsFactory,
ScheduledExecutorService executorService,
Supplier<Long> nanoTimeProvider,
int maxLeasesForWorker,
LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
long leaseAssignmentIntervalMillis) {
this.leaseRefresher = leaseRefresher;
this.workerMetricsDAO = workerMetricsDAO;
this.leaderDecider = leaderDecider;
this.config = config;
this.currentWorkerId = workerIdentifier;
this.leaseDurationMillis = leaseDurationMillis;
this.metricsFactory = metricsFactory;
this.executorService = executorService;
this.nanoTimeProvider = nanoTimeProvider;
this.maxLeasesForWorker = maxLeasesForWorker;
this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis;
}
public synchronized void start() {
if (isNull(managerFuture)) {

View file

@ -129,6 +129,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
* @param metricsFactory
* Used to publish metrics about lease operations
*/
@Deprecated
public DynamoDBLeaseCoordinator(
final LeaseRefresher leaseRefresher,
final String workerIdentifier,
@ -152,7 +153,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
.withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
// Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry.
// Should run once every leaseDurationMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis;
this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis);
this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create(
@ -223,6 +224,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
workerUtilizationAwareAssignmentConfig,
gracefulLeaseHandoffConfig,
shardInfoShardConsumerMap);
// Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis - epsilonMillis) / 2;
}

View file

@ -108,7 +108,114 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
private final long leaseAssignmentIntervalMillis;
private long leaseAssignmentIntervalMillis;
/**
* Constructor.
* @param kinesisClient
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param enablePriorityLeaseAssignment
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param leaseTableDeletionProtectionEnabled
* @param leaseTablePitrEnabled
* @param leaseSerializer
* @param customShardDetectorProvider
* @param isMultiStreamMode
* @param leaseCleanupConfig
* @param workerUtilizationAwareAssignmentConfig
* @param gracefulLeaseHandoffConfig
*/
public DynamoDBLeaseManagementFactory(
final @NotNull KinesisAsyncClient kinesisClient,
final @NotNull DynamoDbAsyncClient dynamoDBClient,
final @NotNull String tableName,
final @NotNull String workerIdentifier,
final @NotNull ExecutorService executorService,
final long failoverTimeMillis,
final boolean enablePriorityLeaseAssignment,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion,
final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis,
final boolean consistentReads,
final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts,
final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds,
final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final TableCreatorCallback tableCreatorCallback,
final Duration dynamoDbRequestTimeout,
final BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final boolean leaseTablePitrEnabled,
final Collection<Tag> tags,
final @NotNull LeaseSerializer leaseSerializer,
final Function<StreamConfig, ShardDetector> customShardDetectorProvider,
boolean isMultiStreamMode,
final LeaseCleanupConfig leaseCleanupConfig,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.failoverTimeMillis = failoverTimeMillis;
this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
this.maxLeaseRenewalThreads = maxLeaseRenewalThreads;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.consistentReads = consistentReads;
this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
this.leaseCleanupConfig = leaseCleanupConfig;
this.tags = tags;
this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
}
/**
* Constructor.
@ -183,40 +290,41 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
long leaseAssignmentIntervalMillis) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.failoverTimeMillis = failoverTimeMillis;
this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
this.maxLeaseRenewalThreads = maxLeaseRenewalThreads;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.consistentReads = consistentReads;
this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
this.leaseCleanupConfig = leaseCleanupConfig;
this.tags = tags;
this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
this(
kinesisClient,
dynamoDBClient,
tableName,
workerIdentifier,
executorService,
failoverTimeMillis,
enablePriorityLeaseAssignment,
epsilonMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
maxLeaseRenewalThreads,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
consistentReads,
listShardsBackoffTimeMillis,
maxListShardsRetryAttempts,
maxCacheMissesBeforeReload,
listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus,
initialLeaseTableReadCapacity,
initialLeaseTableWriteCapacity,
tableCreatorCallback,
dynamoDbRequestTimeout,
billingMode,
leaseTableDeletionProtectionEnabled,
leaseTablePitrEnabled,
tags,
leaseSerializer,
customShardDetectorProvider,
isMultiStreamMode,
leaseCleanupConfig,
workerUtilizationAwareAssignmentConfig,
gracefulLeaseHandoffConfig);
this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis;
}

View file

@ -741,8 +741,7 @@ class LeaseAssignmentManagerTest {
Integer.MAX_VALUE,
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder()
.isGracefulLeaseHandoffEnabled(false)
.build(),
200L);
.build());
leaseAssignmentManager.start();
@ -1146,8 +1145,7 @@ class LeaseAssignmentManagerTest {
mockExecutor,
System::nanoTime,
Integer.MAX_VALUE,
gracefulLeaseHandoffConfig,
2 * failoverTimeMillis);
gracefulLeaseHandoffConfig);
leaseAssignmentManager.start();
@ -1203,8 +1201,7 @@ class LeaseAssignmentManagerTest {
scheduledExecutorService,
nanoTimeProvider,
maxLeasesPerWorker,
gracefulLeaseHandoffConfig,
2 * leaseDurationMillis);
gracefulLeaseHandoffConfig);
leaseAssignmentManager.start();
return leaseAssignmentManager;
}