diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java index 7b10694f..7d166b98 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java @@ -77,6 +77,11 @@ class PeriodicShardSyncManager { return new TaskResult(null); } + /** + * Runs shardSync once + * Does not schedule periodic shardSync + * @return the result of the task + */ public synchronized TaskResult syncShardsOnce() { Exception lastException = null; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 87b92162..6f4d4ed1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -37,7 +37,10 @@ import software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.leases.ShardSyncTask; import software.amazon.kinesis.leases.ShardSyncTaskManager; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.lifecycle.ShardConsumerArgument; @@ -79,6 +82,10 @@ import java.util.concurrent.TimeUnit; public class Scheduler implements Runnable { private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; + private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; + private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L; + private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; + private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private SchedulerLog slog = new SchedulerLog(); private final CheckpointConfig checkpointConfig; @@ -227,11 +234,7 @@ public class Scheduler implements Runnable { this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, - metricsFactory); - this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), - leaderDecider, shardSyncTask, metricsFactory); + this.leaderElectedPeriodicShardSyncManager = buildPeriodicShardSyncManager(); } /** @@ -275,12 +278,7 @@ public class Scheduler implements Runnable { TaskResult result = null; if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { - for (int j = 0; j < 10 && leaseRefresher.isLeaseTableEmpty(); j++) { - // check every 1-5 seconds if lease table is still empty, - // to minimize contention between all workers bootstrapping at the same time - long waitTime = ThreadLocalRandom.current().nextLong(1000L, 5000L); - Thread.sleep(waitTime); - } + waitUntilLeaseTableIsReady(); log.info("Syncing Kinesis shard info"); result = leaderElectedPeriodicShardSyncManager.syncShardsOnce(); } else { @@ -297,6 +295,7 @@ public class Scheduler implements Runnable { log.info("Scheduling periodicShardSync)"); // leaderElectedPeriodicShardSyncManager.start(); // TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged + waitUntilHashRangeCovered(); isDone = true; } else { lastException = result.getException(); @@ -308,7 +307,7 @@ public class Scheduler implements Runnable { lastException = e; } - if (!isDone || !leaderElectedPeriodicShardSyncManager.hashRangeCovered()) { + if (!isDone) { try { Thread.sleep(schedulerInitializationBackoffTimeMillis); leaderElectedPeriodicShardSyncManager.stop(); @@ -325,6 +324,29 @@ public class Scheduler implements Runnable { } } + @VisibleForTesting + void waitUntilLeaseTableIsReady() throws InterruptedException, + DependencyException, ProvisionedThroughputException, InvalidStateException { + long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + long waitUntil = System.currentTimeMillis() + waitTime; + + while (System.currentTimeMillis() < waitUntil && leaseRefresher.isLeaseTableEmpty()) { + // check every 3 seconds if lease table is still empty, + // to minimize contention between all workers bootstrapping at the same time + log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS); + Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); + } + } + + private void waitUntilHashRangeCovered() throws InterruptedException { + + while (!leaderElectedPeriodicShardSyncManager.hashRangeCovered()) { + // wait until entire hash range is covered + log.info("Hash range is not covered yet. Checking again in {} ms", HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS); + Thread.sleep(HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS); + } + } + @VisibleForTesting void runProcessLoop() { try { @@ -633,6 +655,14 @@ public class Scheduler implements Runnable { argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); } + private PeriodicShardSyncManager buildPeriodicShardSyncManager() { + final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, + cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, + metricsFactory); + return new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(), + leaderDecider, shardSyncTask, metricsFactory); + } + /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. *

diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 8be1bb8f..3bf68829 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -89,6 +89,8 @@ public class SchedulerTest { private final String applicationName = "applicationName"; private final String streamName = "streamName"; private final String namespace = "testNamespace"; + private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L; + private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private Scheduler scheduler; private ShardRecordProcessorFactory shardRecordProcessorFactory; @@ -265,6 +267,43 @@ public class SchedulerTest { verify(shardDetector, times(maxInitializationAttempts)).listShards(); } + @Test + public final void testInitializationWaitsWhenLeaseTableIsEmpty() throws Exception { + final int maxInitializationAttempts = 1; + coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts); + coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(false); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + + doNothing().when(leaseCoordinator).initialize(); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); + + long startTime = System.currentTimeMillis(); + scheduler.waitUntilLeaseTableIsReady(); + long endTime = System.currentTimeMillis(); + + assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + assertTrue(endTime - startTime < MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + } + + @Test + public final void testInitializationDoesntWaitWhenLeaseTableIsNotEmpty() throws Exception { + final int maxInitializationAttempts = 1; + coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts); + coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(false); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + + doNothing().when(leaseCoordinator).initialize(); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); + + long startTime = System.currentTimeMillis(); + scheduler.waitUntilLeaseTableIsReady(); + long endTime = System.currentTimeMillis(); + + assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); + } + @Test public final void testSchedulerShutdown() { scheduler.shutdown();