diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 90b6c4de..bcca1a42 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -85,8 +85,6 @@ public class Worker implements Runnable { private static final int MAX_RETRIES = 4; private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final boolean DEFAULT_EXITS_ON_FAILURE = false; - private static final boolean DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE = false; - private static final Optional DEFAULT_TIME_LIMIT = Optional.empty(); private WorkerLog wlog = new WorkerLog(); @@ -128,8 +126,6 @@ public class Worker implements Runnable { // fivetran configurables private final boolean exitOnFailure; - private final boolean exitAfterInitialLeasesComplete; - private final Optional timeLimit; /** @@ -361,9 +357,7 @@ public class Worker implements Runnable { config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, - DEFAULT_EXITS_ON_FAILURE, - DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE, - DEFAULT_TIME_LIMIT); + DEFAULT_EXITS_ON_FAILURE); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { @@ -411,7 +405,7 @@ public class Worker implements Runnable { shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER, - DEFAULT_EXITS_ON_FAILURE, DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE, DEFAULT_TIME_LIMIT); + DEFAULT_EXITS_ON_FAILURE); } /** @@ -443,7 +437,7 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, - boolean exitOnFailure, boolean exitAfterInitialLeasesComplete, Optional timeLimit) { + boolean exitOnFailure) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -467,8 +461,6 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.workerStateChangeListener = workerStateChangeListener; this.exitOnFailure = exitOnFailure; - this.exitAfterInitialLeasesComplete = exitAfterInitialLeasesComplete; - this.timeLimit = timeLimit; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); } @@ -503,52 +495,14 @@ public class Worker implements Runnable { shutdown(); } - Instant start = Instant.now(); - List initialLeaseKeys = getExistingLeaseKeys(); - while (!shouldShutdown()) { runProcessLoop(); - shutdownIfFivetranReady(start, initialLeaseKeys); } finalShutdown(); LOG.info("Worker loop is complete. Exiting from worker."); } - private List getExistingLeaseKeys() { - try { - return leaseCoordinator.getLeaseManager().listLeases().stream().map(Lease::getLeaseKey).collect(Collectors.toList()); - } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) { - throw new RuntimeException("Unable to list leases to determine when to stop worker"); - } - } - - void shutdownIfFivetranReady(Instant start, List initialLeases) { - boolean initiateShutdown = false; - - List currentLeases = getExistingLeaseKeys(); - boolean initialLeasesComplete = currentLeases.stream().noneMatch(initialLeases::contains); - - if (exitAfterInitialLeasesComplete && initialLeasesComplete) { - LOG.info("Requesting shutdown after finishing with all initial leases: " + initialLeases); - initiateShutdown = true; - } - - if (timeLimit.isPresent() && Duration.between(start, Instant.now()).compareTo(timeLimit.get()) > 0) { - LOG.info("Requesting shutdown after time limit of " + timeLimit.get() + " reached"); - initiateShutdown = true; - } - - if (initiateShutdown) { - try { - startGracefulShutdown().get(5, TimeUnit.MINUTES); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.error("Unable to gracefully shutdown: " + e.getMessage()); - shutdown(); - } - } - } - @VisibleForTesting void runProcessLoop() { try { @@ -1126,8 +1080,6 @@ public class Worker implements Runnable { // fivetran additions boolean exitOnFailure = DEFAULT_EXITS_ON_FAILURE; - boolean exitAfterInitialLeasesComplete = DEFAULT_EXITS_AFTER_INITIAL_LEASES_COMPLETE; - Optional timeLimit = DEFAULT_TIME_LIMIT; @VisibleForTesting AmazonKinesis getKinesisClient() { @@ -1273,9 +1225,7 @@ public class Worker implements Runnable { config.getRetryGetRecordsInSeconds(), config.getMaxGetRecordsThreadPool(), workerStateChangeListener, - exitOnFailure, - exitAfterInitialLeasesComplete, - timeLimit); + exitOnFailure); } > R createClient(final T builder, @@ -1357,15 +1307,5 @@ public class Worker implements Runnable { this.exitOnFailure = true; return this; } - - public Builder exitAfterInitialLeasesComplete() { - this.exitAfterInitialLeasesComplete = true; - return this; - } - - public Builder timeLimit(Duration timeLimit) { - this.timeLimit = Optional.ofNullable(timeLimit); - return this; - } } }