shutdown worker on initialization failure.

This commit is contained in:
Thuan Duong-Ba 2020-01-19 17:30:39 -08:00
parent cd9d7e319e
commit 6ca825e5c5

View file

@ -641,13 +641,8 @@ public class Worker implements Runnable {
return; return;
} }
try {
initialize(); initialize();
LOG.info("Initialization complete. Starting worker loop."); LOG.info("Starting worker loop.");
} catch (RuntimeException e1) {
LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", e1);
shutdown();
}
while (!shouldShutdown()) { while (!shouldShutdown()) {
runProcessLoop(); runProcessLoop();
@ -694,7 +689,7 @@ public class Worker implements Runnable {
} }
private void initialize() { private void initialize() {
if (this.isInitialized) { if (isInitialized) {
return; return;
} }
@ -702,7 +697,7 @@ public class Worker implements Runnable {
Exception lastException = null; Exception lastException = null;
for (int i = 0; (!this.isInitialized) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) { for (int i = 0; (!isInitialized) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) {
try { try {
LOG.info("Initialization attempt " + (i + 1)); LOG.info("Initialization attempt " + (i + 1));
LOG.info("Initializing LeaseCoordinator"); LOG.info("Initializing LeaseCoordinator");
@ -728,7 +723,7 @@ public class Worker implements Runnable {
LOG.info("LeaseCoordinator is already running. No need to start it."); LOG.info("LeaseCoordinator is already running. No need to start it.");
} }
shardSyncStrategy.onWorkerInitialization(); shardSyncStrategy.onWorkerInitialization();
this.isInitialized = true; isInitialized = true;
} else { } else {
lastException = result.getException(); lastException = result.getException();
} }
@ -746,10 +741,14 @@ public class Worker implements Runnable {
} }
} }
if (!this.isInitialized) { if (!isInitialized) {
shutdown();
LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.",
lastException);
throw new RuntimeException(lastException); throw new RuntimeException(lastException);
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
LOG.info("Initialization complete.");
} }
/** /**