From 14fb8e27031791d27410d21d7950a502327ff245 Mon Sep 17 00:00:00 2001 From: glarwood Date: Tue, 5 Mar 2019 20:34:58 -0800 Subject: [PATCH 1/2] refactor(Worker): shutdown when shardInfoShardConsumerMap is empty --- .../kinesis/clientlibrary/lib/worker/Worker.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 71198dff..30146d89 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -482,9 +482,10 @@ public class Worker implements Runnable { shutdown(); } - while (!shouldShutdown()) { + do { + // use do while to initialize the shardInfoShardConsumerMap, which is checked in shouldShutdown() runProcessLoop(); - } + } while (!shouldShutdown()); finalShutdown(); LOG.info("Worker loop is complete. Exiting from worker."); @@ -526,6 +527,9 @@ public class Worker implements Runnable { LOG.info("Worker: sleep interrupted after catching exception ", ex); } } + + // reset retries + retries.set(0); wlog.resetInfoLogging(); } @@ -868,6 +872,10 @@ public class Worker implements Runnable { return true; } } + + if (shardInfoShardConsumerMap.isEmpty()) { + LOG.info("Nothing to consume"); + } return false; } From 1a617249f47129d7b74a3e380c8446ae55dd7b37 Mon Sep 17 00:00:00 2001 From: glarwood Date: Tue, 5 Mar 2019 21:04:52 -0800 Subject: [PATCH 2/2] refactor(Worker): make Fivetran specific code configurable --- .../clientlibrary/lib/worker/Worker.java | 75 ++++++++++++++++--- .../clientlibrary/lib/worker/WorkerTest.java | 7 +- 2 files changed, 69 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 30146d89..63197d2f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -14,6 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.time.Duration; +import java.time.Instant; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -81,6 +83,9 @@ public class Worker implements Runnable { private static final int MAX_INITIALIZATION_ATTEMPTS = 20; private static final int MAX_RETRIES = 4; private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); + private static final boolean DEFAULT_EXITS_ON_FAILURE = false; + private static final boolean DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH = false; + private static final Optional DEFAULT_TIME_LIMIT = Optional.empty(); private WorkerLog wlog = new WorkerLog(); @@ -120,6 +125,12 @@ public class Worker implements Runnable { private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; + // fivetran configurables + private final boolean exitOnFailure; + private final boolean exitWhenShardConsumersFinish; + private final Optional timeLimit; + + /** * Used to ensure that only one requestedShutdown is in progress at a time. */ @@ -348,7 +359,10 @@ public class Worker implements Runnable { config.getShardPrioritizationStrategy(), config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), - DEFAULT_WORKER_STATE_CHANGE_LISTENER); + DEFAULT_WORKER_STATE_CHANGE_LISTENER, + DEFAULT_EXITS_ON_FAILURE, + DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH, + DEFAULT_TIME_LIMIT); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { @@ -395,7 +409,8 @@ public class Worker implements Runnable { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER); + shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, + DEFAULT_EXITS_ON_FAILURE, DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH, DEFAULT_TIME_LIMIT); } /** @@ -426,7 +441,8 @@ public class Worker implements Runnable { KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, - Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) { + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, + boolean exitOnFailure, boolean exitWhenShardConsumersFinish, Optional timeLimit) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -449,6 +465,9 @@ public class Worker implements Runnable { this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.workerStateChangeListener = workerStateChangeListener; + this.exitOnFailure = exitOnFailure; + this.exitWhenShardConsumersFinish = exitWhenShardConsumersFinish; + this.timeLimit = timeLimit; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); } @@ -469,11 +488,14 @@ public class Worker implements Runnable { /** * Start consuming data from the stream, and pass it to the application record processors. */ + @Override public void run() { if (shutdown) { return; } + Instant start = Instant.now(); + try { initialize(); LOG.info("Initialization complete. Starting worker loop."); @@ -485,7 +507,7 @@ public class Worker implements Runnable { do { // use do while to initialize the shardInfoShardConsumerMap, which is checked in shouldShutdown() runProcessLoop(); - } while (!shouldShutdown()); + } while (!shouldShutdown(start)); finalShutdown(); LOG.info("Worker loop is complete. Exiting from worker."); @@ -516,7 +538,7 @@ public class Worker implements Runnable { wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); } catch (Exception e) { - if (retries.getAndIncrement() > MAX_RETRIES) + if (exitOnFailure && retries.getAndIncrement() > MAX_RETRIES) throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e); LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!", @@ -855,9 +877,10 @@ public class Worker implements Runnable { * method before every loop run, so method must do minimum amount of work to not impact shard processing timings. * * @return Whether worker should shutdown immediately. + * @param start */ @VisibleForTesting - boolean shouldShutdown() { + boolean shouldShutdown(Instant start) { if (executorService.isShutdown()) { LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown."); return true; @@ -873,9 +896,20 @@ public class Worker implements Runnable { } } - if (shardInfoShardConsumerMap.isEmpty()) { - LOG.info("Nothing to consume"); + return fivetranWantsShutdown(start); + } + + boolean fivetranWantsShutdown(Instant start) { + if (exitWhenShardConsumersFinish && shardInfoShardConsumerMap.isEmpty()) { + LOG.info("All ShardConsumers have finished"); + return true; } + + if (timeLimit.isPresent() && Duration.between(start, Instant.now()).compareTo(timeLimit.get()) > 0) { + LOG.info("Time limit of " + timeLimit.get() + " reached"); + return true; + } + return false; } @@ -1070,6 +1104,11 @@ public class Worker implements Runnable { private IKinesisProxy kinesisProxy; private WorkerStateChangeListener workerStateChangeListener; + // fivetran additions + boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE; + boolean exitWhenShardConsumersFinish = DEFAULT_EXITS_WHEN_SHARD_CONSUMERS_FINISH; + Optional timeLimit = DEFAULT_TIME_LIMIT; + @VisibleForTesting AmazonKinesis getKinesisClient() { return kinesisClient; @@ -1213,7 +1252,10 @@ public class Worker implements Runnable { shardPrioritization, config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), - workerStateChangeListener); + workerStateChangeListener, + exitOnFailure, + exitWhenShardConsumersFinish, + timeLimit); } > R createClient(final T builder, @@ -1290,5 +1332,20 @@ public class Worker implements Runnable { this.workerStateChangeListener = workerStateChangeListener; return this; } + + public Builder exitOnFailure() { + this.exitOnFailure = true; + return this; + } + + public Builder exitWhenShardConsumersFinish() { + this.exitWhenShardConsumersFinish = true; + return this; + } + + public Builder timeLimit(Duration timeLimit) { + this.timeLimit = Optional.ofNullable(timeLimit); + return this; + } } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 98c406ed..5106baa8 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -38,6 +38,7 @@ import java.io.File; import java.lang.Thread.State; import java.lang.reflect.Field; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -60,7 +61,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import com.amazonaws.auth.AWSCredentialsProvider; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Condition; @@ -68,7 +68,6 @@ import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; -import org.hamcrest.internal.ReflectiveTypeFinder; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -1083,7 +1082,7 @@ public class WorkerTest { verify(executorService, never()).submit(argThat( both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)))); - assertThat(worker.shouldShutdown(), equalTo(true)); + assertThat(worker.shouldShutdown(Instant.now()), equalTo(true)); } @@ -1314,7 +1313,7 @@ public class WorkerTest { - assertThat(worker.shouldShutdown(), equalTo(true)); + assertThat(worker.shouldShutdown(Instant.now()), equalTo(true)); }