diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 18b6f927..428240b0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -87,7 +87,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { private final LeaseDiscoverer leaseDiscoverer; private final long renewerIntervalMillis; private final long takerIntervalMillis; - private long leaseDiscovererIntervalMillis; + private final long leaseDiscovererIntervalMillis; private final ExecutorService leaseRenewalThreadpool; private final ExecutorService leaseDiscoveryThreadPool; private final LeaseRefresher leaseRefresher; @@ -128,8 +128,9 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { * Initial dynamodb lease table write iops if creating the lease table * @param metricsFactory * Used to publish metrics about lease operations + * @param leaseAssignmentIntervalMillis + * Interval at which Lease assignment manager runs */ - @Deprecated public DynamoDBLeaseCoordinator( final LeaseRefresher leaseRefresher, final String workerIdentifier, @@ -144,7 +145,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final MetricsFactory metricsFactory, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, - final ConcurrentMap shardInfoShardConsumerMap) { + final ConcurrentMap shardInfoShardConsumerMap, + final long leaseAssignmentIntervalMillis) { this.leaseRefresher = leaseRefresher; this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY); this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory) @@ -153,8 +155,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { .withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; - // Should run once every leaseDurationMillis to identify new leases before expiry. - this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis; + // Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry. + this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis / 2) - epsilonMillis; this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis); this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create( gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this); @@ -192,6 +194,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { maxLeasesToStealAtOneTime); } + @Deprecated public DynamoDBLeaseCoordinator( final LeaseRefresher leaseRefresher, final String workerIdentifier, @@ -206,8 +209,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final MetricsFactory metricsFactory, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, - final ConcurrentMap shardInfoShardConsumerMap, - final long leaseAssignmentIntervalMillis) { + final ConcurrentMap shardInfoShardConsumerMap) { this( leaseRefresher, @@ -223,10 +225,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { metricsFactory, workerUtilizationAwareAssignmentConfig, gracefulLeaseHandoffConfig, - shardInfoShardConsumerMap); - - // Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry. - this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis - epsilonMillis) / 2; + shardInfoShardConsumerMap, + 2 * leaseDurationMillis); } @RequiredArgsConstructor diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 30f38f36..cf6df4af 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -110,114 +110,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig; private long leaseAssignmentIntervalMillis; - /** - * Constructor. - * @param kinesisClient - * @param dynamoDBClient - * @param tableName - * @param workerIdentifier - * @param executorService - * @param failoverTimeMillis - * @param enablePriorityLeaseAssignment - * @param epsilonMillis - * @param maxLeasesForWorker - * @param maxLeasesToStealAtOneTime - * @param maxLeaseRenewalThreads - * @param cleanupLeasesUponShardCompletion - * @param ignoreUnexpectedChildShards - * @param shardSyncIntervalMillis - * @param consistentReads - * @param listShardsBackoffTimeMillis - * @param maxListShardsRetryAttempts - * @param maxCacheMissesBeforeReload - * @param listShardsCacheAllowedAgeInSeconds - * @param cacheMissWarningModulus - * @param initialLeaseTableReadCapacity - * @param initialLeaseTableWriteCapacity - * @param tableCreatorCallback - * @param dynamoDbRequestTimeout - * @param billingMode - * @param leaseTableDeletionProtectionEnabled - * @param leaseTablePitrEnabled - * @param leaseSerializer - * @param customShardDetectorProvider - * @param isMultiStreamMode - * @param leaseCleanupConfig - * @param workerUtilizationAwareAssignmentConfig - * @param gracefulLeaseHandoffConfig - */ - @Deprecated - public DynamoDBLeaseManagementFactory( - final @NotNull KinesisAsyncClient kinesisClient, - final @NotNull DynamoDbAsyncClient dynamoDBClient, - final @NotNull String tableName, - final @NotNull String workerIdentifier, - final @NotNull ExecutorService executorService, - final long failoverTimeMillis, - final boolean enablePriorityLeaseAssignment, - final long epsilonMillis, - final int maxLeasesForWorker, - final int maxLeasesToStealAtOneTime, - final int maxLeaseRenewalThreads, - final boolean cleanupLeasesUponShardCompletion, - final boolean ignoreUnexpectedChildShards, - final long shardSyncIntervalMillis, - final boolean consistentReads, - final long listShardsBackoffTimeMillis, - final int maxListShardsRetryAttempts, - final int maxCacheMissesBeforeReload, - final long listShardsCacheAllowedAgeInSeconds, - final int cacheMissWarningModulus, - final long initialLeaseTableReadCapacity, - final long initialLeaseTableWriteCapacity, - final TableCreatorCallback tableCreatorCallback, - final Duration dynamoDbRequestTimeout, - final BillingMode billingMode, - final boolean leaseTableDeletionProtectionEnabled, - final boolean leaseTablePitrEnabled, - final Collection tags, - final @NotNull LeaseSerializer leaseSerializer, - final Function customShardDetectorProvider, - boolean isMultiStreamMode, - final LeaseCleanupConfig leaseCleanupConfig, - final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, - final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) { - this.kinesisClient = kinesisClient; - this.dynamoDBClient = dynamoDBClient; - this.tableName = tableName; - this.workerIdentifier = workerIdentifier; - this.executorService = executorService; - this.failoverTimeMillis = failoverTimeMillis; - this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; - this.epsilonMillis = epsilonMillis; - this.maxLeasesForWorker = maxLeasesForWorker; - this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; - this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; - this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; - this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; - this.shardSyncIntervalMillis = shardSyncIntervalMillis; - this.consistentReads = consistentReads; - this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis; - this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; - this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload; - this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds; - this.cacheMissWarningModulus = cacheMissWarningModulus; - this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; - this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; - this.tableCreatorCallback = tableCreatorCallback; - this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; - this.billingMode = billingMode; - this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled; - this.leaseTablePitrEnabled = leaseTablePitrEnabled; - this.leaseSerializer = leaseSerializer; - this.customShardDetectorProvider = customShardDetectorProvider; - this.isMultiStreamMode = isMultiStreamMode; - this.leaseCleanupConfig = leaseCleanupConfig; - this.tags = tags; - this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig; - this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; - } - /** * Constructor. * @param kinesisClient @@ -290,7 +182,116 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final LeaseCleanupConfig leaseCleanupConfig, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, - long leaseAssignmentIntervalMillis) { + final long leaseAssignmentIntervalMillis) { + this.kinesisClient = kinesisClient; + this.dynamoDBClient = dynamoDBClient; + this.tableName = tableName; + this.workerIdentifier = workerIdentifier; + this.executorService = executorService; + this.failoverTimeMillis = failoverTimeMillis; + this.enablePriorityLeaseAssignment = enablePriorityLeaseAssignment; + this.epsilonMillis = epsilonMillis; + this.maxLeasesForWorker = maxLeasesForWorker; + this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; + this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; + this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; + this.shardSyncIntervalMillis = shardSyncIntervalMillis; + this.consistentReads = consistentReads; + this.listShardsBackoffTimeMillis = listShardsBackoffTimeMillis; + this.maxListShardsRetryAttempts = maxListShardsRetryAttempts; + this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload; + this.listShardsCacheAllowedAgeInSeconds = listShardsCacheAllowedAgeInSeconds; + this.cacheMissWarningModulus = cacheMissWarningModulus; + this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; + this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; + this.tableCreatorCallback = tableCreatorCallback; + this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; + this.billingMode = billingMode; + this.leaseTableDeletionProtectionEnabled = leaseTableDeletionProtectionEnabled; + this.leaseTablePitrEnabled = leaseTablePitrEnabled; + this.leaseSerializer = leaseSerializer; + this.customShardDetectorProvider = customShardDetectorProvider; + this.isMultiStreamMode = isMultiStreamMode; + this.leaseCleanupConfig = leaseCleanupConfig; + this.tags = tags; + this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig; + this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; + this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis; + } + + /** + * Constructor. + * @param kinesisClient + * @param dynamoDBClient + * @param tableName + * @param workerIdentifier + * @param executorService + * @param failoverTimeMillis + * @param enablePriorityLeaseAssignment + * @param epsilonMillis + * @param maxLeasesForWorker + * @param maxLeasesToStealAtOneTime + * @param maxLeaseRenewalThreads + * @param cleanupLeasesUponShardCompletion + * @param ignoreUnexpectedChildShards + * @param shardSyncIntervalMillis + * @param consistentReads + * @param listShardsBackoffTimeMillis + * @param maxListShardsRetryAttempts + * @param maxCacheMissesBeforeReload + * @param listShardsCacheAllowedAgeInSeconds + * @param cacheMissWarningModulus + * @param initialLeaseTableReadCapacity + * @param initialLeaseTableWriteCapacity + * @param tableCreatorCallback + * @param dynamoDbRequestTimeout + * @param billingMode + * @param leaseTableDeletionProtectionEnabled + * @param leaseTablePitrEnabled + * @param leaseSerializer + * @param customShardDetectorProvider + * @param isMultiStreamMode + * @param leaseCleanupConfig + * @param workerUtilizationAwareAssignmentConfig + * @param gracefulLeaseHandoffConfig + */ + @Deprecated + public DynamoDBLeaseManagementFactory( + final @NotNull KinesisAsyncClient kinesisClient, + final @NotNull DynamoDbAsyncClient dynamoDBClient, + final @NotNull String tableName, + final @NotNull String workerIdentifier, + final @NotNull ExecutorService executorService, + final long failoverTimeMillis, + final boolean enablePriorityLeaseAssignment, + final long epsilonMillis, + final int maxLeasesForWorker, + final int maxLeasesToStealAtOneTime, + final int maxLeaseRenewalThreads, + final boolean cleanupLeasesUponShardCompletion, + final boolean ignoreUnexpectedChildShards, + final long shardSyncIntervalMillis, + final boolean consistentReads, + final long listShardsBackoffTimeMillis, + final int maxListShardsRetryAttempts, + final int maxCacheMissesBeforeReload, + final long listShardsCacheAllowedAgeInSeconds, + final int cacheMissWarningModulus, + final long initialLeaseTableReadCapacity, + final long initialLeaseTableWriteCapacity, + final TableCreatorCallback tableCreatorCallback, + final Duration dynamoDbRequestTimeout, + final BillingMode billingMode, + final boolean leaseTableDeletionProtectionEnabled, + final boolean leaseTablePitrEnabled, + final Collection tags, + final @NotNull LeaseSerializer leaseSerializer, + final Function customShardDetectorProvider, + boolean isMultiStreamMode, + final LeaseCleanupConfig leaseCleanupConfig, + final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, + final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) { this( kinesisClient, dynamoDBClient, @@ -325,8 +326,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { isMultiStreamMode, leaseCleanupConfig, workerUtilizationAwareAssignmentConfig, - gracefulLeaseHandoffConfig); - this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis; + gracefulLeaseHandoffConfig, + 2 * failoverTimeMillis); } @Override