diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 9fa1d721..99de1ebd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -122,6 +122,8 @@ public class Scheduler implements Runnable { private volatile long shutdownStartTimeMillis; private volatile boolean shutdownComplete = false; + private final Object lock = new Object(); + /** * Used to ensure that only one requestedShutdown is in progress at a time. */ @@ -222,55 +224,57 @@ public class Scheduler implements Runnable { } private void initialize() { - workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); - boolean isDone = false; - Exception lastException = null; + synchronized (lock) { + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); + boolean isDone = false; + Exception lastException = null; - for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) { - try { - log.info("Initialization attempt {}", (i + 1)); - log.info("Initializing LeaseCoordinator"); - leaseCoordinator.initialize(); + for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) { + try { + log.info("Initialization attempt {}", (i + 1)); + log.info("Initializing LeaseCoordinator"); + leaseCoordinator.initialize(); - TaskResult result = null; - if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { - log.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory); - result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); - } else { - log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); - } - - if (result == null || result.getException() == null) { - if (!leaseCoordinator.isRunning()) { - log.info("Starting LeaseCoordinator"); - leaseCoordinator.start(); + TaskResult result = null; + if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { + log.info("Syncing Kinesis shard info"); + ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, + cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory); + result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { - log.info("LeaseCoordinator is already running. No need to start it."); + log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); } - isDone = true; - } else { - lastException = result.getException(); + + if (result == null || result.getException() == null) { + if (!leaseCoordinator.isRunning()) { + log.info("Starting LeaseCoordinator"); + leaseCoordinator.start(); + } else { + log.info("LeaseCoordinator is already running. No need to start it."); + } + isDone = true; + } else { + lastException = result.getException(); + } + } catch (LeasingException e) { + log.error("Caught exception when initializing LeaseCoordinator", e); + lastException = e; + } catch (Exception e) { + lastException = e; + } + + try { + Thread.sleep(parentShardPollIntervalMillis); + } catch (InterruptedException e) { + log.debug("Sleep interrupted while initializing worker."); } - } catch (LeasingException e) { - log.error("Caught exception when initializing LeaseCoordinator", e); - lastException = e; - } catch (Exception e) { - lastException = e; } - try { - Thread.sleep(parentShardPollIntervalMillis); - } catch (InterruptedException e) { - log.debug("Sleep interrupted while initializing worker."); + if (!isDone) { + throw new RuntimeException(lastException); } + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); } - - if (!isDone) { - throw new RuntimeException(lastException); - } - workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); } @VisibleForTesting @@ -463,21 +467,23 @@ public class Scheduler implements Runnable { * */ public void shutdown() { - if (shutdown) { - log.warn("Shutdown requested a second time."); - return; + synchronized (lock) { + if (shutdown) { + log.warn("Shutdown requested a second time."); + return; + } + log.info("Worker shutdown requested."); + + // Set shutdown flag, so Worker.run can start shutdown process. + shutdown = true; + shutdownStartTimeMillis = System.currentTimeMillis(); + + // Stop lease coordinator, so leases are not renewed or stolen from other workers. + // Lost leases will force Worker to begin shutdown process for all shard consumers in + // Worker.run(). + leaseCoordinator.stop(); + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } - log.info("Worker shutdown requested."); - - // Set shutdown flag, so Worker.run can start shutdown process. - shutdown = true; - shutdownStartTimeMillis = System.currentTimeMillis(); - - // Stop lease coordinator, so leases are not renewed or stolen from other workers. - // Lost leases will force Worker to begin shutdown process for all shard consumers in - // Worker.run(). - leaseCoordinator.stop(); - workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } /**