From 6ca825e5c5b396e2af3b9df68cff444a7577f8ff Mon Sep 17 00:00:00 2001 From: Thuan Duong-Ba Date: Sun, 19 Jan 2020 17:30:39 -0800 Subject: [PATCH] shutdown worker on initialization failure. --- .../clientlibrary/lib/worker/Worker.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 276b5570..258ca46e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -641,13 +641,8 @@ public class Worker implements Runnable { return; } - try { - initialize(); - LOG.info("Initialization complete. Starting worker loop."); - } catch (RuntimeException e1) { - LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", e1); - shutdown(); - } + initialize(); + LOG.info("Starting worker loop."); while (!shouldShutdown()) { runProcessLoop(); @@ -694,7 +689,7 @@ public class Worker implements Runnable { } private void initialize() { - if (this.isInitialized) { + if (isInitialized) { return; } @@ -702,7 +697,7 @@ public class Worker implements Runnable { Exception lastException = null; - for (int i = 0; (!this.isInitialized) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) { + for (int i = 0; (!isInitialized) && (i < MAX_INITIALIZATION_ATTEMPTS); i++) { try { LOG.info("Initialization attempt " + (i + 1)); LOG.info("Initializing LeaseCoordinator"); @@ -728,7 +723,7 @@ public class Worker implements Runnable { LOG.info("LeaseCoordinator is already running. No need to start it."); } shardSyncStrategy.onWorkerInitialization(); - this.isInitialized = true; + isInitialized = true; } else { lastException = result.getException(); } @@ -746,10 +741,14 @@ public class Worker implements Runnable { } } - if (!this.isInitialized) { + if (!isInitialized) { + shutdown(); + LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.", + lastException); throw new RuntimeException(lastException); } workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); + LOG.info("Initialization complete."); } /**