Quick fix for shutdown race issue (#439)

* Added a synchronized lock in the initialize and shutdown methods
This commit is contained in:
Sahil Palvia 2018-10-09 13:30:35 -07:00 committed by Justin Pfifer
parent 0326e217f6
commit 854e316b83

View file

@ -122,6 +122,8 @@ public class Scheduler implements Runnable {
private volatile long shutdownStartTimeMillis; private volatile long shutdownStartTimeMillis;
private volatile boolean shutdownComplete = false; private volatile boolean shutdownComplete = false;
private final Object lock = new Object();
/** /**
* Used to ensure that only one requestedShutdown is in progress at a time. * Used to ensure that only one requestedShutdown is in progress at a time.
*/ */
@ -222,55 +224,57 @@ public class Scheduler implements Runnable {
} }
private void initialize() { private void initialize() {
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); synchronized (lock) {
boolean isDone = false; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
Exception lastException = null; boolean isDone = false;
Exception lastException = null;
for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) { for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) {
try { try {
log.info("Initialization attempt {}", (i + 1)); log.info("Initialization attempt {}", (i + 1));
log.info("Initializing LeaseCoordinator"); log.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize(); leaseCoordinator.initialize();
TaskResult result = null; TaskResult result = null;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
log.info("Syncing Kinesis shard info"); log.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory); cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
}
if (result == null || result.getException() == null) {
if (!leaseCoordinator.isRunning()) {
log.info("Starting LeaseCoordinator");
leaseCoordinator.start();
} else { } else {
log.info("LeaseCoordinator is already running. No need to start it."); log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
} }
isDone = true;
} else { if (result == null || result.getException() == null) {
lastException = result.getException(); if (!leaseCoordinator.isRunning()) {
log.info("Starting LeaseCoordinator");
leaseCoordinator.start();
} else {
log.info("LeaseCoordinator is already running. No need to start it.");
}
isDone = true;
} else {
lastException = result.getException();
}
} catch (LeasingException e) {
log.error("Caught exception when initializing LeaseCoordinator", e);
lastException = e;
} catch (Exception e) {
lastException = e;
}
try {
Thread.sleep(parentShardPollIntervalMillis);
} catch (InterruptedException e) {
log.debug("Sleep interrupted while initializing worker.");
} }
} catch (LeasingException e) {
log.error("Caught exception when initializing LeaseCoordinator", e);
lastException = e;
} catch (Exception e) {
lastException = e;
} }
try { if (!isDone) {
Thread.sleep(parentShardPollIntervalMillis); throw new RuntimeException(lastException);
} catch (InterruptedException e) {
log.debug("Sleep interrupted while initializing worker.");
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
} }
if (!isDone) {
throw new RuntimeException(lastException);
}
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
} }
@VisibleForTesting @VisibleForTesting
@ -463,21 +467,23 @@ public class Scheduler implements Runnable {
* </ol> * </ol>
*/ */
public void shutdown() { public void shutdown() {
if (shutdown) { synchronized (lock) {
log.warn("Shutdown requested a second time."); if (shutdown) {
return; log.warn("Shutdown requested a second time.");
return;
}
log.info("Worker shutdown requested.");
// Set shutdown flag, so Worker.run can start shutdown process.
shutdown = true;
shutdownStartTimeMillis = System.currentTimeMillis();
// Stop lease coordinator, so leases are not renewed or stolen from other workers.
// Lost leases will force Worker to begin shutdown process for all shard consumers in
// Worker.run().
leaseCoordinator.stop();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
} }
log.info("Worker shutdown requested.");
// Set shutdown flag, so Worker.run can start shutdown process.
shutdown = true;
shutdownStartTimeMillis = System.currentTimeMillis();
// Stop lease coordinator, so leases are not renewed or stolen from other workers.
// Lost leases will force Worker to begin shutdown process for all shard consumers in
// Worker.run().
leaseCoordinator.stop();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
} }
/** /**