diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index dc730441..85234f7f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -94,7 +94,7 @@ public class Scheduler implements Runnable { private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; - private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L; + private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private SchedulerLog slog = new SchedulerLog(); @@ -127,7 +127,6 @@ public class Scheduler implements Runnable { private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final GracefulShutdownCoordinator gracefulShutdownCoordinator; private final WorkerStateChangeListener workerStateChangeListener; - private final InitialPositionInStreamExtended initialPosition; private final MetricsFactory metricsFactory; private final long failoverTimeMillis; private final long taskBackoffTimeMillis; @@ -249,7 +248,6 @@ public class Scheduler implements Runnable { } this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); - this.initialPosition = retrievalConfig.initialPositionInStreamExtended(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); // this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds(); @@ -308,7 +306,8 @@ public class Scheduler implements Runnable { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: for already synced streams - if (!waitAndCheckIfLeaseTableIsReady()) { + if (shouldInitiateLeaseSync()) { + log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier()); for (Map.Entry streamConfigEntry : currentStreamConfigMap.entrySet()) { final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); createOrGetShardSyncTaskManager(streamIdentifier); @@ -359,18 +358,19 @@ public class Scheduler implements Runnable { } @VisibleForTesting - boolean waitAndCheckIfLeaseTableIsReady() throws InterruptedException, + boolean shouldInitiateLeaseSync() throws InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException { long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); long waitUntil = System.currentTimeMillis() + waitTime; - while (System.currentTimeMillis() < waitUntil && leaseRefresher.isLeaseTableEmpty()) { + boolean isLeaseTableEmpty = true; + while (System.currentTimeMillis() < waitUntil && (isLeaseTableEmpty = leaseRefresher.isLeaseTableEmpty())) { // check every 3 seconds if lease table is still empty, // to minimize contention between all workers bootstrapping at the same time log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS); Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); } - return !leaseRefresher.isLeaseTableEmpty(); + return isLeaseTableEmpty; } private void waitUntilHashRangeCovered() throws InterruptedException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index b14f3303..812b05df 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -392,7 +392,7 @@ public class SchedulerTest { when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); long startTime = System.currentTimeMillis(); - scheduler.waitAndCheckIfLeaseTableIsReady(); + scheduler.shouldInitiateLeaseSync(); long endTime = System.currentTimeMillis(); assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); @@ -411,7 +411,7 @@ public class SchedulerTest { when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); long startTime = System.currentTimeMillis(); - scheduler.waitAndCheckIfLeaseTableIsReady(); + scheduler.shouldInitiateLeaseSync(); long endTime = System.currentTimeMillis(); assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);