Add wait to initialization until hashRange covered, modify wait logic for empty leaseTable check

This commit is contained in:
Jegosh John 2020-03-06 10:53:35 -08:00
parent 8511475868
commit 3bd9b29a13
3 changed files with 86 additions and 12 deletions

View file

@ -77,6 +77,11 @@ class PeriodicShardSyncManager {
return new TaskResult(null);
}
/**
* Runs shardSync once
* Does not schedule periodic shardSync
* @return the result of the task
*/
public synchronized TaskResult syncShardsOnce() {
Exception lastException = null;

View file

@ -37,7 +37,10 @@ import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
@ -79,6 +82,10 @@ import java.util.concurrent.TimeUnit;
public class Scheduler implements Runnable {
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;
private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS = 5000L;
private SchedulerLog slog = new SchedulerLog();
private final CheckpointConfig checkpointConfig;
@ -227,11 +234,7 @@ public class Scheduler implements Runnable {
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
metricsFactory);
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(),
leaderDecider, shardSyncTask, metricsFactory);
this.leaderElectedPeriodicShardSyncManager = buildPeriodicShardSyncManager();
}
/**
@ -275,12 +278,7 @@ public class Scheduler implements Runnable {
TaskResult result = null;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
for (int j = 0; j < 10 && leaseRefresher.isLeaseTableEmpty(); j++) {
// check every 1-5 seconds if lease table is still empty,
// to minimize contention between all workers bootstrapping at the same time
long waitTime = ThreadLocalRandom.current().nextLong(1000L, 5000L);
Thread.sleep(waitTime);
}
waitUntilLeaseTableIsReady();
log.info("Syncing Kinesis shard info");
result = leaderElectedPeriodicShardSyncManager.syncShardsOnce();
} else {
@ -297,6 +295,7 @@ public class Scheduler implements Runnable {
log.info("Scheduling periodicShardSync)");
// leaderElectedPeriodicShardSyncManager.start();
// TODO: enable periodicShardSync after https://github.com/jushkem/amazon-kinesis-client/pull/2 is merged
waitUntilHashRangeCovered();
isDone = true;
} else {
lastException = result.getException();
@ -308,7 +307,7 @@ public class Scheduler implements Runnable {
lastException = e;
}
if (!isDone || !leaderElectedPeriodicShardSyncManager.hashRangeCovered()) {
if (!isDone) {
try {
Thread.sleep(schedulerInitializationBackoffTimeMillis);
leaderElectedPeriodicShardSyncManager.stop();
@ -325,6 +324,29 @@ public class Scheduler implements Runnable {
}
}
@VisibleForTesting
void waitUntilLeaseTableIsReady() throws InterruptedException,
DependencyException, ProvisionedThroughputException, InvalidStateException {
long waitTime = ThreadLocalRandom.current().nextLong(MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS, MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
long waitUntil = System.currentTimeMillis() + waitTime;
while (System.currentTimeMillis() < waitUntil && leaseRefresher.isLeaseTableEmpty()) {
// check every 3 seconds if lease table is still empty,
// to minimize contention between all workers bootstrapping at the same time
log.info("Lease table is still empty. Checking again in {} ms", LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
Thread.sleep(LEASE_TABLE_CHECK_FREQUENCY_MILLIS);
}
}
private void waitUntilHashRangeCovered() throws InterruptedException {
while (!leaderElectedPeriodicShardSyncManager.hashRangeCovered()) {
// wait until entire hash range is covered
log.info("Hash range is not covered yet. Checking again in {} ms", HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS);
Thread.sleep(HASH_RANGE_COVERAGE_CHECK_FREQUENCY_MILLIS);
}
}
@VisibleForTesting
void runProcessLoop() {
try {
@ -633,6 +655,14 @@ public class Scheduler implements Runnable {
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
}
private PeriodicShardSyncManager buildPeriodicShardSyncManager() {
final ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
metricsFactory);
return new PeriodicShardSyncManager(leaseManagementConfig.workerIdentifier(),
leaderDecider, shardSyncTask, metricsFactory);
}
/**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* <p>

View file

@ -89,6 +89,8 @@ public class SchedulerTest {
private final String applicationName = "applicationName";
private final String streamName = "streamName";
private final String namespace = "testNamespace";
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 5 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private Scheduler scheduler;
private ShardRecordProcessorFactory shardRecordProcessorFactory;
@ -265,6 +267,43 @@ public class SchedulerTest {
verify(shardDetector, times(maxInitializationAttempts)).listShards();
}
@Test
public final void testInitializationWaitsWhenLeaseTableIsEmpty() throws Exception {
final int maxInitializationAttempts = 1;
coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts);
coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(false);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
doNothing().when(leaseCoordinator).initialize();
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(true);
long startTime = System.currentTimeMillis();
scheduler.waitUntilLeaseTableIsReady();
long endTime = System.currentTimeMillis();
assertTrue(endTime - startTime > MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
assertTrue(endTime - startTime < MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
}
@Test
public final void testInitializationDoesntWaitWhenLeaseTableIsNotEmpty() throws Exception {
final int maxInitializationAttempts = 1;
coordinatorConfig.maxInitializationAttempts(maxInitializationAttempts);
coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(false);
scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig);
doNothing().when(leaseCoordinator).initialize();
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
long startTime = System.currentTimeMillis();
scheduler.waitUntilLeaseTableIsReady();
long endTime = System.currentTimeMillis();
assertTrue(endTime - startTime < MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS);
}
@Test
public final void testSchedulerShutdown() {
scheduler.shutdown();