diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 9730793c..539d114f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -419,7 +419,7 @@ public class Scheduler implements Runnable { leaseManagementConfig.failoverTimeMillis(), metricsFactory, lamThreadPool, - () -> System.nanoTime(), + System::nanoTime, leaseManagementConfig.maxLeasesForWorker(), leaseManagementConfig.gracefulLeaseHandoffConfig(), leaseManagementConfig.leaseAssignmentIntervalMillis())) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index 4da06626..de5d8449 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -122,46 +122,19 @@ public final class LeaseAssignmentManager { private int noOfContinuousFailedAttempts = 0; private int lamRunCounter = 0; - @Deprecated public LeaseAssignmentManager( - LeaseRefresher leaseRefresher, - WorkerMetricStatsDAO workerMetricsDAO, - LeaderDecider leaderDecider, - LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config, - String workerIdentifier, - Long leaseDurationMillis, - MetricsFactory metricsFactory, - ScheduledExecutorService executorService, - Supplier nanoTimeProvider, - int maxLeasesForWorker, - LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) { - this.leaseRefresher = leaseRefresher; - this.workerMetricsDAO = workerMetricsDAO; - this.leaderDecider = leaderDecider; - this.config = config; - this.currentWorkerId = workerIdentifier; - this.leaseDurationMillis = leaseDurationMillis; - this.metricsFactory = metricsFactory; - this.executorService = executorService; - this.nanoTimeProvider = nanoTimeProvider; - this.maxLeasesForWorker = maxLeasesForWorker; - this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; - this.leaseAssignmentIntervalMillis = leaseDurationMillis * DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER; - } - - public LeaseAssignmentManager( - LeaseRefresher leaseRefresher, - WorkerMetricStatsDAO workerMetricsDAO, - LeaderDecider leaderDecider, - LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config, - String workerIdentifier, - Long leaseDurationMillis, - MetricsFactory metricsFactory, - ScheduledExecutorService executorService, - Supplier nanoTimeProvider, - int maxLeasesForWorker, - LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, - long leaseAssignmentIntervalMillis) { + final LeaseRefresher leaseRefresher, + final WorkerMetricStatsDAO workerMetricsDAO, + final LeaderDecider leaderDecider, + final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config, + final String workerIdentifier, + final Long leaseDurationMillis, + final MetricsFactory metricsFactory, + final ScheduledExecutorService executorService, + final Supplier nanoTimeProvider, + final int maxLeasesForWorker, + final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, + final long leaseAssignmentIntervalMillis) { this.leaseRefresher = leaseRefresher; this.workerMetricsDAO = workerMetricsDAO; this.leaderDecider = leaderDecider; @@ -176,6 +149,34 @@ public final class LeaseAssignmentManager { this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis; } + @Deprecated + public LeaseAssignmentManager( + final LeaseRefresher leaseRefresher, + final WorkerMetricStatsDAO workerMetricsDAO, + final LeaderDecider leaderDecider, + final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config, + final String workerIdentifier, + final Long leaseDurationMillis, + final MetricsFactory metricsFactory, + final ScheduledExecutorService executorService, + final Supplier nanoTimeProvider, + final int maxLeasesForWorker, + final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) { + this( + leaseRefresher, + workerMetricsDAO, + leaderDecider, + config, + workerIdentifier, + leaseDurationMillis, + metricsFactory, + executorService, + nanoTimeProvider, + maxLeasesForWorker, + gracefulLeaseHandoffConfig, + leaseDurationMillis * DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER); + } + public synchronized void start() { if (isNull(managerFuture)) { // LAM can be dynamically started/stopped and restarted during MigrationStateMachine execution