From 327f0722f50fc3c38d05f0f523e271389e8adc16 Mon Sep 17 00:00:00 2001 From: glarwood Date: Wed, 20 Mar 2019 15:59:31 +0000 Subject: [PATCH] refactor(Worker): exit after initial leases complete --- .../clientlibrary/lib/worker/Worker.java | 92 +++++++++++-------- .../clientlibrary/lib/worker/WorkerTest.java | 4 +- 2 files changed, 57 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 63197d2f..4e90853b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -16,11 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.time.Duration; import java.time.Instant; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -36,6 +32,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -84,7 +83,7 @@ public class Worker implements Runnable { private static final int MAX_RETRIES = 4; private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final boolean DEFAULT_EXITS_ON_FAILURE = false; - private static final boolean DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH = false; + private static final boolean DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE = false; private static final Optional DEFAULT_TIME_LIMIT = Optional.empty(); private WorkerLog wlog = new WorkerLog(); @@ -127,7 +126,7 @@ public class Worker implements Runnable { // fivetran configurables private final boolean exitOnFailure; - private final boolean exitWhenShardConsumersFinish; + private final boolean exitAfterInitialLeasesComplete; private final Optional timeLimit; @@ -361,7 +360,7 @@ public class Worker implements Runnable { config.getMaxGetRecordsThreadPool(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, DEFAULT_EXITS_ON_FAILURE, - DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH, + DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE, DEFAULT_TIME_LIMIT); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. @@ -410,7 +409,7 @@ public class Worker implements Runnable { shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, - DEFAULT_EXITS_ON_FAILURE, DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH, DEFAULT_TIME_LIMIT); + DEFAULT_EXITS_ON_FAILURE, DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE, DEFAULT_TIME_LIMIT); } /** @@ -442,7 +441,7 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, - boolean exitOnFailure, boolean exitWhenShardConsumersFinish, Optional timeLimit) { + boolean exitOnFailure, boolean exitAfterInitialLeasesComplete, Optional timeLimit) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -466,7 +465,7 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.workerStateChangeListener = workerStateChangeListener; this.exitOnFailure = exitOnFailure; - this.exitWhenShardConsumersFinish = exitWhenShardConsumersFinish; + this.exitAfterInitialLeasesComplete = exitAfterInitialLeasesComplete; this.timeLimit = timeLimit; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); } @@ -494,8 +493,6 @@ public class Worker implements Runnable { return; } - Instant start = Instant.now(); - try { initialize(); LOG.info("Initialization complete. Starting worker loop."); @@ -504,15 +501,52 @@ public class Worker implements Runnable { shutdown(); } - do { - // use do while to initialize the shardInfoShardConsumerMap, which is checked in shouldShutdown() + Instant start = Instant.now(); + List initialLeases = getExistingLeases(); + + while (!shouldShutdown()) { runProcessLoop(); - } while (!shouldShutdown(start)); + shutdownIfFivetranReady(start, initialLeases); + } finalShutdown(); LOG.info("Worker loop is complete. Exiting from worker."); } + private List getExistingLeases() { + try { + return leaseCoordinator.getLeaseManager().listLeases(); + } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { + throw new RuntimeException("Unable to list leases to determine when to stop worker"); + } + } + + void shutdownIfFivetranReady(Instant start, List initialLeases) { + boolean initiateShutdown = false; + + List currentLeases = getExistingLeases(); + boolean initialLeasesComplete = currentLeases.stream().noneMatch(initialLeases::contains); + + if (exitAfterInitialLeasesComplete && initialLeasesComplete) { + LOG.info("Requesting shutdown after finishing with all initial leases: " + initialLeases); + initiateShutdown = true; + } + + if (timeLimit.isPresent() && Duration.between(start, Instant.now()).compareTo(timeLimit.get()) > 0) { + LOG.info("Requesting shutdown after time limit of " + timeLimit.get() + " reached"); + initiateShutdown = true; + } + + if (initiateShutdown) { + try { + startGracefulShutdown().get(5, TimeUnit.MINUTES); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("Unable to gracefully shutdown: " + e.getMessage()); + shutdown(); + } + } + } + @VisibleForTesting void runProcessLoop() { try { @@ -877,10 +911,9 @@ public class Worker implements Runnable { * method before every loop run, so method must do minimum amount of work to not impact shard processing timings. * * @return Whether worker should shutdown immediately. - * @param start */ @VisibleForTesting - boolean shouldShutdown(Instant start) { + boolean shouldShutdown() { if (executorService.isShutdown()) { LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown."); return true; @@ -895,21 +928,6 @@ public class Worker implements Runnable { return true; } } - - return fivetranWantsShutdown(start); - } - - boolean fivetranWantsShutdown(Instant start) { - if (exitWhenShardConsumersFinish && shardInfoShardConsumerMap.isEmpty()) { - LOG.info("All ShardConsumers have finished"); - return true; - } - - if (timeLimit.isPresent() && Duration.between(start, Instant.now()).compareTo(timeLimit.get()) > 0) { - LOG.info("Time limit of " + timeLimit.get() + " reached"); - return true; - } - return false; } @@ -1106,7 +1124,7 @@ public class Worker implements Runnable { // fivetran additions boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE; - boolean exitWhenShardConsumersFinish = DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH; + boolean exitAfterInitialLeasesComplete = DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE; Optional timeLimit = DEFAULT_TIME_LIMIT; @VisibleForTesting @@ -1254,7 +1272,7 @@ public class Worker implements Runnable { config.getMaxGetRecordsThreadPool(), workerStateChangeListener, exitOnFailure, - exitWhenShardConsumersFinish, + exitAfterInitialLeasesComplete, timeLimit); } @@ -1338,8 +1356,8 @@ public class Worker implements Runnable { return this; } - public Builder exitWhenShardConsumersFinish() { - this.exitWhenShardConsumersFinish = true; + public Builder exitAfterInitialLeasesComplete() { + this.exitAfterInitialLeasesComplete = true; return this; } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 5106baa8..ac081897 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -1082,7 +1082,7 @@ public class WorkerTest { verify(executorService, never()).submit(argThat( both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)))); - assertThat(worker.shouldShutdown(Instant.now()), equalTo(true)); + assertThat(worker.shouldShutdown(), equalTo(true)); } @@ -1313,7 +1313,7 @@ public class WorkerTest { - assertThat(worker.shouldShutdown(Instant.now()), equalTo(true)); + assertThat(worker.shouldShutdown(), equalTo(true)); }