Address comments

This commit is contained in:
Chunxue Yang 2020-03-19 15:47:27 -07:00
parent f57a332671
commit 384fe5266c
2 changed files with 9 additions and 9 deletions

View file

@ -94,7 +94,7 @@ public class Scheduler implements Runnable {
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L; private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L; private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L;
private SchedulerLog slog = new SchedulerLog(); private SchedulerLog slog = new SchedulerLog();
@ -127,7 +127,6 @@ public class Scheduler implements Runnable {
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private final GracefulShutdownCoordinator gracefulShutdownCoordinator; private final GracefulShutdownCoordinator gracefulShutdownCoordinator;
private final WorkerStateChangeListener workerStateChangeListener; private final WorkerStateChangeListener workerStateChangeListener;
private final InitialPositionInStreamExtended initialPosition;
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private final long failoverTimeMillis; private final long failoverTimeMillis;
private final long taskBackoffTimeMillis; private final long taskBackoffTimeMillis;
@ -249,7 +248,6 @@ public class Scheduler implements Runnable {
} }
this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher, this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher,
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
this.initialPosition = retrievalConfig.initialPositionInStreamExtended();
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds(); // this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds();
@ -308,7 +306,8 @@ public class Scheduler implements Runnable {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
// TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing // TODO: Resume the shard sync from failed stream in the next attempt, to avoid syncing
// TODO: for already synced streams // TODO: for already synced streams
if (!waitAndCheckIfLeaseTableIsReady()) { if (shouldInitiateLeaseSync()) {
log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier());
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) { for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final StreamIdentifier streamIdentifier = streamConfigEntry.getKey(); final StreamIdentifier streamIdentifier = streamConfigEntry.getKey();
createOrGetShardSyncTaskManager(streamIdentifier); createOrGetShardSyncTaskManager(streamIdentifier);
@ -359,18 +358,19 @@ public class Scheduler implements Runnable {
} }
@VisibleForTesting @VisibleForTesting
boolean waitAndCheckIfLeaseTableIsReady() throws InterruptedException, boolean shouldInitiateLeaseSync() throws InterruptedException,
DependencyException, ProvisionedThroughputException, InvalidStateException { DependencyException, ProvisionedThroughputException, InvalidStateException {
long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
long waitUntil = System.currentTimeMillis() + waitTime; long waitUntil = System.currentTimeMillis() + waitTime;
while (System.currentTimeMillis() < waitUntil && leaseRefresher.isLeaseTableEmpty()) { boolean isLeaseTableEmpty = true;
while (System.currentTimeMillis() < waitUntil && (isLeaseTableEmpty = leaseRefresher.isLeaseTableEmpty())) {
// check every 3 seconds if lease table is still empty, // check every 3 seconds if lease table is still empty,
// to minimize contention between all workers bootstrapping at the same time // to minimize contention between all workers bootstrapping at the same time
log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS); log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS); Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
} }
return !leaseRefresher.isLeaseTableEmpty(); return isLeaseTableEmpty;
} }
private void waitUntilHashRangeCovered() throws InterruptedException { private void waitUntilHashRangeCovered() throws InterruptedException {

View file

@ -392,7 +392,7 @@ public class SchedulerTest {
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
scheduler.waitAndCheckIfLeaseTableIsReady(); scheduler.shouldInitiateLeaseSync();
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
@ -411,7 +411,7 @@ public class SchedulerTest {
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
scheduler.waitAndCheckIfLeaseTableIsReady(); scheduler.shouldInitiateLeaseSync();
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS); assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);