refactor(Worker): make Fivetran specific code configurable
This commit is contained in:
parent
14fb8e2703
commit
1a617249f4
2 changed files with 69 additions and 13 deletions
|
|
@ -14,6 +14,8 @@
|
||||||
*/
|
*/
|
||||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -81,6 +83,9 @@ public class Worker implements Runnable {
|
||||||
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
|
private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
|
||||||
private static final int MAX_RETRIES = 4;
|
private static final int MAX_RETRIES = 4;
|
||||||
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
|
||||||
|
private static final boolean DEFAULT_EXITS_ON_FAILURE = false;
|
||||||
|
private static final boolean DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH = false;
|
||||||
|
private static final Optional<Duration> DEFAULT_TIME_LIMIT = Optional.empty();
|
||||||
|
|
||||||
private WorkerLog wlog = new WorkerLog();
|
private WorkerLog wlog = new WorkerLog();
|
||||||
|
|
||||||
|
|
@ -120,6 +125,12 @@ public class Worker implements Runnable {
|
||||||
|
|
||||||
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||||
|
|
||||||
|
// fivetran configurables
|
||||||
|
private final boolean exitOnFailure;
|
||||||
|
private final boolean exitWhenShardConsumersFinish;
|
||||||
|
private final Optional<Duration> timeLimit;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to ensure that only one requestedShutdown is in progress at a time.
|
* Used to ensure that only one requestedShutdown is in progress at a time.
|
||||||
*/
|
*/
|
||||||
|
|
@ -348,7 +359,10 @@ public class Worker implements Runnable {
|
||||||
config.getShardPrioritizationStrategy(),
|
config.getShardPrioritizationStrategy(),
|
||||||
config.getRetryGetRecordsInSeconds(),
|
config.getRetryGetRecordsInSeconds(),
|
||||||
config.getMaxGetRecordsThreadPool(),
|
config.getMaxGetRecordsThreadPool(),
|
||||||
DEFAULT_WORKER_STATE_CHANGE_LISTENER);
|
DEFAULT_WORKER_STATE_CHANGE_LISTENER,
|
||||||
|
DEFAULT_EXITS_ON_FAILURE,
|
||||||
|
DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH,
|
||||||
|
DEFAULT_TIME_LIMIT);
|
||||||
|
|
||||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||||
if (config.getRegionName() != null) {
|
if (config.getRegionName() != null) {
|
||||||
|
|
@ -395,7 +409,8 @@ public class Worker implements Runnable {
|
||||||
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
|
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
|
||||||
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
|
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
|
||||||
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||||
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER);
|
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER,
|
||||||
|
DEFAULT_EXITS_ON_FAILURE, DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH, DEFAULT_TIME_LIMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -426,7 +441,8 @@ public class Worker implements Runnable {
|
||||||
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
||||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
||||||
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) {
|
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener,
|
||||||
|
boolean exitOnFailure, boolean exitWhenShardConsumersFinish, Optional<Duration> timeLimit) {
|
||||||
this.applicationName = applicationName;
|
this.applicationName = applicationName;
|
||||||
this.recordProcessorFactory = recordProcessorFactory;
|
this.recordProcessorFactory = recordProcessorFactory;
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
|
@ -449,6 +465,9 @@ public class Worker implements Runnable {
|
||||||
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
|
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
|
||||||
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
||||||
this.workerStateChangeListener = workerStateChangeListener;
|
this.workerStateChangeListener = workerStateChangeListener;
|
||||||
|
this.exitOnFailure = exitOnFailure;
|
||||||
|
this.exitWhenShardConsumersFinish = exitWhenShardConsumersFinish;
|
||||||
|
this.timeLimit = timeLimit;
|
||||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
|
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -469,11 +488,14 @@ public class Worker implements Runnable {
|
||||||
/**
|
/**
|
||||||
* Start consuming data from the stream, and pass it to the application record processors.
|
* Start consuming data from the stream, and pass it to the application record processors.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
if (shutdown) {
|
if (shutdown) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Instant start = Instant.now();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
initialize();
|
initialize();
|
||||||
LOG.info("Initialization complete. Starting worker loop.");
|
LOG.info("Initialization complete. Starting worker loop.");
|
||||||
|
|
@ -485,7 +507,7 @@ public class Worker implements Runnable {
|
||||||
do {
|
do {
|
||||||
// use do while to initialize the shardInfoShardConsumerMap, which is checked in shouldShutdown()
|
// use do while to initialize the shardInfoShardConsumerMap, which is checked in shouldShutdown()
|
||||||
runProcessLoop();
|
runProcessLoop();
|
||||||
} while (!shouldShutdown());
|
} while (!shouldShutdown(start));
|
||||||
|
|
||||||
finalShutdown();
|
finalShutdown();
|
||||||
LOG.info("Worker loop is complete. Exiting from worker.");
|
LOG.info("Worker loop is complete. Exiting from worker.");
|
||||||
|
|
@ -516,7 +538,7 @@ public class Worker implements Runnable {
|
||||||
wlog.info("Sleeping ...");
|
wlog.info("Sleeping ...");
|
||||||
Thread.sleep(idleTimeInMilliseconds);
|
Thread.sleep(idleTimeInMilliseconds);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (retries.getAndIncrement() > MAX_RETRIES)
|
if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES)
|
||||||
throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e);
|
throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e);
|
||||||
|
|
||||||
LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!",
|
LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!",
|
||||||
|
|
@ -855,9 +877,10 @@ public class Worker implements Runnable {
|
||||||
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
|
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
|
||||||
*
|
*
|
||||||
* @return Whether worker should shutdown immediately.
|
* @return Whether worker should shutdown immediately.
|
||||||
|
* @param start
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
boolean shouldShutdown() {
|
boolean shouldShutdown(Instant start) {
|
||||||
if (executorService.isShutdown()) {
|
if (executorService.isShutdown()) {
|
||||||
LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
|
LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
|
||||||
return true;
|
return true;
|
||||||
|
|
@ -873,9 +896,20 @@ public class Worker implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shardInfoShardConsumerMap.isEmpty()) {
|
return fivetranWantsShutdown(start);
|
||||||
LOG.info("Nothing to consume");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean fivetranWantsShutdown(Instant start) {
|
||||||
|
if (exitWhenShardConsumersFinish && shardInfoShardConsumerMap.isEmpty()) {
|
||||||
|
LOG.info("All ShardConsumers have finished");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (timeLimit.isPresent() && Duration.between(start, Instant.now()).compareTo(timeLimit.get()) > 0) {
|
||||||
|
LOG.info("Time limit of " + timeLimit.get() + " reached");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1070,6 +1104,11 @@ public class Worker implements Runnable {
|
||||||
private IKinesisProxy kinesisProxy;
|
private IKinesisProxy kinesisProxy;
|
||||||
private WorkerStateChangeListener workerStateChangeListener;
|
private WorkerStateChangeListener workerStateChangeListener;
|
||||||
|
|
||||||
|
// fivetran additions
|
||||||
|
boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE;
|
||||||
|
boolean exitWhenShardConsumersFinish = DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH;
|
||||||
|
Optional<Duration> timeLimit = DEFAULT_TIME_LIMIT;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
AmazonKinesis getKinesisClient() {
|
AmazonKinesis getKinesisClient() {
|
||||||
return kinesisClient;
|
return kinesisClient;
|
||||||
|
|
@ -1213,7 +1252,10 @@ public class Worker implements Runnable {
|
||||||
shardPrioritization,
|
shardPrioritization,
|
||||||
config.getRetryGetRecordsInSeconds(),
|
config.getRetryGetRecordsInSeconds(),
|
||||||
config.getMaxGetRecordsThreadPool(),
|
config.getMaxGetRecordsThreadPool(),
|
||||||
workerStateChangeListener);
|
workerStateChangeListener,
|
||||||
|
exitOnFailure,
|
||||||
|
exitWhenShardConsumersFinish,
|
||||||
|
timeLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,
|
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,
|
||||||
|
|
@ -1290,5 +1332,20 @@ public class Worker implements Runnable {
|
||||||
this.workerStateChangeListener = workerStateChangeListener;
|
this.workerStateChangeListener = workerStateChangeListener;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder exitOnFailure() {
|
||||||
|
this.exitOnFailure = true;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder exitWhenShardConsumersFinish() {
|
||||||
|
this.exitWhenShardConsumersFinish = true;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder timeLimit(Duration timeLimit) {
|
||||||
|
this.timeLimit = Optional.ofNullable(timeLimit);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ import java.io.File;
|
||||||
import java.lang.Thread.State;
|
import java.lang.Thread.State;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
|
@ -60,7 +61,6 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.hamcrest.Condition;
|
import org.hamcrest.Condition;
|
||||||
|
|
@ -68,7 +68,6 @@ import org.hamcrest.Description;
|
||||||
import org.hamcrest.Matcher;
|
import org.hamcrest.Matcher;
|
||||||
import org.hamcrest.TypeSafeDiagnosingMatcher;
|
import org.hamcrest.TypeSafeDiagnosingMatcher;
|
||||||
import org.hamcrest.TypeSafeMatcher;
|
import org.hamcrest.TypeSafeMatcher;
|
||||||
import org.hamcrest.internal.ReflectiveTypeFinder;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -1083,7 +1082,7 @@ public class WorkerTest {
|
||||||
verify(executorService, never()).submit(argThat(
|
verify(executorService, never()).submit(argThat(
|
||||||
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
|
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
|
||||||
|
|
||||||
assertThat(worker.shouldShutdown(), equalTo(true));
|
assertThat(worker.shouldShutdown(Instant.now()), equalTo(true));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1314,7 +1313,7 @@ public class WorkerTest {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
assertThat(worker.shouldShutdown(), equalTo(true));
|
assertThat(worker.shouldShutdown(Instant.now()), equalTo(true));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue