Best practice to deprecate old constructor

This commit is contained in:
eha sah 2025-04-11 16:07:17 -07:00
parent 443b41690b
commit 94cda6bd79
2 changed files with 123 additions and 122 deletions

View file

@ -87,7 +87,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
private final LeaseDiscoverer leaseDiscoverer;
private final long renewerIntervalMillis;
private final long takerIntervalMillis;
private long leaseDiscovererIntervalMillis;
private final long leaseDiscovererIntervalMillis;
private final ExecutorService leaseRenewalThreadpool;
private final ExecutorService leaseDiscoveryThreadPool;
private final LeaseRefresher leaseRefresher;
@ -128,8 +128,9 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
* Initial dynamodb lease table write iops if creating the lease table
* @param metricsFactory
* Used to publish metrics about lease operations
* @param leaseAssignmentIntervalMillis
* Interval at which Lease assignment manager runs
*/
@Deprecated
public DynamoDBLeaseCoordinator(
final LeaseRefresher leaseRefresher,
final String workerIdentifier,
@ -144,7 +145,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final MetricsFactory metricsFactory,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap) {
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap,
final long leaseAssignmentIntervalMillis) {
this.leaseRefresher = leaseRefresher;
this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY);
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
@ -153,8 +155,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
.withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
// Should run once every leaseDurationMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis;
// Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis / 2) - epsilonMillis;
this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis);
this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create(
gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this);
@ -192,6 +194,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
maxLeasesToStealAtOneTime);
}
@Deprecated
public DynamoDBLeaseCoordinator(
final LeaseRefresher leaseRefresher,
final String workerIdentifier,
@ -206,8 +209,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final MetricsFactory metricsFactory,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap,
final long leaseAssignmentIntervalMillis) {
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap) {
this(
leaseRefresher,
@ -223,10 +225,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
metricsFactory,
workerUtilizationAwareAssignmentConfig,
gracefulLeaseHandoffConfig,
shardInfoShardConsumerMap);
// Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis - epsilonMillis) / 2;
shardInfoShardConsumerMap,
2 * leaseDurationMillis);
}
@RequiredArgsConstructor

View file

@ -110,114 +110,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
private long leaseAssignmentIntervalMillis;
/**
* Constructor.
* @param kinesisClient
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param enablePriorityLeaseAssignment
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param leaseTableDeletionProtectionEnabled
* @param leaseTablePitrEnabled
* @param leaseSerializer
* @param customShardDetectorProvider
* @param isMultiStreamMode
* @param leaseCleanupConfig
* @param workerUtilizationAwareAssignmentConfig
* @param gracefulLeaseHandoffConfig
*/
@Deprecated
public DynamoDBLeaseManagementFactory(
final @NotNull KinesisAsyncClient kinesisClient,
final @NotNull DynamoDbAsyncClient dynamoDBClient,
final @NotNull String tableName,
final @NotNull String workerIdentifier,
final @NotNull ExecutorService executorService,
final long failoverTimeMillis,
final boolean enablePriorityLeaseAssignment,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion,
final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis,
final boolean consistentReads,
final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts,
final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds,
final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final TableCreatorCallback tableCreatorCallback,
final Duration dynamoDbRequestTimeout,
final BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final boolean leaseTablePitrEnabled,
final Collection<Tag> tags,
final @NotNull LeaseSerializer leaseSerializer,
final Function<StreamConfig, ShardDetector> customShardDetectorProvider,
boolean isMultiStreamMode,
final LeaseCleanupConfig leaseCleanupConfig,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.failoverTimeMillis = failoverTimeMillis;
this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
this.maxLeaseRenewalThreads = maxLeaseRenewalThreads;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.consistentReads = consistentReads;
this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
this.leaseCleanupConfig = leaseCleanupConfig;
this.tags = tags;
this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
}
/**
* Constructor.
* @param kinesisClient
@ -290,7 +182,116 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final LeaseCleanupConfig leaseCleanupConfig,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
long leaseAssignmentIntervalMillis) {
final long leaseAssignmentIntervalMillis) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
this.workerIdentifier = workerIdentifier;
this.executorService = executorService;
this.failoverTimeMillis = failoverTimeMillis;
this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment;
this.epsilonMillis = epsilonMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
this.maxLeaseRenewalThreads = maxLeaseRenewalThreads;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.consistentReads = consistentReads;
this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds;
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled;
this.leaseTablePitrEnabled = leaseTablePitrEnabled;
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
this.leaseCleanupConfig = leaseCleanupConfig;
this.tags = tags;
this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis;
}
/**
* Constructor.
* @param kinesisClient
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param failoverTimeMillis
* @param enablePriorityLeaseAssignment
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
* @param leaseTableDeletionProtectionEnabled
* @param leaseTablePitrEnabled
* @param leaseSerializer
* @param customShardDetectorProvider
* @param isMultiStreamMode
* @param leaseCleanupConfig
* @param workerUtilizationAwareAssignmentConfig
* @param gracefulLeaseHandoffConfig
*/
@Deprecated
public DynamoDBLeaseManagementFactory(
final @NotNull KinesisAsyncClient kinesisClient,
final @NotNull DynamoDbAsyncClient dynamoDBClient,
final @NotNull String tableName,
final @NotNull String workerIdentifier,
final @NotNull ExecutorService executorService,
final long failoverTimeMillis,
final boolean enablePriorityLeaseAssignment,
final long epsilonMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion,
final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis,
final boolean consistentReads,
final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts,
final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds,
final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
final TableCreatorCallback tableCreatorCallback,
final Duration dynamoDbRequestTimeout,
final BillingMode billingMode,
final boolean leaseTableDeletionProtectionEnabled,
final boolean leaseTablePitrEnabled,
final Collection<Tag> tags,
final @NotNull LeaseSerializer leaseSerializer,
final Function<StreamConfig, ShardDetector> customShardDetectorProvider,
boolean isMultiStreamMode,
final LeaseCleanupConfig leaseCleanupConfig,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) {
this(
kinesisClient,
dynamoDBClient,
@ -325,8 +326,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
isMultiStreamMode,
leaseCleanupConfig,
workerUtilizationAwareAssignmentConfig,
gracefulLeaseHandoffConfig);
this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis;
gracefulLeaseHandoffConfig,
2 * failoverTimeMillis);
}
@Override