From 082937895735d23abc0ee390324c55287afdb4eb Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Tue, 13 Jun 2017 06:21:27 -0700 Subject: [PATCH] Reworked some of the shutdown logic to make the relationships clearer. --- .../lib/worker/GracefulShutdownContext.java | 19 + ....java => GracefulShutdownCoordinator.java} | 78 +- .../clientlibrary/lib/worker/Worker.java | 750 +++++++++--------- ...a => GracefulShutdownCoordinatorTest.java} | 23 +- .../clientlibrary/lib/worker/WorkerTest.java | 164 +++- 5 files changed, 624 insertions(+), 410 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java rename src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/{RequestedShutdownCoordinator.java => GracefulShutdownCoordinator.java} (58%) rename src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/{RequestedShutdownCoordinatorTest.java => GracefulShutdownCoordinatorTest.java} (92%) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java new file mode 100644 index 00000000..f39a9adb --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownContext.java @@ -0,0 +1,19 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import lombok.Data; + +import java.util.concurrent.CountDownLatch; + +@Data +class GracefulShutdownContext { + private final CountDownLatch shutdownCompleteLatch; + private final CountDownLatch notificationCompleteLatch; + private final Worker worker; + + static GracefulShutdownContext SHUTDOWN_ALREADY_COMPLETED = new GracefulShutdownContext(null, null, null); + + boolean isShutdownAlreadyCompleted() { + return shutdownCompleteLatch == null && notificationCompleteLatch == null && worker == null; + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RequestedShutdownCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinator.java similarity index 58% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RequestedShutdownCoordinator.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinator.java index 5770c6ee..ce3059fe 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RequestedShutdownCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinator.java @@ -1,7 +1,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; @@ -9,9 +8,9 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -class RequestedShutdownCoordinator { +class GracefulShutdownCoordinator { - static Future startRequestedShutdown(Callable shutdownCallable) { + Future startGracefulShutdown(Callable shutdownCallable) { FutureTask task = new FutureTask<>(shutdownCallable); Thread shutdownThread = new Thread(task, "RequestedShutdownThread"); shutdownThread.start(); @@ -19,45 +18,39 @@ class RequestedShutdownCoordinator { } - static Callable createRequestedShutdownCallable(CountDownLatch shutdownCompleteLatch, - CountDownLatch notificationCompleteLatch, Worker worker) { - return new RequestedShutdownCallable(shutdownCompleteLatch, notificationCompleteLatch, worker); + Callable createGracefulShutdownCallable(Callable startWorkerShutdown) { + return new GracefulShutdownCallable(startWorkerShutdown); } - static class RequestedShutdownCallable implements Callable { + static class GracefulShutdownCallable implements Callable { - private static final Log log = LogFactory.getLog(RequestedShutdownCallable.class); + private static final Log log = LogFactory.getLog(GracefulShutdownCallable.class); - private final CountDownLatch shutdownCompleteLatch; - private final CountDownLatch notificationCompleteLatch; - private final Worker worker; + private final Callable startWorkerShutdown; - RequestedShutdownCallable(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, - Worker worker) { - this.shutdownCompleteLatch = shutdownCompleteLatch; - this.notificationCompleteLatch = notificationCompleteLatch; - this.worker = worker; + GracefulShutdownCallable(Callable startWorkerShutdown) { + this.startWorkerShutdown = startWorkerShutdown; } - private boolean isWorkerShutdownComplete() { - return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty(); + private boolean isWorkerShutdownComplete(GracefulShutdownContext context) { + return context.getWorker().isShutdownComplete() || context.getWorker().getShardInfoShardConsumerMap().isEmpty(); } - private String awaitingLogMessage() { - long awaitingNotification = notificationCompleteLatch.getCount(); - long awaitingFinalShutdown = shutdownCompleteLatch.getCount(); + private String awaitingLogMessage(GracefulShutdownContext context) { + long awaitingNotification = context.getNotificationCompleteLatch().getCount(); + long awaitingFinalShutdown = context.getShutdownCompleteLatch().getCount(); return String.format( "Waiting for %d record process to complete shutdown notification, and %d record processor to complete final shutdown ", awaitingNotification, awaitingFinalShutdown); } - private String awaitingFinalShutdownMessage() { - long outstanding = shutdownCompleteLatch.getCount(); + private String awaitingFinalShutdownMessage(GracefulShutdownContext context) { + long outstanding = context.getShutdownCompleteLatch().getCount(); return String.format("Waiting for %d record processors to complete final shutdown", outstanding); } - private boolean waitForRecordProcessors() { + private boolean waitForRecordProcessors(GracefulShutdownContext context) { // // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested. @@ -66,18 +59,18 @@ class RequestedShutdownCoordinator { // ShardConsumer would start the lease loss shutdown, and may never call the notification methods. // try { - while (!notificationCompleteLatch.await(1, TimeUnit.SECONDS)) { + while (!context.getNotificationCompleteLatch().await(1, TimeUnit.SECONDS)) { if (Thread.interrupted()) { throw new InterruptedException(); } - log.info(awaitingLogMessage()); - if (workerShutdownWithRemaining(shutdownCompleteLatch.getCount())) { + log.info(awaitingLogMessage(context)); + if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) { return false; } } } catch (InterruptedException ie) { log.warn("Interrupted while waiting for notification complete, terminating shutdown. " - + awaitingLogMessage()); + + awaitingLogMessage(context)); return false; } @@ -90,7 +83,7 @@ class RequestedShutdownCoordinator { // Once all record processors have been notified of the shutdown it is safe to allow the worker to // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases. // - worker.shutdown(); + context.getWorker().shutdown(); if (Thread.interrupted()) { log.warn("Interrupted after worker shutdown, terminating shutdown"); @@ -103,18 +96,18 @@ class RequestedShutdownCoordinator { // ShardConsumer is terminated. // try { - while (!shutdownCompleteLatch.await(1, TimeUnit.SECONDS)) { + while (!context.getShutdownCompleteLatch().await(1, TimeUnit.SECONDS)) { if (Thread.interrupted()) { throw new InterruptedException(); } - log.info(awaitingFinalShutdownMessage()); - if (workerShutdownWithRemaining(shutdownCompleteLatch.getCount())) { + log.info(awaitingFinalShutdownMessage(context)); + if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) { return false; } } } catch (InterruptedException ie) { log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. " - + awaitingFinalShutdownMessage()); + + awaitingFinalShutdownMessage(context)); return false; } return true; @@ -128,13 +121,13 @@ class RequestedShutdownCoordinator { * @param outstanding * the number of record processor still awaiting shutdown. */ - private boolean workerShutdownWithRemaining(long outstanding) { - if (isWorkerShutdownComplete()) { + private boolean workerShutdownWithRemaining(long outstanding, GracefulShutdownContext context) { + if (isWorkerShutdownComplete(context)) { if (outstanding != 0) { log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding - + " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: " - + worker.isShutdownComplete() + " -- Consumer Map: " - + worker.getShardInfoShardConsumerMap().size()); + + " with a current value of " + context.getShutdownCompleteLatch().getCount() + ". shutdownComplete: " + + context.getWorker().isShutdownComplete() + " -- Consumer Map: " + + context.getWorker().getShardInfoShardConsumerMap().size()); return true; } } @@ -143,7 +136,14 @@ class RequestedShutdownCoordinator { @Override public Boolean call() throws Exception { - return waitForRecordProcessors(); + GracefulShutdownContext context; + try { + context = startWorkerShutdown.call(); + } catch (Exception ex) { + log.warn("Caught exception while requesting initial worker shutdown.", ex); + throw ex; + } + return context.isShutdownAlreadyCompleted() || waitForRecordProcessors(context); } } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 563b4432..14d593e5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -1,16 +1,9 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software + * License (the "License"). You may not use this file except in compliance with the License. A copy of the License is + * located at http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -24,7 +17,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -56,14 +48,12 @@ import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * Worker is the high level class that Kinesis applications use to start - * processing data. It initializes and oversees different components (e.g. - * syncing shard and lease information, tracking shard assignments, and - * processing data from the shards). + * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees + * different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from + * the shards). */ public class Worker implements Runnable { @@ -81,7 +71,7 @@ public class Worker implements Runnable { private final long idleTimeInMilliseconds; // Backoff time when polling to check if application has finished processing // parent shards - private final long parentShardPollIntervalMillis; + private final long parentShardPollIntervalMillis; private final ExecutorService executorService; private final IMetricsFactory metricsFactory; // Backoff time when running tasks if they encounter exceptions @@ -100,17 +90,27 @@ public class Worker implements Runnable { // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. - private ConcurrentMap shardInfoShardConsumerMap = - new ConcurrentHashMap(); + private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; - + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; + /** + * Used to ensure that only one requestedShutdown is in progress at a time. + */ + private Future gracefulShutdownFuture; + @VisibleForTesting + protected boolean gracefuleShutdownStarted = false; + @VisibleForTesting + protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator(); + /** * Constructor. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, @@ -121,136 +121,136 @@ public class Worker implements Runnable { /** * Constructor. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - ExecutorService execService) { - this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(), - config.getKinesisClientConfiguration()), + KinesisClientLibConfiguration config, ExecutorService execService) { + this(recordProcessorFactory, config, + new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()), new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), config.getDynamoDBClientConfiguration()), new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(), - config.getCloudWatchClientConfiguration()), execService); + config.getCloudWatchClientConfiguration()), + execService); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param metricsFactory Metrics factory used to emit metrics + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param metricsFactory + * Metrics factory used to emit metrics */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - IMetricsFactory metricsFactory) { + KinesisClientLibConfiguration config, IMetricsFactory metricsFactory) { this(recordProcessorFactory, config, metricsFactory, getExecutorService()); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param metricsFactory Metrics factory used to emit metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param metricsFactory + * Metrics factory used to emit metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - IMetricsFactory metricsFactory, - ExecutorService execService) { - this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(), - config.getKinesisClientConfiguration()), + KinesisClientLibConfiguration config, IMetricsFactory metricsFactory, ExecutorService execService) { + this(recordProcessorFactory, config, + new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()), new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), - config.getDynamoDBClientConfiguration()), metricsFactory, execService); + config.getDynamoDBClientConfiguration()), + metricsFactory, execService); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesis kinesisClient, - AmazonDynamoDB dynamoDBClient, + KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, AmazonCloudWatch cloudWatchClient) { this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient, getExecutorService()); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesis kinesisClient, - AmazonDynamoDB dynamoDBClient, - AmazonCloudWatch cloudWatchClient, - ExecutorService execService) { - this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, - getMetricsFactory(cloudWatchClient, config), execService); + KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, + AmazonCloudWatch cloudWatchClient, ExecutorService execService) { + this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, getMetricsFactory(cloudWatchClient, config), + execService); } /** - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param metricsFactory Metrics factory used to emit metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param metricsFactory + * Metrics factory used to emit metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesis kinesisClient, - AmazonDynamoDB dynamoDBClient, - IMetricsFactory metricsFactory, - ExecutorService execService) { - this( - config.getApplicationName(), - new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, + IMetricsFactory metricsFactory, ExecutorService execService) { + this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) - .getProxy(config.getStreamName()), + .getProxy(config.getStreamName()), config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldValidateSequenceNumberBeforeCheckpointing(), config.getInitialPositionInStreamExtended()), - config.getInitialPositionInStreamExtended(), - config.getParentShardPollIntervalMillis(), - config.getShardSyncIntervalMillis(), - config.shouldCleanupLeasesUponShardCompletion(), - null, + config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(), + config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, new KinesisClientLibLeaseCoordinator( new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), - config.getWorkerIdentifier(), - config.getFailoverTimeMillis(), - config.getEpsilonMillis(), - config.getMaxLeasesForWorker(), - config.getMaxLeasesToStealAtOneTime(), - metricsFactory) - .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) - .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), - execService, - metricsFactory, - config.getTaskBackoffTimeMillis(), - config.getFailoverTimeMillis(), - config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), - config.getShardPrioritizationStrategy()); + config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), + config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), metricsFactory) + .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) + .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), + execService, metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy()); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { @@ -273,48 +273,50 @@ public class Worker implements Runnable { + ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint() + ". Amazon Kinesis endpoint will overwrite region name."); LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint()); - } else { + } else { LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); } } } /** - * @param applicationName Name of the Kinesis application - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param streamConfig Stream configuration - * @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start - * fetching data from this location in the stream when an application starts up for the first time and - * there are no checkpoints. If there are checkpoints, we start from the checkpoint position. - * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done - * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards - * @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in - * Kinesis) - * @param checkpoint Used to get/set checkpoints - * @param leaseCoordinator Lease coordinator (coordinates currently owned leases) - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) - * @param metricsFactory Metrics factory used to emit metrics - * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception - * @param shardPrioritization Provides prioritization logic to decide which available shards process first + * @param applicationName + * Name of the Kinesis application + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param streamConfig + * Stream configuration + * @param initialPositionInStream + * One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from + * this location in the stream when an application starts up for the first time and there are no + * checkpoints. If there are checkpoints, we start from the checkpoint position. + * @param parentShardPollIntervalMillis + * Wait for this long between polls to check if parent shards are done + * @param shardSyncIdleTimeMillis + * Time between tasks to sync leases and Kinesis shards + * @param cleanupLeasesUponShardCompletion + * Clean up shards we've finished processing (don't wait till they expire in Kinesis) + * @param checkpoint + * Used to get/set checkpoints + * @param leaseCoordinator + * Lease coordinator (coordinates currently owned leases) + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) + * @param metricsFactory + * Metrics factory used to emit metrics + * @param taskBackoffTimeMillis + * Backoff period when tasks encounter an exception + * @param shardPrioritization + * Provides prioritization logic to decide which available shards process first */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, - IRecordProcessorFactory recordProcessorFactory, - StreamConfig streamConfig, - InitialPositionInStreamExtended initialPositionInStream, - long parentShardPollIntervalMillis, - long shardSyncIdleTimeMillis, - boolean cleanupLeasesUponShardCompletion, - ICheckpoint checkpoint, - KinesisClientLibLeaseCoordinator leaseCoordinator, - ExecutorService execService, - IMetricsFactory metricsFactory, - long taskBackoffTimeMillis, - long failoverTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ShardPrioritization shardPrioritization) { + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.streamConfig = streamConfig; @@ -326,16 +328,11 @@ public class Worker implements Runnable { this.executorService = execService; this.leaseCoordinator = leaseCoordinator; this.metricsFactory = metricsFactory; - this.controlServer = - new ShardSyncTaskManager(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPositionInStream, - cleanupLeasesUponShardCompletion, - shardSyncIdleTimeMillis, - metricsFactory, - executorService); + this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory, + executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; - this.failoverTimeMillis = failoverTimeMillis; + this.failoverTimeMillis = failoverTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.shardPrioritization = shardPrioritization; } @@ -348,8 +345,7 @@ public class Worker implements Runnable { } /** - * Start consuming data from the stream, and pass it to the application - * record processors. + * Start consuming data from the stream, and pass it to the application record processors. */ public void run() { if (shutdown) { @@ -422,12 +418,8 @@ public class Worker implements Runnable { if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { LOG.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPosition, - cleanupLeasesUponShardCompletion, - 0L); + ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); @@ -464,14 +456,13 @@ public class Worker implements Runnable { } /** - * NOTE: This method is internal/private to the Worker class. It has package - * access solely for testing. + * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * * This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example - * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. - * ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); - * ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1")); + * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo + * shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); ShardInfo + * shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1")); */ void cleanupShardConsumers(Set assignedShards) { for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) { @@ -511,38 +502,17 @@ public class Worker implements Runnable { } /** - * Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware}, - * of the impending shutdown. This gives the record processor a final chance to checkpoint. - * - * It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is - * lost after requesting shutdown, but before the notification is dispatched. - * - *

Requested Shutdown Process

When a shutdown process is requested it operates slightly differently to - * allow the record processors a chance to checkpoint a final time. - *
    - *
  1. Call to request shutdown invoked.
  2. - *
  3. Worker stops attempting to acquire new leases
  4. - *
  5. Record Processor Shutdown Begins - *
      - *
    1. Record processor is notified of the impending shutdown, and given a final chance to checkpoint
    2. - *
    3. The lease for the record processor is then dropped.
    4. - *
    5. The record processor enters into an idle state waiting for the worker to complete final termination
    6. - *
    7. The worker will detect a record processor that has lost it's lease, and will terminate the record processor - * with {@link ShutdownReason#ZOMBIE}
    8. - *
    - *
  6. - *
  7. The worker will shutdown all record processors.
  8. - *
  9. Once all record processors have been terminated, the worker will terminate all owned resources.
  10. - *
  11. Once the worker shutdown is complete, the returned future is completed.
  12. - *
- * - * - * - * @return a Future that will be set once the shutdown is complete. + * Starts the requestedShutdown process, and returns a future that can be used to track the process. + * + * This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and + * indicates the process behavior + * + * @return a future that will be set once shutdown is completed. */ + @Deprecated public Future requestShutdown() { - Future requestedShutdownFuture = requestCancellableShutdown(); + Future requestedShutdownFuture = startGracefulShutdown(); return new Future() { @@ -568,52 +538,123 @@ public class Worker implements Runnable { } @Override - public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + public Void get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { requestedShutdownFuture.get(timeout, unit); return null; } }; } - public Future requestCancellableShutdown() { - return RequestedShutdownCoordinator.startRequestedShutdown(requestShutdownCallable()); - } - - public Callable requestShutdownCallable() { - // - // Stop accepting new leases. Once we do this we can be sure that - // no more leases will be acquired. - // - leaseCoordinator.stopLeaseTaker(); - - Collection leases = leaseCoordinator.getAssignments(); - if (leases == null || leases.isEmpty()) { - // - // If there are no leases notification is already completed, but we still need to shutdown the worker. - // - this.shutdown(); - return () -> true; - } - CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size()); - CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size()); - for (KinesisClientLease lease : leases) { - ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, - notificationCompleteLatch, shutdownCompleteLatch); - ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); - ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); - if (consumer != null) { - consumer.notifyShutdownRequested(shutdownNotification); - } else { - // - // There is a race condition between retrieving the current assignments, and creating the - // notification. If the a lease is lost in between these two points, we explicitly decrement the - // notification latches to clear the shutdown. - // - notificationCompleteLatch.countDown(); - shutdownCompleteLatch.countDown(); + /** + * Requests a graceful shutdown of the worker, notifying record processors, that implement + * {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to + * checkpoint. + * + * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the + * previous future. + * + * It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is + * lost after requesting shutdown, but before the notification is dispatched. + * + *

Requested Shutdown Process

When a shutdown process is requested it operates slightly differently to + * allow the record processors a chance to checkpoint a final time. + *
    + *
  1. Call to request shutdown invoked.
  2. + *
  3. Worker stops attempting to acquire new leases
  4. + *
  5. Record Processor Shutdown Begins + *
      + *
    1. Record processor is notified of the impending shutdown, and given a final chance to checkpoint
    2. + *
    3. The lease for the record processor is then dropped.
    4. + *
    5. The record processor enters into an idle state waiting for the worker to complete final termination
    6. + *
    7. The worker will detect a record processor that has lost it's lease, and will terminate the record processor + * with {@link ShutdownReason#ZOMBIE}
    8. + *
    + *
  6. + *
  7. The worker will shutdown all record processors.
  8. + *
  9. Once all record processors have been terminated, the worker will terminate all owned resources.
  10. + *
  11. Once the worker shutdown is complete, the returned future is completed.
  12. + *
+ * + * @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown + * completed successfully. A false value indicates that a non-exception case caused the shutdown process to + * terminate early. + */ + public Future startGracefulShutdown() { + synchronized (this) { + if (gracefulShutdownFuture == null) { + gracefulShutdownFuture = gracefulShutdownCoordinator + .startGracefulShutdown(createGracefulShutdownCallable()); } } - return RequestedShutdownCoordinator.createRequestedShutdownCallable(shutdownCompleteLatch, notificationCompleteLatch, this); + return gracefulShutdownFuture; + } + + /** + * Creates a callable that will execute the graceful shutdown process. This callable can be used to execute graceful + * shutdowns in your own executor, or execute the shutdown synchronously. + * + * @return a callable that run the graceful shutdown process. This may return a callable that return true if the + * graceful shutdown has already been completed. + * @throws IllegalStateException + * thrown by the callable if another callable has already started the shutdown process. + */ + public Callable createGracefulShutdownCallable() { + if (isShutdownComplete()) { + return () -> true; + } + Callable startShutdown = createWorkerShutdownCallable(); + return gracefulShutdownCoordinator.createGracefulShutdownCallable(startShutdown); + } + + public boolean hasGracefulShutdownStarted() { + return gracefuleShutdownStarted; + } + + @VisibleForTesting + Callable createWorkerShutdownCallable() { + return () -> { + synchronized (this) { + if (this.gracefuleShutdownStarted) { + throw new IllegalStateException("Requested shutdown has already been started"); + } + this.gracefuleShutdownStarted = true; + } + // + // Stop accepting new leases. Once we do this we can be sure that + // no more leases will be acquired. + // + leaseCoordinator.stopLeaseTaker(); + + Collection leases = leaseCoordinator.getAssignments(); + if (leases == null || leases.isEmpty()) { + // + // If there are no leases notification is already completed, but we still need to shutdown the worker. + // + this.shutdown(); + return GracefulShutdownContext.SHUTDOWN_ALREADY_COMPLETED; + } + CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size()); + CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size()); + for (KinesisClientLease lease : leases) { + ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, + lease, notificationCompleteLatch, shutdownCompleteLatch); + ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); + ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); + if (consumer != null) { + consumer.notifyShutdownRequested(shutdownNotification); + } else { + // + // There is a race condition between retrieving the current assignments, and creating the + // notification. If the a lease is lost in between these two points, we explicitly decrement the + // notification latches to clear the shutdown. + // + notificationCompleteLatch.countDown(); + shutdownCompleteLatch.countDown(); + } + } + return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this); + }; } boolean isShutdownComplete() { @@ -657,8 +698,8 @@ public class Worker implements Runnable { } /** - * Perform final shutdown related tasks for the worker including shutting down worker owned - * executor services, threads, etc. + * Perform final shutdown related tasks for the worker including shutting down worker owned executor services, + * threads, etc. */ private void finalShutdown() { LOG.info("Starting worker's final shutdown."); @@ -699,11 +740,12 @@ public class Worker implements Runnable { } /** - * NOTE: This method is internal/private to the Worker class. It has package - * access solely for testing. + * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * - * @param shardInfo Kinesis shard info - * @param factory RecordProcessor factory + * @param shardInfo + * Kinesis shard info + * @param factory + * RecordProcessor factory * @return ShardConsumer for the shard */ ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { @@ -727,15 +769,15 @@ public class Worker implements Runnable { return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist); + executorService, metricsFactory, taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist); } /** - * Logger for suppressing too much INFO logging. To avoid too much logging - * information Worker will output logging at INFO level for a single pass - * through the main loop every minute. At DEBUG level it will output all - * INFO logs on every pass. + * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at + * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on + * every pass. */ private static class WorkerLog { @@ -791,90 +833,89 @@ public class Worker implements Runnable { // Backwards compatible constructors /** - * This constructor is for binary compatibility with code compiled against - * version of the KCL that only have constructors taking "Client" objects. + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, - AmazonCloudWatchClient cloudWatchClient) { - this(recordProcessorFactory, - config, - (AmazonKinesis) kinesisClient, - (AmazonDynamoDB) dynamoDBClient, + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, (AmazonCloudWatch) cloudWatchClient); } /** - * This constructor is for binary compatibility with code compiled against - * version of the KCL that only have constructors taking "Client" objects. + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient CloudWatch Client for publishing metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, - AmazonCloudWatchClient cloudWatchClient, - ExecutorService execService) { - this(recordProcessorFactory, - config, - (AmazonKinesis) kinesisClient, - (AmazonDynamoDB) dynamoDBClient, - (AmazonCloudWatch) cloudWatchClient, - execService); + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, + (AmazonCloudWatch) cloudWatchClient, execService); } /** - * This constructor is for binary compatibility with code compiled against - * version of the KCL that only have constructors taking "Client" objects. + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards - * @param config Kinesis Client Library configuration - * @param kinesisClient Kinesis Client used for fetching data - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases - * @param metricsFactory Metrics factory used to emit metrics - * @param execService ExecutorService to use for processing records (support for multi-threaded - * consumption) + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param metricsFactory + * Metrics factory used to emit metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) */ public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, - AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, - IMetricsFactory metricsFactory, - ExecutorService execService) { - this(recordProcessorFactory, - config, - (AmazonKinesis) kinesisClient, - (AmazonDynamoDB) dynamoDBClient, - metricsFactory, - execService); + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, + metricsFactory, execService); } /** * Given configuration, returns appropriate metrics factory. - * @param cloudWatchClient Amazon CloudWatch client - * @param config KinesisClientLibConfiguration + * + * @param cloudWatchClient + * Amazon CloudWatch client + * @param config + * KinesisClientLibConfiguration * @return Returns metrics factory based on the config. */ - private static IMetricsFactory getMetricsFactory( - AmazonCloudWatch cloudWatchClient, KinesisClientLibConfiguration config) { + private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, + KinesisClientLibConfiguration config) { IMetricsFactory metricsFactory; if (config.getMetricsLevel() == MetricsLevel.NONE) { metricsFactory = new NullMetricsFactory(); @@ -884,12 +925,8 @@ public class Worker implements Runnable { cloudWatchClient.setRegion(region); LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); } - metricsFactory = new WorkerCWMetricsFactory( - cloudWatchClient, - config.getApplicationName(), - config.getMetricsBufferTimeMillis(), - config.getMetricsMaxQueueSize(), - config.getMetricsLevel(), + metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(), + config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(), config.getMetricsEnabledDimensions()); } return metricsFactory; @@ -897,6 +934,7 @@ public class Worker implements Runnable { /** * Returns default executor service that should be used by the worker. + * * @return Default executor service that should be used by the worker. */ private static ExecutorService getExecutorService() { @@ -905,26 +943,19 @@ public class Worker implements Runnable { } /** - * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance - * or not. + * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not. * Visible and non-final only for testing. */ static class WorkerCWMetricsFactory extends CWMetricsFactory { - WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, - String namespace, - long bufferTimeMillis, - int maxQueueSize, - MetricsLevel metricsLevel, - Set metricsEnabledDimensions) { - super(cloudWatchClient, namespace, bufferTimeMillis, - maxQueueSize, metricsLevel, metricsEnabledDimensions); + WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis, + int maxQueueSize, MetricsLevel metricsLevel, Set metricsEnabledDimensions) { + super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions); } } /** - * Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance - * or not. + * Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance or not. * Visible and non-final only for testing. */ static class WorkerThreadPoolExecutor extends ThreadPoolExecutor { @@ -958,24 +989,25 @@ public class Worker implements Runnable { } /** - * Provide a V1 - * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor IRecordProcessor}. + * Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor + * IRecordProcessor}. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards * @return A reference to this updated object so that method calls can be chained together. */ public Builder recordProcessorFactory( - com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory - recordProcessorFactory) { + com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory) { this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory); return this; } /** - * Provide a V2 - * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor IRecordProcessor}. + * Provide a V2 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor + * IRecordProcessor}. * - * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards * @return A reference to this updated object so that method calls can be chained together. */ public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) { @@ -986,7 +1018,8 @@ public class Worker implements Runnable { /** * Set the Worker config. * - * @param config Kinesis Client Library configuration + * @param config + * Kinesis Client Library configuration * @return A reference to this updated object so that method calls can be chained together. */ public Builder config(KinesisClientLibConfiguration config) { @@ -997,7 +1030,8 @@ public class Worker implements Runnable { /** * Set the Kinesis client. * - * @param kinesisClient Kinesis Client used for fetching data + * @param kinesisClient + * Kinesis Client used for fetching data * @return A reference to this updated object so that method calls can be chained together. */ public Builder kinesisClient(AmazonKinesis kinesisClient) { @@ -1008,7 +1042,8 @@ public class Worker implements Runnable { /** * Set the DynamoDB client. * - * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases * @return A reference to this updated object so that method calls can be chained together. */ public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) { @@ -1019,7 +1054,8 @@ public class Worker implements Runnable { /** * Set the Cloudwatch client. * - * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param cloudWatchClient + * CloudWatch Client for publishing metrics * @return A reference to this updated object so that method calls can be chained together. */ public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) { @@ -1030,7 +1066,8 @@ public class Worker implements Runnable { /** * Set the metrics factory. * - * @param metricsFactory Metrics factory used to emit metrics + * @param metricsFactory + * Metrics factory used to emit metrics * @return A reference to this updated object so that method calls can be chained together. */ public Builder metricsFactory(IMetricsFactory metricsFactory) { @@ -1041,7 +1078,8 @@ public class Worker implements Runnable { /** * Set the executor service for processing records. * - * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) * @return A reference to this updated object so that method calls can be chained together. */ public Builder execService(ExecutorService execService) { @@ -1075,8 +1113,7 @@ public class Worker implements Runnable { "Kinesis Client Library configuration needs to be provided to build Worker"); } if (recordProcessorFactory == null) { - throw new IllegalArgumentException( - "A Record Processor Factory needs to be provided to build Worker"); + throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker"); } if (execService == null) { @@ -1118,7 +1155,7 @@ public class Worker implements Runnable { + ". Amazon Kinesis endpoint will overwrite region name."); LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint()); - } else { + } else { LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint()); } } @@ -1129,36 +1166,23 @@ public class Worker implements Runnable { shardPrioritization = new ParentsFirstShardPrioritization(1); } - return new Worker(config.getApplicationName(), - recordProcessorFactory, - new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), - kinesisClient).getProxy(config.getStreamName()), - config.getMaxRecords(), - config.getIdleTimeBetweenReadsInMillis(), - config.shouldCallProcessRecordsEvenForEmptyRecordList(), - config.shouldValidateSequenceNumberBeforeCheckpointing(), - config.getInitialPositionInStreamExtended()), - config.getInitialPositionInStreamExtended(), - config.getParentShardPollIntervalMillis(), - config.getShardSyncIntervalMillis(), - config.shouldCleanupLeasesUponShardCompletion(), - null, - new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(), - dynamoDBClient), - config.getWorkerIdentifier(), - config.getFailoverTimeMillis(), - config.getEpsilonMillis(), - config.getMaxLeasesForWorker(), - config.getMaxLeasesToStealAtOneTime(), - metricsFactory) - .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) - .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), - execService, - metricsFactory, - config.getTaskBackoffTimeMillis(), - config.getFailoverTimeMillis(), - config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), - shardPrioritization); + return new Worker(config.getApplicationName(), recordProcessorFactory, new StreamConfig( + new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) + .getProxy(config.getStreamName()), + config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), + config.shouldCallProcessRecordsEvenForEmptyRecordList(), + config.shouldValidateSequenceNumberBeforeCheckpointing(), + config.getInitialPositionInStreamExtended()), config.getInitialPositionInStreamExtended(), + config.getParentShardPollIntervalMillis(), config.getShardSyncIntervalMillis(), + config.shouldCleanupLeasesUponShardCompletion(), null, + new KinesisClientLibLeaseCoordinator( + new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), + config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), + config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), metricsFactory) + .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) + .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), + execService, metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RequestedShutdownCoordinatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java similarity index 92% rename from src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RequestedShutdownCoordinatorTest.java rename to src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java index d56e2bd4..1e89e089 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RequestedShutdownCoordinatorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java @@ -1,10 +1,12 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -18,13 +20,11 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; import org.mockito.verification.VerificationMode; @RunWith(MockitoJUnitRunner.class) -public class RequestedShutdownCoordinatorTest { +public class GracefulShutdownCoordinatorTest { @Mock private CountDownLatch shutdownCompleteLatch; @@ -33,6 +33,8 @@ public class RequestedShutdownCoordinatorTest { @Mock private Worker worker; @Mock + private Callable contextCallable; + @Mock private ConcurrentMap shardInfoConsumerMap; @Test @@ -262,9 +264,18 @@ public class RequestedShutdownCoordinatorTest { verify(worker).shutdown(); } + @Test(expected = IllegalStateException.class) + public void testWorkerShutdownCallableThrows() throws Exception { + Callable requestedShutdownCallable = new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable); + when(contextCallable.call()).thenThrow(new IllegalStateException("Bad Shutdown")); + + requestedShutdownCallable.call(); + } + private void verifyLatchAwait(CountDownLatch latch) throws Exception { verifyLatchAwait(latch, times(1)); } + private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception { verifyLatchAwait(latch, times(times)); } @@ -277,9 +288,11 @@ public class RequestedShutdownCoordinatorTest { when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, remaining); } - private Callable buildRequestedShutdownCallable() { - return RequestedShutdownCoordinator.createRequestedShutdownCallable(shutdownCompleteLatch, + private Callable buildRequestedShutdownCallable() throws Exception { + GracefulShutdownContext context = new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, worker); + when(contextCallable.call()).thenReturn(context); + return new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable); } private void mockShardInfoConsumerMap(Integer initialItemCount, Integer... additionalItemCounts) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index daf58165..2d6f918b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -762,7 +762,7 @@ public class WorkerTest { verify(executorService, atLeastOnce()).submit(argThat( both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); - worker.requestShutdown(); + worker.createWorkerShutdownCallable().call(); worker.runProcessLoop(); verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) @@ -781,6 +781,146 @@ public class WorkerTest { } + @Test(expected = IllegalStateException.class) + public void testShutdownCallableNotAllowedTwice() throws Exception { + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { + @Override + void postConstruct() { + this.gracefuleShutdownStarted = true; + } + }; + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + + assertThat(worker.hasGracefulShutdownStarted(), equalTo(true)); + worker.createWorkerShutdownCallable().call(); + + } + + @Test + public void testGracefulShutdownSingleFuture() throws Exception { + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + GracefulShutdownCoordinator coordinator = mock(GracefulShutdownCoordinator.class); + when(coordinator.createGracefulShutdownCallable(any(Callable.class))).thenReturn(() -> true); + + Future gracefulShutdownFuture = mock(Future.class); + + when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); + + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { + @Override + void postConstruct() { + this.gracefulShutdownCoordinator = coordinator; + } + }; + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + + Future firstFuture = worker.startGracefulShutdown(); + Future secondFuture = worker.startGracefulShutdown(); + + assertThat(firstFuture, equalTo(secondFuture)); + verify(coordinator).startGracefulShutdown(any(Callable.class)); + + } + @Test public void testRequestShutdownNoLeases() throws Exception { @@ -830,7 +970,7 @@ public class WorkerTest { verify(executorService, never()).submit(argThat( both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); - worker.requestShutdown(); + worker.createWorkerShutdownCallable().call(); worker.runProcessLoop(); verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) @@ -909,7 +1049,7 @@ public class WorkerTest { .withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2))))); worker.getShardInfoShardConsumerMap().remove(shardInfo2); - worker.requestShutdown(); + worker.createWorkerShutdownCallable().call(); leases.remove(1); currentAssignments.remove(1); worker.runProcessLoop(); @@ -1194,6 +1334,24 @@ public class WorkerTest { } + private abstract class InjectableWorker extends Worker { + InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, + boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { + super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, + parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, + checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, + failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); + postConstruct(); + } + + abstract void postConstruct(); + } + private KinesisClientLease makeLease(ExtendedSequenceNumber checkpoint, int shardId) { return new KinesisClientLeaseBuilder().withCheckpoint(checkpoint).withConcurrencyToken(UUID.randomUUID()) .withLastCounterIncrementNanos(0L).withLeaseCounter(0L).withOwnerSwitchesSinceCheckpoint(0L)