diff --git a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java index 8a5c7c34..070757ec 100644 --- a/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java +++ b/amazon-kinesis-client-multilang/src/test/java/software/amazon/kinesis/multilang/config/PropertiesMappingE2ETest.java @@ -249,7 +249,7 @@ public class PropertiesMappingE2ETest { .workerUtilizationAwareAssignmentConfig() .staleWorkerMetricsEntryCleanupDuration()); assertEquals( - 3, + 6, kclV3Config .leaseManagementConfig .workerUtilizationAwareAssignmentConfig() diff --git a/amazon-kinesis-client/src/main/java/SampleApp.java b/amazon-kinesis-client/src/main/java/SampleApp.java new file mode 100644 index 00000000..80e7c5df --- /dev/null +++ b/amazon-kinesis-client/src/main/java/SampleApp.java @@ -0,0 +1,77 @@ +import java.util.UUID; + +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.common.ConfigsBuilder; +import software.amazon.kinesis.common.KinesisClientUtil; +import software.amazon.kinesis.coordinator.Scheduler; +import software.amazon.kinesis.leases.LeaseManagementConfig; +import software.amazon.kinesis.worker.RecordProcessorFactory; + +public class SampleApp { + + private final String streamName; + private final Region region; + private final KinesisAsyncClient kinesisClient; + + public static void main(String[] args) { + String streamName = "DDB-Scan-usage-test"; + Region region = Region.US_EAST_1; + new SampleApp(streamName, region).run(); + } + + public SampleApp(String streamName, Region region) { + this.streamName = streamName; + this.region = region; + this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient( + KinesisAsyncClient.builder().region(this.region)); + } + + public void run() { + DynamoDbAsyncClient dynamoDbAsyncClient = + DynamoDbAsyncClient.builder().region(region).build(); + CloudWatchAsyncClient cloudWatchClient = + CloudWatchAsyncClient.builder().region(region).build(); + + ConfigsBuilder configsBuilder = new ConfigsBuilder( + streamName, + streamName, + kinesisClient, + dynamoDbAsyncClient, + cloudWatchClient, + UUID.randomUUID().toString(), + new RecordProcessorFactory()); + LeaseManagementConfig leaseManagementConfig = configsBuilder.leaseManagementConfig(); + leaseManagementConfig.workerUtilizationAwareAssignmentConfig().varianceBalancingFrequency(5); + + // failoverTimeMillis = 10 min + leaseManagementConfig.failoverTimeMillis(600000); // 10 minute + // RetrievalConfig config = configsBuilder.retrievalConfig(); + // PollingConfig pollingConfig = new PollingConfig(config.kinesisClient()); + // + // //idleTimeBetweenReadsInMillis = 200 + // pollingConfig.idleTimeBetweenReadsInMillis(200); + // + // config.retrievalSpecificConfig(); + // + // //reBalanceThresholdPercentage = 4 + // leaseManagementConfig.workerUtilizationAwareAssignmentConfig().reBalanceThresholdPercentage(4); + // + // // maxGetRecordsThreadPool = 10 ?? + + Scheduler scheduler = new Scheduler( + configsBuilder.checkpointConfig(), + configsBuilder.coordinatorConfig(), + leaseManagementConfig, + configsBuilder.lifecycleConfig(), + configsBuilder.metricsConfig(), + configsBuilder.processorConfig(), + configsBuilder.retrievalConfig()); + + Thread schedulerThread = new Thread(scheduler); + schedulerThread.setDaemon(true); + schedulerThread.start(); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index e3feaa97..539d114f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -421,7 +421,8 @@ public class Scheduler implements Runnable { lamThreadPool, System::nanoTime, leaseManagementConfig.maxLeasesForWorker(), - leaseManagementConfig.gracefulLeaseHandoffConfig())) + leaseManagementConfig.gracefulLeaseHandoffConfig(), + leaseManagementConfig.leaseAssignmentIntervalMillis())) .adaptiveLeaderDeciderCreator(() -> new MigrationAdaptiveLeaderDecider(metricsFactory)) .deterministicLeaderDeciderCreator(() -> new DeterministicShuffleShardSyncLeaderDecider( leaseRefresher, Executors.newSingleThreadScheduledExecutor(), 1, metricsFactory)) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java index f5ea5470..cbd692bf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java @@ -117,11 +117,13 @@ public final class LeaseAssignmentManager { private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig; private boolean tookOverLeadershipInThisRun = false; private final Map prevRunLeasesState = new HashMap<>(); + private final long leaseAssignmentIntervalMillis; private Future managerFuture; private int noOfContinuousFailedAttempts = 0; private int lamRunCounter = 0; + private long varianceBasedBalancingLastRunTime; public synchronized void start() { if (isNull(managerFuture)) { @@ -129,10 +131,7 @@ public final class LeaseAssignmentManager { // so reset the flag to refresh the state before processing during a restart of LAM. tookOverLeadershipInThisRun = false; managerFuture = executorService.scheduleWithFixedDelay( - this::performAssignment, - 0L, - leaseDurationMillis * DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER, - TimeUnit.MILLISECONDS); + this::performAssignment, 0L, (int) (leaseAssignmentIntervalMillis), TimeUnit.MILLISECONDS); log.info("Started LeaseAssignmentManager"); return; } @@ -240,6 +239,7 @@ public final class LeaseAssignmentManager { if (shouldRunVarianceBalancing()) { final long balanceWorkerVarianceStartTime = System.currentTimeMillis(); + this.varianceBasedBalancingLastRunTime = balanceWorkerVarianceStartTime; final int totalNewAssignmentBeforeWorkerVarianceBalancing = inMemoryStorageView.leaseToNewAssignedWorkerMap.size(); leaseAssignmentDecider.balanceWorkerVariance(); @@ -280,14 +280,15 @@ public final class LeaseAssignmentManager { } private boolean shouldRunVarianceBalancing() { - final boolean response = this.lamRunCounter == 0; - /* - To avoid lamRunCounter grow large, keep it within [0,varianceBalancingFrequency). - If varianceBalancingFrequency is 5 lamRunCounter value will be within 0 to 4 and method return true when - lamRunCounter is 0. - */ - this.lamRunCounter = (this.lamRunCounter + 1) % config.varianceBalancingFrequency(); - return response; + + final long now = System.currentTimeMillis(); + final long varianceBalancingInterval = leaseDurationMillis * config.varianceBalancingFrequency(); + + if (now - this.varianceBasedBalancingLastRunTime >= varianceBalancingInterval) { + this.varianceBasedBalancingLastRunTime = now; + return true; + } + return false; } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 1839b494..f8782eea 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -114,6 +114,16 @@ public class LeaseManagementConfig { */ private long failoverTimeMillis = 10000L; + /** + * Lease assignment interval in milliseconds - e.g. wait for this long between Lease assignment run. + * + *

Default value: 2 * {@link LeaseManagementConfig#failoverTimeMillis}

+ */ + private Long leaseAssignmentIntervalMillis; + + public long leaseAssignmentIntervalMillis() { + return leaseAssignmentIntervalMillis != null ? leaseAssignmentIntervalMillis : 2 * failoverTimeMillis; + } /** * Whether workers should take very expired leases at priority. A very expired lease is when a worker does not * renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at @@ -489,7 +499,8 @@ public class LeaseManagementConfig { isMultiStreamingMode, leaseCleanupConfig(), workerUtilizationAwareAssignmentConfig(), - gracefulLeaseHandoffConfig); + gracefulLeaseHandoffConfig, + leaseAssignmentIntervalMillis()); } return leaseManagementFactory; } @@ -568,13 +579,13 @@ public class LeaseManagementConfig { private WorkerMetricsTableConfig workerMetricsTableConfig; /** - * Frequency to perform worker variance balancing. This value is used with respect to the LAM frequency, - * that is every third (as default) iteration of LAM the worker variance balancing will be performed. - * Setting it to 1 will make varianceBalancing run on every iteration of LAM and 2 on every 2nd iteration + * Frequency to perform worker variance balancing. This value is used with respect to the failoverTimeMillis, + * that is every six (as default) * failoverTimeMillis the worker variance balancing will be performed. + * Setting it to 1 will make varianceBalancing run on every failoverTimeMillis and 2 on every 2 * failoverTimeMillis * and so on. - * NOTE: LAM frequency = failoverTimeMillis + * NOTE: LAM frequency = {@link LeaseManagementConfig#leaseAssignmentIntervalMillis} */ - private int varianceBalancingFrequency = 3; + private int varianceBalancingFrequency = 6; /** * Alpha value used for calculating exponential moving average of worker's metricStats. Selecting diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 4f4d7886..7090f81e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -143,7 +143,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final MetricsFactory metricsFactory, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, - final ConcurrentMap shardInfoShardConsumerMap) { + final ConcurrentMap shardInfoShardConsumerMap, + final long leaseAssignmentIntervalMillis) { this.leaseRefresher = leaseRefresher; this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY); this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory) @@ -152,8 +153,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { .withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; - // Should run once every leaseDurationMillis to identify new leases before expiry. - this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis; + // Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry. + this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis - epsilonMillis) / 2; this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis); this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create( gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 7d902afd..45a766e7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -108,6 +108,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final boolean isMultiStreamMode; private final LeaseCleanupConfig leaseCleanupConfig; private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig; + private final long leaseAssignmentIntervalMillis; /** * Constructor. @@ -144,6 +145,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param leaseCleanupConfig * @param workerUtilizationAwareAssignmentConfig * @param gracefulLeaseHandoffConfig + * @param leaseAssignmentIntervalMillis */ public DynamoDBLeaseManagementFactory( final @NotNull KinesisAsyncClient kinesisClient, @@ -179,7 +181,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { boolean isMultiStreamMode, final LeaseCleanupConfig leaseCleanupConfig, final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, - final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) { + final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig, + long leaseAssignmentIntervalMillis) { this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.tableName = tableName; @@ -214,6 +217,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.tags = tags; this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig; this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig; + this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis; } @Override @@ -239,7 +243,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { metricsFactory, workerUtilizationAwareAssignmentConfig, gracefulLeaseHandoffConfig, - shardInfoShardConsumerMap); + shardInfoShardConsumerMap, + leaseAssignmentIntervalMillis); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessor.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessor.java new file mode 100644 index 00000000..f9e341f0 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessor.java @@ -0,0 +1,84 @@ +package software.amazon.kinesis.worker; + +import org.slf4j.MDC; +import software.amazon.kinesis.exceptions.InvalidStateException; +import software.amazon.kinesis.exceptions.ShutdownException; +import software.amazon.kinesis.lifecycle.events.InitializationInput; +import software.amazon.kinesis.lifecycle.events.LeaseLostInput; +import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; +import software.amazon.kinesis.lifecycle.events.ShardEndedInput; +import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; +import software.amazon.kinesis.processor.ShardRecordProcessor; + +public class RecordProcessor implements ShardRecordProcessor { + + private static final String SHARD_ID_MDC_KEY = "ShardId"; + private String shardId; + + @Override + public void initialize(InitializationInput initializationInput) { + shardId = initializationInput.shardId(); + MDC.put(SHARD_ID_MDC_KEY, shardId); + try { + // log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); + } finally { + MDC.remove(SHARD_ID_MDC_KEY); + } + } + + @Override + public void processRecords(ProcessRecordsInput processRecordsInput) { + MDC.put(SHARD_ID_MDC_KEY, shardId); + try { + // log.info("Processing {} record(s)", processRecordsInput.records().size()); + // processRecordsInput + // .records() + // .forEach( + // r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), + // r.sequenceNumber())); + + // Checkpoint periodically + processRecordsInput.checkpointer().checkpoint(); + } catch (Throwable t) { + // log.error("Caught throwable while processing records. Aborting.", t); + } finally { + MDC.remove(SHARD_ID_MDC_KEY); + } + } + + @Override + public void leaseLost(LeaseLostInput leaseLostInput) { + MDC.put(SHARD_ID_MDC_KEY, shardId); + try { + // log.info("Lost lease, so terminating."); + } finally { + MDC.remove(SHARD_ID_MDC_KEY); + } + } + + @Override + public void shardEnded(ShardEndedInput shardEndedInput) { + MDC.put(SHARD_ID_MDC_KEY, shardId); + try { + // log.info("Reached shard end checkpointing."); + shardEndedInput.checkpointer().checkpoint(); + } catch (ShutdownException | InvalidStateException e) { + // log.error("Exception while checkpointing at shard end. Giving up.", e); + } finally { + MDC.remove(SHARD_ID_MDC_KEY); + } + } + + @Override + public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { + MDC.put(SHARD_ID_MDC_KEY, shardId); + try { + // log.info("Scheduler is shutting down, checkpointing."); + shutdownRequestedInput.checkpointer().checkpoint(); + } catch (ShutdownException | InvalidStateException e) { + // log.error("Exception while checkpointing at requested shutdown. Giving up.", e); + } finally { + MDC.remove(SHARD_ID_MDC_KEY); + } + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessorFactory.java new file mode 100644 index 00000000..10f46764 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/RecordProcessorFactory.java @@ -0,0 +1,10 @@ +package software.amazon.kinesis.worker; + +import software.amazon.kinesis.processor.ShardRecordProcessor; +import software.amazon.kinesis.processor.ShardRecordProcessorFactory; + +public class RecordProcessorFactory implements ShardRecordProcessorFactory { + public ShardRecordProcessor shardRecordProcessor() { + return new RecordProcessor(); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java index 6bc3a581..5e27ffcc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManagerTest.java @@ -17,6 +17,8 @@ import lombok.var; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mockito; import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList; import software.amazon.awssdk.enhanced.dynamodb.TableSchema; @@ -49,6 +51,7 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -430,7 +433,8 @@ class LeaseAssignmentManagerTest { .anyMatch(lease -> lease.leaseOwner().equals(TEST_YIELD_WORKER_ID + "2"))); } - @Test + // no needed since variance based load balancing is not longed tied to LAM run + // @Test void performAssignment_varianceBalanceFreq3_asserLoadBalancingEvery3Iteration() throws Exception { final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config = getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10); @@ -483,6 +487,56 @@ class LeaseAssignmentManagerTest { .count()); } + @Test + void performAssignment_varianceBalanceFreq3_asserLoadBalancingEveryVarianceBalancingFrequencyLeaseDuration() + throws Exception { + final int varianceBalancingFrequency = 3; + final long leaseDuration = Duration.ofMillis(1000).toMillis(); + final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config = + getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 10); + config.varianceBalancingFrequency(varianceBalancingFrequency); + + createLeaseAssignmentManager(config, leaseDuration, System::nanoTime, Integer.MAX_VALUE); + + long balancingInterval = leaseDuration * varianceBalancingFrequency; + int varianceBalancingOccurred = 0; + + // Initial run at time 0 + setupConditionForVarianceBalancing(); + long startTime = System.currentTimeMillis(); + leaseAssignmentManagerRunnable.run(); + + // Check initial balancing at time 0 + long leasesOwnedByWorker = leaseRefresher.listLeases().stream() + .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) + .count(); + if (leasesOwnedByWorker == 3L) { + varianceBalancingOccurred++; + } + + // Run until we see the next balancing since LAM run is not tied to variance-based load balancing + long nextBalancingTime = startTime + balancingInterval; + + while (System.currentTimeMillis() < (startTime + balancingInterval + 1000)) { + setupConditionForVarianceBalancing(); + leaseAssignmentManagerRunnable.run(); + + leasesOwnedByWorker = leaseRefresher.listLeases().stream() + .filter(lease -> lease.leaseOwner().equals(TEST_TAKE_WORKER_ID)) + .count(); + + if (leasesOwnedByWorker == 3L && System.currentTimeMillis() >= nextBalancingTime) { + varianceBalancingOccurred++; + } + + Thread.sleep(100); + } + + assertTrue( + varianceBalancingOccurred == 2, + "Expected varianceBalancingOccurred to be greater than 1, but was: " + varianceBalancingOccurred); + } + private void setupConditionForVarianceBalancing() throws Exception { workerMetricsDAO.updateMetrics(createDummyYieldWorkerMetrics(TEST_YIELD_WORKER_ID + "1")); @@ -746,7 +800,8 @@ class LeaseAssignmentManagerTest { Integer.MAX_VALUE, LeaseManagementConfig.GracefulLeaseHandoffConfig.builder() .isGracefulLeaseHandoffEnabled(false) - .build()); + .build(), + 100L); leaseAssignmentManager.start(); @@ -1134,6 +1189,62 @@ class LeaseAssignmentManagerTest { dynamoDbAsyncClient.putItem(putItemRequest); } + @Test + void testLeaseAssignmentSchedulingWithDefaultInterval() { + long failoverTimeMillis = 1000L; + ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); + + LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager( + leaseRefresher, + workerMetricsDAO, + mockLeaderDecider, + getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20), + TEST_LEADER_WORKER_ID, + failoverTimeMillis, + new NullMetricsFactory(), + mockExecutor, + System::nanoTime, + Integer.MAX_VALUE, + gracefulLeaseHandoffConfig, + 2 * failoverTimeMillis); + + leaseAssignmentManager.start(); + + verify(mockExecutor) + .scheduleWithFixedDelay( + any(Runnable.class), eq(0L), eq(2 * failoverTimeMillis), eq(TimeUnit.MILLISECONDS)); + } + + @ParameterizedTest + @CsvSource({ + "1000, 500", // leaseAssignmentInterval smaller than failover + "1000, 1000", // leaseAssignmentInterval equal to failover + "1000, 2000", // leaseAssignmentInterval larger than failover + }) + void testLeaseAssignmentWithDifferentIntervals(long failoverTimeMillis, long leaseAssignmentIntervalMillis) { + ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); + + LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager( + leaseRefresher, + workerMetricsDAO, + mockLeaderDecider, + getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20), + TEST_LEADER_WORKER_ID, + failoverTimeMillis, + new NullMetricsFactory(), + mockExecutor, + System::nanoTime, + Integer.MAX_VALUE, + gracefulLeaseHandoffConfig, + leaseAssignmentIntervalMillis); + + leaseAssignmentManager.start(); + + verify(mockExecutor) + .scheduleWithFixedDelay( + any(Runnable.class), eq(0L), eq(leaseAssignmentIntervalMillis), eq(TimeUnit.MILLISECONDS)); + } + private LeaseAssignmentManager createLeaseAssignmentManager( final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config, final Long leaseDurationMillis, @@ -1151,7 +1262,8 @@ class LeaseAssignmentManagerTest { scheduledExecutorService, nanoTimeProvider, maxLeasesPerWorker, - gracefulLeaseHandoffConfig); + gracefulLeaseHandoffConfig, + 2 * leaseDurationMillis); leaseAssignmentManager.start(); return leaseAssignmentManager; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index 5694b03a..400310a7 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -71,6 +71,7 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { private static DynamoDBLeaseRefresher leaseRefresher; private static DynamoDBCheckpointer dynamoDBCheckpointer; + private static final long LEASE_ASSIGNMENT_INTERVAL_MILLIS = 2 * LEASE_DURATION_MILLIS; private LeaseCoordinator coordinator; private final String leaseKey = "shd-1"; @@ -128,7 +129,8 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { metricsFactory, new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(), LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(), - new ConcurrentHashMap<>()); + new ConcurrentHashMap<>(), + LEASE_ASSIGNMENT_INTERVAL_MILLIS); dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher); dynamoDBCheckpointer.operation(OPERATION);