refactor(Worker): exit after initial leases complete
This commit is contained in:
parent
08c45f0e42
commit
327f0722f5
2 changed files with 57 additions and 39 deletions
|
|
@ -16,11 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.Collection;
|
import java.util.*;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
@ -36,6 +32,9 @@ import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||||
|
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||||
|
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
@ -84,7 +83,7 @@ public class Worker implements Runnable {
|
||||||
private static final int MAX_RETRIES = 4;
|
private static final int MAX_RETRIES = 4;
|
||||||
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
||||||
private static final boolean DEFAULT_EXITS_ON_FAILURE = false;
|
private static final boolean DEFAULT_EXITS_ON_FAILURE = false;
|
||||||
private static final boolean DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH = false;
|
private static final boolean DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE = false;
|
||||||
private static final Optional<Duration> DEFAULT_TIME_LIMIT = Optional.empty();
|
private static final Optional<Duration> DEFAULT_TIME_LIMIT = Optional.empty();
|
||||||
|
|
||||||
private WorkerLog wlog = new WorkerLog();
|
private WorkerLog wlog = new WorkerLog();
|
||||||
|
|
@ -127,7 +126,7 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
// fivetran configurables
|
// fivetran configurables
|
||||||
private final boolean exitOnFailure;
|
private final boolean exitOnFailure;
|
||||||
private final boolean exitWhenShardConsumersFinish;
|
private final boolean exitAfterInitialLeasesComplete;
|
||||||
private final Optional<Duration> timeLimit;
|
private final Optional<Duration> timeLimit;
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -361,7 +360,7 @@ public class Worker implements Runnable {
|
||||||
config.getMaxGetRecordsThreadPool(),
|
config.getMaxGetRecordsThreadPool(),
|
||||||
DEFAULT_WORKER_STATE_CHANGE_LISTENER,
|
DEFAULT_WORKER_STATE_CHANGE_LISTENER,
|
||||||
DEFAULT_EXITS_ON_FAILURE,
|
DEFAULT_EXITS_ON_FAILURE,
|
||||||
DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH,
|
DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE,
|
||||||
DEFAULT_TIME_LIMIT);
|
DEFAULT_TIME_LIMIT);
|
||||||
|
|
||||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||||
|
|
@ -410,7 +409,7 @@ public class Worker implements Runnable {
|
||||||
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
|
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
|
||||||
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||||
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER,
|
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER,
|
||||||
DEFAULT_EXITS_ON_FAILURE, DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH, DEFAULT_TIME_LIMIT);
|
DEFAULT_EXITS_ON_FAILURE, DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE, DEFAULT_TIME_LIMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -442,7 +441,7 @@ public class Worker implements Runnable {
|
||||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
||||||
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
|
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
|
||||||
boolean exitOnFailure, boolean exitWhenShardConsumersFinish, Optional<Duration> timeLimit) {
|
boolean exitOnFailure, boolean exitAfterInitialLeasesComplete, Optional<Duration> timeLimit) {
|
||||||
this.applicationName = applicationName;
|
this.applicationName = applicationName;
|
||||||
this.recordProcessorFactory = recordProcessorFactory;
|
this.recordProcessorFactory = recordProcessorFactory;
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
|
@ -466,7 +465,7 @@ public class Worker implements Runnable {
|
||||||
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
||||||
this.workerStateChangeListener = workerStateChangeListener;
|
this.workerStateChangeListener = workerStateChangeListener;
|
||||||
this.exitOnFailure = exitOnFailure;
|
this.exitOnFailure = exitOnFailure;
|
||||||
this.exitWhenShardConsumersFinish = exitWhenShardConsumersFinish;
|
this.exitAfterInitialLeasesComplete = exitAfterInitialLeasesComplete;
|
||||||
this.timeLimit = timeLimit;
|
this.timeLimit = timeLimit;
|
||||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
|
||||||
}
|
}
|
||||||
|
|
@ -494,8 +493,6 @@ public class Worker implements Runnable {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Instant start = Instant.now();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
initialize();
|
initialize();
|
||||||
LOG.info("Initialization complete. Starting worker loop.");
|
LOG.info("Initialization complete. Starting worker loop.");
|
||||||
|
|
@ -504,15 +501,52 @@ public class Worker implements Runnable {
|
||||||
shutdown();
|
shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
do {
|
Instant start = Instant.now();
|
||||||
// use do while to initialize the shardInfoShardConsumerMap, which is checked in shouldShutdown()
|
List<KinesisClientLease> initialLeases = getExistingLeases();
|
||||||
|
|
||||||
|
while (!shouldShutdown()) {
|
||||||
runProcessLoop();
|
runProcessLoop();
|
||||||
} while (!shouldShutdown(start));
|
shutdownIfFivetranReady(start, initialLeases);
|
||||||
|
}
|
||||||
|
|
||||||
finalShutdown();
|
finalShutdown();
|
||||||
LOG.info("Worker loop is complete. Exiting from worker.");
|
LOG.info("Worker loop is complete. Exiting from worker.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<KinesisClientLease> getExistingLeases() {
|
||||||
|
try {
|
||||||
|
return leaseCoordinator.getLeaseManager().listLeases();
|
||||||
|
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
|
||||||
|
throw new RuntimeException("Unable to list leases to determine when to stop worker");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void shutdownIfFivetranReady(Instant start, List<KinesisClientLease> initialLeases) {
|
||||||
|
boolean initiateShutdown = false;
|
||||||
|
|
||||||
|
List<KinesisClientLease> currentLeases = getExistingLeases();
|
||||||
|
boolean initialLeasesComplete = currentLeases.stream().noneMatch(initialLeases::contains);
|
||||||
|
|
||||||
|
if (exitAfterInitialLeasesComplete && initialLeasesComplete) {
|
||||||
|
LOG.info("Requesting shutdown after finishing with all initial leases: " + initialLeases);
|
||||||
|
initiateShutdown = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (timeLimit.isPresent() && Duration.between(start, Instant.now()).compareTo(timeLimit.get()) > 0) {
|
||||||
|
LOG.info("Requesting shutdown after time limit of " + timeLimit.get() + " reached");
|
||||||
|
initiateShutdown = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (initiateShutdown) {
|
||||||
|
try {
|
||||||
|
startGracefulShutdown().get(5, TimeUnit.MINUTES);
|
||||||
|
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||||
|
LOG.error("Unable to gracefully shutdown: " + e.getMessage());
|
||||||
|
shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void runProcessLoop() {
|
void runProcessLoop() {
|
||||||
try {
|
try {
|
||||||
|
|
@ -877,10 +911,9 @@ public class Worker implements Runnable {
|
||||||
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
|
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
|
||||||
*
|
*
|
||||||
* @return Whether worker should shutdown immediately.
|
* @return Whether worker should shutdown immediately.
|
||||||
* @param start
|
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
boolean shouldShutdown(Instant start) {
|
boolean shouldShutdown() {
|
||||||
if (executorService.isShutdown()) {
|
if (executorService.isShutdown()) {
|
||||||
LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
|
LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
|
||||||
return true;
|
return true;
|
||||||
|
|
@ -895,21 +928,6 @@ public class Worker implements Runnable {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return fivetranWantsShutdown(start);
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean fivetranWantsShutdown(Instant start) {
|
|
||||||
if (exitWhenShardConsumersFinish && shardInfoShardConsumerMap.isEmpty()) {
|
|
||||||
LOG.info("All ShardConsumers have finished");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (timeLimit.isPresent() && Duration.between(start, Instant.now()).compareTo(timeLimit.get()) > 0) {
|
|
||||||
LOG.info("Time limit of " + timeLimit.get() + " reached");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1106,7 +1124,7 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
// fivetran additions
|
// fivetran additions
|
||||||
boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE;
|
boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE;
|
||||||
boolean exitWhenShardConsumersFinish = DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH;
|
boolean exitAfterInitialLeasesComplete = DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE;
|
||||||
Optional<Duration> timeLimit = DEFAULT_TIME_LIMIT;
|
Optional<Duration> timeLimit = DEFAULT_TIME_LIMIT;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
@ -1254,7 +1272,7 @@ public class Worker implements Runnable {
|
||||||
config.getMaxGetRecordsThreadPool(),
|
config.getMaxGetRecordsThreadPool(),
|
||||||
workerStateChangeListener,
|
workerStateChangeListener,
|
||||||
exitOnFailure,
|
exitOnFailure,
|
||||||
exitWhenShardConsumersFinish,
|
exitAfterInitialLeasesComplete,
|
||||||
timeLimit);
|
timeLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1338,8 +1356,8 @@ public class Worker implements Runnable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder exitWhenShardConsumersFinish() {
|
public Builder exitAfterInitialLeasesComplete() {
|
||||||
this.exitWhenShardConsumersFinish = true;
|
this.exitAfterInitialLeasesComplete = true;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1082,7 +1082,7 @@ public class WorkerTest {
|
||||||
verify(executorService, never()).submit(argThat(
|
verify(executorService, never()).submit(argThat(
|
||||||
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
|
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
|
||||||
|
|
||||||
assertThat(worker.shouldShutdown(Instant.now()), equalTo(true));
|
assertThat(worker.shouldShutdown(), equalTo(true));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1313,7 +1313,7 @@ public class WorkerTest {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
assertThat(worker.shouldShutdown(Instant.now()), equalTo(true));
|
assertThat(worker.shouldShutdown(), equalTo(true));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue