refactor(Worker): remove exits after initial lease completion code & default time limit

This commit is contained in:
glarwood 2019-03-20 20:07:44 +00:00
parent 0a4fc8bef0
commit 913f2e9377

View file

@ -85,8 +85,6 @@ public class Worker implements Runnable {
private static final int MAX_RETRIES = 4;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private static final boolean DEFAULT_EXITS_ON_FAILURE = false;
private static final boolean DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE = false;
private static final Optional<Duration> DEFAULT_TIME_LIMIT = Optional.empty();
private WorkerLog wlog = new WorkerLog();
@ -128,8 +126,6 @@ public class Worker implements Runnable {
// fivetran configurables
private final boolean exitOnFailure;
private final boolean exitAfterInitialLeasesComplete;
private final Optional<Duration> timeLimit;
/**
@ -361,9 +357,7 @@ public class Worker implements Runnable {
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
DEFAULT_WORKER_STATE_CHANGE_LISTENER,
DEFAULT_EXITS_ON_FAILURE,
DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE,
DEFAULT_TIME_LIMIT);
DEFAULT_EXITS_ON_FAILURE);
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
@ -411,7 +405,7 @@ public class Worker implements Runnable {
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER,
DEFAULT_EXITS_ON_FAILURE, DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE, DEFAULT_TIME_LIMIT);
DEFAULT_EXITS_ON_FAILURE);
}
/**
@ -443,7 +437,7 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
boolean exitOnFailure, boolean exitAfterInitialLeasesComplete, Optional<Duration> timeLimit) {
boolean exitOnFailure) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
@ -467,8 +461,6 @@ public class Worker implements Runnable {
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.workerStateChangeListener = workerStateChangeListener;
this.exitOnFailure = exitOnFailure;
this.exitAfterInitialLeasesComplete = exitAfterInitialLeasesComplete;
this.timeLimit = timeLimit;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
}
@ -503,52 +495,14 @@ public class Worker implements Runnable {
shutdown();
}
Instant start = Instant.now();
List<String> initialLeaseKeys = getExistingLeaseKeys();
while (!shouldShutdown()) {
runProcessLoop();
shutdownIfFivetranReady(start, initialLeaseKeys);
}
finalShutdown();
LOG.info("Worker loop is complete. Exiting from worker.");
}
private List<String> getExistingLeaseKeys() {
try {
return leaseCoordinator.getLeaseManager().listLeases().stream().map(Lease::getLeaseKey).collect(Collectors.toList());
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
throw new RuntimeException("Unable to list leases to determine when to stop worker");
}
}
void shutdownIfFivetranReady(Instant start, List<String> initialLeases) {
boolean initiateShutdown = false;
List<String> currentLeases = getExistingLeaseKeys();
boolean initialLeasesComplete = currentLeases.stream().noneMatch(initialLeases::contains);
if (exitAfterInitialLeasesComplete && initialLeasesComplete) {
LOG.info("Requesting shutdown after finishing with all initial leases: " + initialLeases);
initiateShutdown = true;
}
if (timeLimit.isPresent() && Duration.between(start, Instant.now()).compareTo(timeLimit.get()) > 0) {
LOG.info("Requesting shutdown after time limit of " + timeLimit.get() + " reached");
initiateShutdown = true;
}
if (initiateShutdown) {
try {
startGracefulShutdown().get(5, TimeUnit.MINUTES);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Unable to gracefully shutdown: " + e.getMessage());
shutdown();
}
}
}
@VisibleForTesting
void runProcessLoop() {
try {
@ -1126,8 +1080,6 @@ public class Worker implements Runnable {
// fivetran additions
boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE;
boolean exitAfterInitialLeasesComplete = DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE;
Optional<Duration> timeLimit = DEFAULT_TIME_LIMIT;
@VisibleForTesting
AmazonKinesis getKinesisClient() {
@ -1273,9 +1225,7 @@ public class Worker implements Runnable {
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(),
workerStateChangeListener,
exitOnFailure,
exitAfterInitialLeasesComplete,
timeLimit);
exitOnFailure);
}
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,
@ -1357,15 +1307,5 @@ public class Worker implements Runnable {
this.exitOnFailure = true;
return this;
}
public Builder exitAfterInitialLeasesComplete() {
this.exitAfterInitialLeasesComplete = true;
return this;
}
public Builder timeLimit(Duration timeLimit) {
this.timeLimit = Optional.ofNullable(timeLimit);
return this;
}
}
}