Merge pull request #1 from fivetran/exit-when-shard-complete

refactor(Worker): shutdown when shardInfoShardConsumerMap is empty
This commit is contained in:
Gabriel Larwood 2019-03-05 21:06:54 -08:00 committed by GitHub
commit 08c45f0e42
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 76 additions and 12 deletions

View file

@ -14,6 +14,8 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -81,6 +83,9 @@ public class Worker implements Runnable {
private static final int MAX_INITIALIZATION_ATTEMPTS = 20; private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final int MAX_RETRIES = 4; private static final int MAX_RETRIES = 4;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private static final boolean DEFAULT_EXITS_ON_FAILURE = false;
private static final boolean DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH = false;
private static final Optional<Duration> DEFAULT_TIME_LIMIT = Optional.empty();
private WorkerLog wlog = new WorkerLog(); private WorkerLog wlog = new WorkerLog();
@ -120,6 +125,12 @@ public class Worker implements Runnable {
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
// fivetran configurables
private final boolean exitOnFailure;
private final boolean exitWhenShardConsumersFinish;
private final Optional<Duration> timeLimit;
/** /**
* Used to ensure that only one requestedShutdown is in progress at a time. * Used to ensure that only one requestedShutdown is in progress at a time.
*/ */
@ -348,7 +359,10 @@ public class Worker implements Runnable {
config.getShardPrioritizationStrategy(), config.getShardPrioritizationStrategy(),
config.getRetryGetRecordsInSeconds(), config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(), config.getMaxGetRecordsThreadPool(),
DEFAULT_WORKER_STATE_CHANGE_LISTENER); DEFAULT_WORKER_STATE_CHANGE_LISTENER,
DEFAULT_EXITS_ON_FAILURE,
DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH,
DEFAULT_TIME_LIMIT);
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) { if (config.getRegionName() != null) {
@ -395,7 +409,8 @@ public class Worker implements Runnable {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER); shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER,
DEFAULT_EXITS_ON_FAILURE, DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH, DEFAULT_TIME_LIMIT);
} }
/** /**
@ -426,7 +441,8 @@ public class Worker implements Runnable {
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) { Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
boolean exitOnFailure, boolean exitWhenShardConsumersFinish, Optional<Duration> timeLimit) {
this.applicationName = applicationName; this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory; this.recordProcessorFactory = recordProcessorFactory;
this.config = config; this.config = config;
@ -449,6 +465,9 @@ public class Worker implements Runnable {
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.workerStateChangeListener = workerStateChangeListener; this.workerStateChangeListener = workerStateChangeListener;
this.exitOnFailure = exitOnFailure;
this.exitWhenShardConsumersFinish = exitWhenShardConsumersFinish;
this.timeLimit = timeLimit;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
} }
@ -469,11 +488,14 @@ public class Worker implements Runnable {
/** /**
* Start consuming data from the stream, and pass it to the application record processors. * Start consuming data from the stream, and pass it to the application record processors.
*/ */
@Override
public void run() { public void run() {
if (shutdown) { if (shutdown) {
return; return;
} }
Instant start = Instant.now();
try { try {
initialize(); initialize();
LOG.info("Initialization complete. Starting worker loop."); LOG.info("Initialization complete. Starting worker loop.");
@ -482,9 +504,10 @@ public class Worker implements Runnable {
shutdown(); shutdown();
} }
while (!shouldShutdown()) { do {
// use do while to initialize the shardInfoShardConsumerMap, which is checked in shouldShutdown()
runProcessLoop(); runProcessLoop();
} } while (!shouldShutdown(start));
finalShutdown(); finalShutdown();
LOG.info("Worker loop is complete. Exiting from worker."); LOG.info("Worker loop is complete. Exiting from worker.");
@ -515,7 +538,7 @@ public class Worker implements Runnable {
wlog.info("Sleeping ..."); wlog.info("Sleeping ...");
Thread.sleep(idleTimeInMilliseconds); Thread.sleep(idleTimeInMilliseconds);
} catch (Exception e) { } catch (Exception e) {
if (retries.getAndIncrement() > MAX_RETRIES) if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES)
throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e); throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e);
LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!", LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!",
@ -526,6 +549,9 @@ public class Worker implements Runnable {
LOG.info("Worker: sleep interrupted after catching exception ", ex); LOG.info("Worker: sleep interrupted after catching exception ", ex);
} }
} }
// reset retries
retries.set(0);
wlog.resetInfoLogging(); wlog.resetInfoLogging();
} }
@ -851,9 +877,10 @@ public class Worker implements Runnable {
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings. * method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
* *
* @return Whether worker should shutdown immediately. * @return Whether worker should shutdown immediately.
* @param start
*/ */
@VisibleForTesting @VisibleForTesting
boolean shouldShutdown() { boolean shouldShutdown(Instant start) {
if (executorService.isShutdown()) { if (executorService.isShutdown()) {
LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown."); LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
return true; return true;
@ -868,6 +895,21 @@ public class Worker implements Runnable {
return true; return true;
} }
} }
return fivetranWantsShutdown(start);
}
boolean fivetranWantsShutdown(Instant start) {
if (exitWhenShardConsumersFinish && shardInfoShardConsumerMap.isEmpty()) {
LOG.info("All ShardConsumers have finished");
return true;
}
if (timeLimit.isPresent() && Duration.between(start, Instant.now()).compareTo(timeLimit.get()) > 0) {
LOG.info("Time limit of " + timeLimit.get() + " reached");
return true;
}
return false; return false;
} }
@ -1062,6 +1104,11 @@ public class Worker implements Runnable {
private IKinesisProxy kinesisProxy; private IKinesisProxy kinesisProxy;
private WorkerStateChangeListener workerStateChangeListener; private WorkerStateChangeListener workerStateChangeListener;
// fivetran additions
boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE;
boolean exitWhenShardConsumersFinish = DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH;
Optional<Duration> timeLimit = DEFAULT_TIME_LIMIT;
@VisibleForTesting @VisibleForTesting
AmazonKinesis getKinesisClient() { AmazonKinesis getKinesisClient() {
return kinesisClient; return kinesisClient;
@ -1205,7 +1252,10 @@ public class Worker implements Runnable {
shardPrioritization, shardPrioritization,
config.getRetryGetRecordsInSeconds(), config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool(), config.getMaxGetRecordsThreadPool(),
workerStateChangeListener); workerStateChangeListener,
exitOnFailure,
exitWhenShardConsumersFinish,
timeLimit);
} }
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder, <R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,
@ -1282,5 +1332,20 @@ public class Worker implements Runnable {
this.workerStateChangeListener = workerStateChangeListener; this.workerStateChangeListener = workerStateChangeListener;
return this; return this;
} }
public Builder exitOnFailure() {
this.exitOnFailure = true;
return this;
}
public Builder exitWhenShardConsumersFinish() {
this.exitWhenShardConsumersFinish = true;
return this;
}
public Builder timeLimit(Duration timeLimit) {
this.timeLimit = Optional.ofNullable(timeLimit);
return this;
}
} }
} }

View file

@ -38,6 +38,7 @@ import java.io.File;
import java.lang.Thread.State; import java.lang.Thread.State;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.math.BigInteger; import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
@ -60,7 +61,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.hamcrest.Condition; import org.hamcrest.Condition;
@ -68,7 +68,6 @@ import org.hamcrest.Description;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.hamcrest.TypeSafeMatcher; import org.hamcrest.TypeSafeMatcher;
import org.hamcrest.internal.ReflectiveTypeFinder;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -1083,7 +1082,7 @@ public class WorkerTest {
verify(executorService, never()).submit(argThat( verify(executorService, never()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)))); both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
assertThat(worker.shouldShutdown(), equalTo(true)); assertThat(worker.shouldShutdown(Instant.now()), equalTo(true));
} }
@ -1314,7 +1313,7 @@ public class WorkerTest {
assertThat(worker.shouldShutdown(), equalTo(true)); assertThat(worker.shouldShutdown(Instant.now()), equalTo(true));
} }