Reworked some of the shutdown logic to make the relationships clearer.

This commit is contained in:
Pfifer, Justin 2017-06-13 06:21:27 -07:00
parent 9686d7c3ca
commit 0829378957
5 changed files with 624 additions and 410 deletions

View file

@ -0,0 +1,19 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import lombok.Data;
import java.util.concurrent.CountDownLatch;
@Data
class GracefulShutdownContext {
private final CountDownLatch shutdownCompleteLatch;
private final CountDownLatch notificationCompleteLatch;
private final Worker worker;
static GracefulShutdownContext SHUTDOWN_ALREADY_COMPLETED = new GracefulShutdownContext(null, null, null);
boolean isShutdownAlreadyCompleted() {
return shutdownCompleteLatch == null && notificationCompleteLatch == null && worker == null;
}
}

View file

@ -1,7 +1,6 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -9,9 +8,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
class RequestedShutdownCoordinator { class GracefulShutdownCoordinator {
static Future<Boolean> startRequestedShutdown(Callable<Boolean> shutdownCallable) { Future<Boolean> startGracefulShutdown(Callable<Boolean> shutdownCallable) {
FutureTask<Boolean> task = new FutureTask<>(shutdownCallable); FutureTask<Boolean> task = new FutureTask<>(shutdownCallable);
Thread shutdownThread = new Thread(task, "RequestedShutdownThread"); Thread shutdownThread = new Thread(task, "RequestedShutdownThread");
shutdownThread.start(); shutdownThread.start();
@ -19,45 +18,39 @@ class RequestedShutdownCoordinator {
} }
static Callable<Boolean> createRequestedShutdownCallable(CountDownLatch shutdownCompleteLatch, Callable<Boolean> createGracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {
CountDownLatch notificationCompleteLatch, Worker worker) { return new GracefulShutdownCallable(startWorkerShutdown);
return new RequestedShutdownCallable(shutdownCompleteLatch, notificationCompleteLatch, worker);
} }
static class RequestedShutdownCallable implements Callable<Boolean> { static class GracefulShutdownCallable implements Callable<Boolean> {
private static final Log log = LogFactory.getLog(RequestedShutdownCallable.class); private static final Log log = LogFactory.getLog(GracefulShutdownCallable.class);
private final CountDownLatch shutdownCompleteLatch; private final Callable<GracefulShutdownContext> startWorkerShutdown;
private final CountDownLatch notificationCompleteLatch;
private final Worker worker;
RequestedShutdownCallable(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, GracefulShutdownCallable(Callable<GracefulShutdownContext> startWorkerShutdown) {
Worker worker) { this.startWorkerShutdown = startWorkerShutdown;
this.shutdownCompleteLatch = shutdownCompleteLatch;
this.notificationCompleteLatch = notificationCompleteLatch;
this.worker = worker;
} }
private boolean isWorkerShutdownComplete() { private boolean isWorkerShutdownComplete(GracefulShutdownContext context) {
return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty(); return context.getWorker().isShutdownComplete() || context.getWorker().getShardInfoShardConsumerMap().isEmpty();
} }
private String awaitingLogMessage() { private String awaitingLogMessage(GracefulShutdownContext context) {
long awaitingNotification = notificationCompleteLatch.getCount(); long awaitingNotification = context.getNotificationCompleteLatch().getCount();
long awaitingFinalShutdown = shutdownCompleteLatch.getCount(); long awaitingFinalShutdown = context.getShutdownCompleteLatch().getCount();
return String.format( return String.format(
"Waiting for %d record process to complete shutdown notification, and %d record processor to complete final shutdown ", "Waiting for %d record process to complete shutdown notification, and %d record processor to complete final shutdown ",
awaitingNotification, awaitingFinalShutdown); awaitingNotification, awaitingFinalShutdown);
} }
private String awaitingFinalShutdownMessage() { private String awaitingFinalShutdownMessage(GracefulShutdownContext context) {
long outstanding = shutdownCompleteLatch.getCount(); long outstanding = context.getShutdownCompleteLatch().getCount();
return String.format("Waiting for %d record processors to complete final shutdown", outstanding); return String.format("Waiting for %d record processors to complete final shutdown", outstanding);
} }
private boolean waitForRecordProcessors() { private boolean waitForRecordProcessors(GracefulShutdownContext context) {
// //
// Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested. // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
@ -66,18 +59,18 @@ class RequestedShutdownCoordinator {
// ShardConsumer would start the lease loss shutdown, and may never call the notification methods. // ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
// //
try { try {
while (!notificationCompleteLatch.await(1, TimeUnit.SECONDS)) { while (!context.getNotificationCompleteLatch().await(1, TimeUnit.SECONDS)) {
if (Thread.interrupted()) { if (Thread.interrupted()) {
throw new InterruptedException(); throw new InterruptedException();
} }
log.info(awaitingLogMessage()); log.info(awaitingLogMessage(context));
if (workerShutdownWithRemaining(shutdownCompleteLatch.getCount())) { if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) {
return false; return false;
} }
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
log.warn("Interrupted while waiting for notification complete, terminating shutdown. " log.warn("Interrupted while waiting for notification complete, terminating shutdown. "
+ awaitingLogMessage()); + awaitingLogMessage(context));
return false; return false;
} }
@ -90,7 +83,7 @@ class RequestedShutdownCoordinator {
// Once all record processors have been notified of the shutdown it is safe to allow the worker to // Once all record processors have been notified of the shutdown it is safe to allow the worker to
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases. // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
// //
worker.shutdown(); context.getWorker().shutdown();
if (Thread.interrupted()) { if (Thread.interrupted()) {
log.warn("Interrupted after worker shutdown, terminating shutdown"); log.warn("Interrupted after worker shutdown, terminating shutdown");
@ -103,18 +96,18 @@ class RequestedShutdownCoordinator {
// ShardConsumer is terminated. // ShardConsumer is terminated.
// //
try { try {
while (!shutdownCompleteLatch.await(1, TimeUnit.SECONDS)) { while (!context.getShutdownCompleteLatch().await(1, TimeUnit.SECONDS)) {
if (Thread.interrupted()) { if (Thread.interrupted()) {
throw new InterruptedException(); throw new InterruptedException();
} }
log.info(awaitingFinalShutdownMessage()); log.info(awaitingFinalShutdownMessage(context));
if (workerShutdownWithRemaining(shutdownCompleteLatch.getCount())) { if (workerShutdownWithRemaining(context.getShutdownCompleteLatch().getCount(), context)) {
return false; return false;
} }
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. " log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. "
+ awaitingFinalShutdownMessage()); + awaitingFinalShutdownMessage(context));
return false; return false;
} }
return true; return true;
@ -128,13 +121,13 @@ class RequestedShutdownCoordinator {
* @param outstanding * @param outstanding
* the number of record processor still awaiting shutdown. * the number of record processor still awaiting shutdown.
*/ */
private boolean workerShutdownWithRemaining(long outstanding) { private boolean workerShutdownWithRemaining(long outstanding, GracefulShutdownContext context) {
if (isWorkerShutdownComplete()) { if (isWorkerShutdownComplete(context)) {
if (outstanding != 0) { if (outstanding != 0) {
log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
+ " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: " + " with a current value of " + context.getShutdownCompleteLatch().getCount() + ". shutdownComplete: "
+ worker.isShutdownComplete() + " -- Consumer Map: " + context.getWorker().isShutdownComplete() + " -- Consumer Map: "
+ worker.getShardInfoShardConsumerMap().size()); + context.getWorker().getShardInfoShardConsumerMap().size());
return true; return true;
} }
} }
@ -143,7 +136,14 @@ class RequestedShutdownCoordinator {
@Override @Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
return waitForRecordProcessors(); GracefulShutdownContext context;
try {
context = startWorkerShutdown.call();
} catch (Exception ex) {
log.warn("Caught exception while requesting initial worker shutdown.", ex);
throw ex;
}
return context.isShutdownAlreadyCompleted() || waitForRecordProcessors(context);
} }
} }
} }

View file

@ -1,16 +1,9 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software
* * License (the "License"). You may not use this file except in compliance with the License. A copy of the License is
* Licensed under the Amazon Software License (the "License"). * located at http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an
* You may not use this file except in compliance with the License. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* A copy of the License is located at * specific language governing permissions and limitations under the License.
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
@ -24,7 +17,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -56,14 +48,12 @@ import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Worker is the high level class that Kinesis applications use to start * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
* processing data. It initializes and oversees different components (e.g. * different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
* syncing shard and lease information, tracking shard assignments, and * the shards).
* processing data from the shards).
*/ */
public class Worker implements Runnable { public class Worker implements Runnable {
@ -100,17 +90,27 @@ public class Worker implements Runnable {
// Holds consumers for shards the worker is currently tracking. Key is shard // Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer. // info, value is ShardConsumer.
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();
new ConcurrentHashMap<ShardInfo, ShardConsumer>();
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
/**
* Used to ensure that only one requestedShutdown is in progress at a time.
*/
private Future<Boolean> gracefulShutdownFuture;
@VisibleForTesting
protected boolean gracefuleShutdownStarted = false;
@VisibleForTesting
protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
/** /**
* Constructor. * Constructor.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
@ -121,108 +121,119 @@ public class Worker implements Runnable {
/** /**
* Constructor. * Constructor.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param execService ExecutorService to use for processing records (support for multi-threaded * @param config
* consumption) * Kinesis Client Library configuration
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, ExecutorService execService) {
ExecutorService execService) { this(recordProcessorFactory, config,
this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(), new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()),
config.getKinesisClientConfiguration()),
new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
config.getDynamoDBClientConfiguration()), config.getDynamoDBClientConfiguration()),
new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(), new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(),
config.getCloudWatchClientConfiguration()), execService); config.getCloudWatchClientConfiguration()),
execService);
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param metricsFactory Metrics factory used to emit metrics * @param config
* Kinesis Client Library configuration
* @param metricsFactory
* Metrics factory used to emit metrics
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, IMetricsFactory metricsFactory) {
IMetricsFactory metricsFactory) {
this(recordProcessorFactory, config, metricsFactory, getExecutorService()); this(recordProcessorFactory, config, metricsFactory, getExecutorService());
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param metricsFactory Metrics factory used to emit metrics * @param config
* @param execService ExecutorService to use for processing records (support for multi-threaded * Kinesis Client Library configuration
* consumption) * @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, IMetricsFactory metricsFactory, ExecutorService execService) {
IMetricsFactory metricsFactory, this(recordProcessorFactory, config,
ExecutorService execService) { new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()),
this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
config.getKinesisClientConfiguration()),
new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
config.getDynamoDBClientConfiguration()), metricsFactory, execService); config.getDynamoDBClientConfiguration()),
metricsFactory, execService);
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param cloudWatchClient CloudWatch Client for publishing metrics * @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
AmazonKinesis kinesisClient,
AmazonDynamoDB dynamoDBClient,
AmazonCloudWatch cloudWatchClient) { AmazonCloudWatch cloudWatchClient) {
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient, getExecutorService()); this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient, getExecutorService());
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param cloudWatchClient CloudWatch Client for publishing metrics * @param kinesisClient
* @param execService ExecutorService to use for processing records (support for multi-threaded * Kinesis Client used for fetching data
* consumption) * @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
AmazonKinesis kinesisClient, AmazonCloudWatch cloudWatchClient, ExecutorService execService) {
AmazonDynamoDB dynamoDBClient, this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, getMetricsFactory(cloudWatchClient, config),
AmazonCloudWatch cloudWatchClient, execService);
ExecutorService execService) {
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient,
getMetricsFactory(cloudWatchClient, config), execService);
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param metricsFactory Metrics factory used to emit metrics * @param kinesisClient
* @param execService ExecutorService to use for processing records (support for multi-threaded * Kinesis Client used for fetching data
* consumption) * @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
AmazonKinesis kinesisClient, IMetricsFactory metricsFactory, ExecutorService execService) {
AmazonDynamoDB dynamoDBClient, this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
IMetricsFactory metricsFactory,
ExecutorService execService) {
this(
config.getApplicationName(),
new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
new StreamConfig( new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName()), .getProxy(config.getStreamName()),
@ -230,27 +241,16 @@ public class Worker implements Runnable {
config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(), config.shouldValidateSequenceNumberBeforeCheckpointing(),
config.getInitialPositionInStreamExtended()), config.getInitialPositionInStreamExtended()),
config.getInitialPositionInStreamExtended(), config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(),
config.getParentShardPollIntervalMillis(), config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
config.getShardSyncIntervalMillis(),
config.shouldCleanupLeasesUponShardCompletion(),
null,
new KinesisClientLibLeaseCoordinator( new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
config.getWorkerIdentifier(), config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(),
config.getFailoverTimeMillis(), config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), metricsFactory)
config.getEpsilonMillis(),
config.getMaxLeasesForWorker(),
config.getMaxLeasesToStealAtOneTime(),
metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService, execService, metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(),
metricsFactory, config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy());
config.getTaskBackoffTimeMillis(),
config.getFailoverTimeMillis(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
config.getShardPrioritizationStrategy());
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) { if (config.getRegionName() != null) {
@ -280,41 +280,43 @@ public class Worker implements Runnable {
} }
/** /**
* @param applicationName Name of the Kinesis application * @param applicationName
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * Name of the Kinesis application
* @param streamConfig Stream configuration * @param recordProcessorFactory
* @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start * Used to get record processor instances for processing data from shards
* fetching data from this location in the stream when an application starts up for the first time and * @param streamConfig
* there are no checkpoints. If there are checkpoints, we start from the checkpoint position. * Stream configuration
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done * @param initialPositionInStream
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards * One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from
* @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in * this location in the stream when an application starts up for the first time and there are no
* Kinesis) * checkpoints. If there are checkpoints, we start from the checkpoint position.
* @param checkpoint Used to get/set checkpoints * @param parentShardPollIntervalMillis
* @param leaseCoordinator Lease coordinator (coordinates currently owned leases) * Wait for this long between polls to check if parent shards are done
* @param execService ExecutorService to use for processing records (support for multi-threaded * @param shardSyncIdleTimeMillis
* consumption) * Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory used to emit metrics * @param cleanupLeasesUponShardCompletion
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception * Clean up shards we've finished processing (don't wait till they expire in Kinesis)
* @param shardPrioritization Provides prioritization logic to decide which available shards process first * @param checkpoint
* Used to get/set checkpoints
* @param leaseCoordinator
* Lease coordinator (coordinates currently owned leases)
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
* @param metricsFactory
* Metrics factory used to emit metrics
* @param taskBackoffTimeMillis
* Backoff period when tasks encounter an exception
* @param shardPrioritization
* Provides prioritization logic to decide which available shards process first
*/ */
// NOTE: This has package level access solely for testing // NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
IRecordProcessorFactory recordProcessorFactory, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
StreamConfig streamConfig, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
InitialPositionInStreamExtended initialPositionInStream, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
long parentShardPollIntervalMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
long shardSyncIdleTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
boolean cleanupLeasesUponShardCompletion,
ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator,
ExecutorService execService,
IMetricsFactory metricsFactory,
long taskBackoffTimeMillis,
long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ShardPrioritization shardPrioritization) {
this.applicationName = applicationName; this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory; this.recordProcessorFactory = recordProcessorFactory;
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
@ -326,13 +328,8 @@ public class Worker implements Runnable {
this.executorService = execService; this.executorService = execService;
this.leaseCoordinator = leaseCoordinator; this.leaseCoordinator = leaseCoordinator;
this.metricsFactory = metricsFactory; this.metricsFactory = metricsFactory;
this.controlServer = this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
new ShardSyncTaskManager(streamConfig.getStreamProxy(), initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
leaseCoordinator.getLeaseManager(),
initialPositionInStream,
cleanupLeasesUponShardCompletion,
shardSyncIdleTimeMillis,
metricsFactory,
executorService); executorService);
this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis; this.failoverTimeMillis = failoverTimeMillis;
@ -348,8 +345,7 @@ public class Worker implements Runnable {
} }
/** /**
* Start consuming data from the stream, and pass it to the application * Start consuming data from the stream, and pass it to the application record processors.
* record processors.
*/ */
public void run() { public void run() {
if (shutdown) { if (shutdown) {
@ -422,12 +418,8 @@ public class Worker implements Runnable {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist if (!skipShardSyncAtWorkerInitializationIfLeasesExist
|| leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
LOG.info("Syncing Kinesis shard info"); LOG.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = ShardSyncTask shardSyncTask = new ShardSyncTask(streamConfig.getStreamProxy(),
new ShardSyncTask(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), initialPosition, cleanupLeasesUponShardCompletion, 0L);
leaseCoordinator.getLeaseManager(),
initialPosition,
cleanupLeasesUponShardCompletion,
0L);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else { } else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
@ -464,14 +456,13 @@ public class Worker implements Runnable {
} }
/** /**
* NOTE: This method is internal/private to the Worker class. It has package * NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* access solely for testing.
* *
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo
* ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); * shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); ShardInfo
* ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1")); * shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1"));
*/ */
void cleanupShardConsumers(Set<ShardInfo> assignedShards) { void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) { for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) {
@ -511,38 +502,17 @@ public class Worker implements Runnable {
} }
/** /**
* Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware}, * Starts the requestedShutdown process, and returns a future that can be used to track the process.
* of the impending shutdown. This gives the record processor a final chance to checkpoint.
* *
* <b>It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is * This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and
* lost after requesting shutdown, but before the notification is dispatched.</b> * indicates the process behavior
* *
* <h2>Requested Shutdown Process</h2> When a shutdown process is requested it operates slightly differently to * @return a future that will be set once shutdown is completed.
* allow the record processors a chance to checkpoint a final time.
* <ol>
* <li>Call to request shutdown invoked.</li>
* <li>Worker stops attempting to acquire new leases</li>
* <li>Record Processor Shutdown Begins
* <ol>
* <li>Record processor is notified of the impending shutdown, and given a final chance to checkpoint</li>
* <li>The lease for the record processor is then dropped.</li>
* <li>The record processor enters into an idle state waiting for the worker to complete final termination</li>
* <li>The worker will detect a record processor that has lost it's lease, and will terminate the record processor
* with {@link ShutdownReason#ZOMBIE}</li>
* </ol>
* </li>
* <li>The worker will shutdown all record processors.</li>
* <li>Once all record processors have been terminated, the worker will terminate all owned resources.</li>
* <li>Once the worker shutdown is complete, the returned future is completed.</li>
* </ol>
*
*
*
* @return a Future that will be set once the shutdown is complete.
*/ */
@Deprecated
public Future<Void> requestShutdown() { public Future<Void> requestShutdown() {
Future<Boolean> requestedShutdownFuture = requestCancellableShutdown(); Future<Boolean> requestedShutdownFuture = startGracefulShutdown();
return new Future<Void>() { return new Future<Void>() {
@ -568,18 +538,88 @@ public class Worker implements Runnable {
} }
@Override @Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { public Void get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
requestedShutdownFuture.get(timeout, unit); requestedShutdownFuture.get(timeout, unit);
return null; return null;
} }
}; };
} }
public Future<Boolean> requestCancellableShutdown() { /**
return RequestedShutdownCoordinator.startRequestedShutdown(requestShutdownCallable()); * Requests a graceful shutdown of the worker, notifying record processors, that implement
* {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
* checkpoint.
*
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
* previous future.
*
* <b>It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is
* lost after requesting shutdown, but before the notification is dispatched.</b>
*
* <h2>Requested Shutdown Process</h2> When a shutdown process is requested it operates slightly differently to
* allow the record processors a chance to checkpoint a final time.
* <ol>
* <li>Call to request shutdown invoked.</li>
* <li>Worker stops attempting to acquire new leases</li>
* <li>Record Processor Shutdown Begins
* <ol>
* <li>Record processor is notified of the impending shutdown, and given a final chance to checkpoint</li>
* <li>The lease for the record processor is then dropped.</li>
* <li>The record processor enters into an idle state waiting for the worker to complete final termination</li>
* <li>The worker will detect a record processor that has lost it's lease, and will terminate the record processor
* with {@link ShutdownReason#ZOMBIE}</li>
* </ol>
* </li>
* <li>The worker will shutdown all record processors.</li>
* <li>Once all record processors have been terminated, the worker will terminate all owned resources.</li>
* <li>Once the worker shutdown is complete, the returned future is completed.</li>
* </ol>
*
* @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown
* completed successfully. A false value indicates that a non-exception case caused the shutdown process to
* terminate early.
*/
public Future<Boolean> startGracefulShutdown() {
synchronized (this) {
if (gracefulShutdownFuture == null) {
gracefulShutdownFuture = gracefulShutdownCoordinator
.startGracefulShutdown(createGracefulShutdownCallable());
}
}
return gracefulShutdownFuture;
} }
public Callable<Boolean> requestShutdownCallable() { /**
* Creates a callable that will execute the graceful shutdown process. This callable can be used to execute graceful
* shutdowns in your own executor, or execute the shutdown synchronously.
*
* @return a callable that run the graceful shutdown process. This may return a callable that return true if the
* graceful shutdown has already been completed.
* @throws IllegalStateException
* thrown by the callable if another callable has already started the shutdown process.
*/
public Callable<Boolean> createGracefulShutdownCallable() {
if (isShutdownComplete()) {
return () -> true;
}
Callable<GracefulShutdownContext> startShutdown = createWorkerShutdownCallable();
return gracefulShutdownCoordinator.createGracefulShutdownCallable(startShutdown);
}
public boolean hasGracefulShutdownStarted() {
return gracefuleShutdownStarted;
}
@VisibleForTesting
Callable<GracefulShutdownContext> createWorkerShutdownCallable() {
return () -> {
synchronized (this) {
if (this.gracefuleShutdownStarted) {
throw new IllegalStateException("Requested shutdown has already been started");
}
this.gracefuleShutdownStarted = true;
}
// //
// Stop accepting new leases. Once we do this we can be sure that // Stop accepting new leases. Once we do this we can be sure that
// no more leases will be acquired. // no more leases will be acquired.
@ -592,13 +632,13 @@ public class Worker implements Runnable {
// If there are no leases notification is already completed, but we still need to shutdown the worker. // If there are no leases notification is already completed, but we still need to shutdown the worker.
// //
this.shutdown(); this.shutdown();
return () -> true; return GracefulShutdownContext.SHUTDOWN_ALREADY_COMPLETED;
} }
CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size()); CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size()); CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size());
for (KinesisClientLease lease : leases) { for (KinesisClientLease lease : leases) {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator,
notificationCompleteLatch, shutdownCompleteLatch); lease, notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
if (consumer != null) { if (consumer != null) {
@ -613,7 +653,8 @@ public class Worker implements Runnable {
shutdownCompleteLatch.countDown(); shutdownCompleteLatch.countDown();
} }
} }
return RequestedShutdownCoordinator.createRequestedShutdownCallable(shutdownCompleteLatch, notificationCompleteLatch, this); return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this);
};
} }
boolean isShutdownComplete() { boolean isShutdownComplete() {
@ -657,8 +698,8 @@ public class Worker implements Runnable {
} }
/** /**
* Perform final shutdown related tasks for the worker including shutting down worker owned * Perform final shutdown related tasks for the worker including shutting down worker owned executor services,
* executor services, threads, etc. * threads, etc.
*/ */
private void finalShutdown() { private void finalShutdown() {
LOG.info("Starting worker's final shutdown."); LOG.info("Starting worker's final shutdown.");
@ -699,11 +740,12 @@ public class Worker implements Runnable {
} }
/** /**
* NOTE: This method is internal/private to the Worker class. It has package * NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* access solely for testing.
* *
* @param shardInfo Kinesis shard info * @param shardInfo
* @param factory RecordProcessor factory * Kinesis shard info
* @param factory
* RecordProcessor factory
* @return ShardConsumer for the shard * @return ShardConsumer for the shard
*/ */
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
@ -727,15 +769,15 @@ public class Worker implements Runnable {
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist); executorService, metricsFactory, taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist);
} }
/** /**
* Logger for suppressing too much INFO logging. To avoid too much logging * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
* information Worker will output logging at INFO level for a single pass * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on
* through the main loop every minute. At DEBUG level it will output all * every pass.
* INFO logs on every pass.
*/ */
private static class WorkerLog { private static class WorkerLog {
@ -791,90 +833,89 @@ public class Worker implements Runnable {
// Backwards compatible constructors // Backwards compatible constructors
/** /**
* This constructor is for binary compatibility with code compiled against * This constructor is for binary compatibility with code compiled against version of the KCL that only have
* version of the KCL that only have constructors taking "Client" objects. * constructors taking "Client" objects.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param cloudWatchClient CloudWatch Client for publishing metrics * @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) {
AmazonDynamoDBClient dynamoDBClient, this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
AmazonCloudWatchClient cloudWatchClient) {
this(recordProcessorFactory,
config,
(AmazonKinesis) kinesisClient,
(AmazonDynamoDB) dynamoDBClient,
(AmazonCloudWatch) cloudWatchClient); (AmazonCloudWatch) cloudWatchClient);
} }
/** /**
* This constructor is for binary compatibility with code compiled against * This constructor is for binary compatibility with code compiled against version of the KCL that only have
* version of the KCL that only have constructors taking "Client" objects. * constructors taking "Client" objects.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param cloudWatchClient CloudWatch Client for publishing metrics * @param kinesisClient
* @param execService ExecutorService to use for processing records (support for multi-threaded * Kinesis Client used for fetching data
* consumption) * @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) {
AmazonDynamoDBClient dynamoDBClient, this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
AmazonCloudWatchClient cloudWatchClient, (AmazonCloudWatch) cloudWatchClient, execService);
ExecutorService execService) {
this(recordProcessorFactory,
config,
(AmazonKinesis) kinesisClient,
(AmazonDynamoDB) dynamoDBClient,
(AmazonCloudWatch) cloudWatchClient,
execService);
} }
/** /**
* This constructor is for binary compatibility with code compiled against * This constructor is for binary compatibility with code compiled against version of the KCL that only have
* version of the KCL that only have constructors taking "Client" objects. * constructors taking "Client" objects.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* @param config Kinesis Client Library configuration * Used to get record processor instances for processing data from shards
* @param kinesisClient Kinesis Client used for fetching data * @param config
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * Kinesis Client Library configuration
* @param metricsFactory Metrics factory used to emit metrics * @param kinesisClient
* @param execService ExecutorService to use for processing records (support for multi-threaded * Kinesis Client used for fetching data
* consumption) * @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
public Worker( public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonKinesisClient kinesisClient, AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) {
AmazonDynamoDBClient dynamoDBClient, this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
IMetricsFactory metricsFactory, metricsFactory, execService);
ExecutorService execService) {
this(recordProcessorFactory,
config,
(AmazonKinesis) kinesisClient,
(AmazonDynamoDB) dynamoDBClient,
metricsFactory,
execService);
} }
/** /**
* Given configuration, returns appropriate metrics factory. * Given configuration, returns appropriate metrics factory.
* @param cloudWatchClient Amazon CloudWatch client *
* @param config KinesisClientLibConfiguration * @param cloudWatchClient
* Amazon CloudWatch client
* @param config
* KinesisClientLibConfiguration
* @return Returns metrics factory based on the config. * @return Returns metrics factory based on the config.
*/ */
private static IMetricsFactory getMetricsFactory( private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient,
AmazonCloudWatch cloudWatchClient, KinesisClientLibConfiguration config) { KinesisClientLibConfiguration config) {
IMetricsFactory metricsFactory; IMetricsFactory metricsFactory;
if (config.getMetricsLevel() == MetricsLevel.NONE) { if (config.getMetricsLevel() == MetricsLevel.NONE) {
metricsFactory = new NullMetricsFactory(); metricsFactory = new NullMetricsFactory();
@ -884,12 +925,8 @@ public class Worker implements Runnable {
cloudWatchClient.setRegion(region); cloudWatchClient.setRegion(region);
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName()); LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
} }
metricsFactory = new WorkerCWMetricsFactory( metricsFactory = new WorkerCWMetricsFactory(cloudWatchClient, config.getApplicationName(),
cloudWatchClient, config.getMetricsBufferTimeMillis(), config.getMetricsMaxQueueSize(), config.getMetricsLevel(),
config.getApplicationName(),
config.getMetricsBufferTimeMillis(),
config.getMetricsMaxQueueSize(),
config.getMetricsLevel(),
config.getMetricsEnabledDimensions()); config.getMetricsEnabledDimensions());
} }
return metricsFactory; return metricsFactory;
@ -897,6 +934,7 @@ public class Worker implements Runnable {
/** /**
* Returns default executor service that should be used by the worker. * Returns default executor service that should be used by the worker.
*
* @return Default executor service that should be used by the worker. * @return Default executor service that should be used by the worker.
*/ */
private static ExecutorService getExecutorService() { private static ExecutorService getExecutorService() {
@ -905,26 +943,19 @@ public class Worker implements Runnable {
} }
/** /**
* Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance * Extension to CWMetricsFactory, so worker can identify whether it owns the metrics factory instance or not.
* or not.
* Visible and non-final only for testing. * Visible and non-final only for testing.
*/ */
static class WorkerCWMetricsFactory extends CWMetricsFactory { static class WorkerCWMetricsFactory extends CWMetricsFactory {
WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis,
String namespace, int maxQueueSize, MetricsLevel metricsLevel, Set<String> metricsEnabledDimensions) {
long bufferTimeMillis, super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions);
int maxQueueSize,
MetricsLevel metricsLevel,
Set<String> metricsEnabledDimensions) {
super(cloudWatchClient, namespace, bufferTimeMillis,
maxQueueSize, metricsLevel, metricsEnabledDimensions);
} }
} }
/** /**
* Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance * Extension to ThreadPoolExecutor, so worker can identify whether it owns the executor service instance or not.
* or not.
* Visible and non-final only for testing. * Visible and non-final only for testing.
*/ */
static class WorkerThreadPoolExecutor extends ThreadPoolExecutor { static class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
@ -958,24 +989,25 @@ public class Worker implements Runnable {
} }
/** /**
* Provide a V1 * Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor IRecordProcessor}. * IRecordProcessor}.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder recordProcessorFactory( public Builder recordProcessorFactory(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory) {
recordProcessorFactory) {
this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory); this.recordProcessorFactory = new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory);
return this; return this;
} }
/** /**
* Provide a V2 * Provide a V2 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor IRecordProcessor}. * IRecordProcessor}.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) { public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) {
@ -986,7 +1018,8 @@ public class Worker implements Runnable {
/** /**
* Set the Worker config. * Set the Worker config.
* *
* @param config Kinesis Client Library configuration * @param config
* Kinesis Client Library configuration
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder config(KinesisClientLibConfiguration config) { public Builder config(KinesisClientLibConfiguration config) {
@ -997,7 +1030,8 @@ public class Worker implements Runnable {
/** /**
* Set the Kinesis client. * Set the Kinesis client.
* *
* @param kinesisClient Kinesis Client used for fetching data * @param kinesisClient
* Kinesis Client used for fetching data
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder kinesisClient(AmazonKinesis kinesisClient) { public Builder kinesisClient(AmazonKinesis kinesisClient) {
@ -1008,7 +1042,8 @@ public class Worker implements Runnable {
/** /**
* Set the DynamoDB client. * Set the DynamoDB client.
* *
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases * @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) { public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) {
@ -1019,7 +1054,8 @@ public class Worker implements Runnable {
/** /**
* Set the Cloudwatch client. * Set the Cloudwatch client.
* *
* @param cloudWatchClient CloudWatch Client for publishing metrics * @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) { public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) {
@ -1030,7 +1066,8 @@ public class Worker implements Runnable {
/** /**
* Set the metrics factory. * Set the metrics factory.
* *
* @param metricsFactory Metrics factory used to emit metrics * @param metricsFactory
* Metrics factory used to emit metrics
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder metricsFactory(IMetricsFactory metricsFactory) { public Builder metricsFactory(IMetricsFactory metricsFactory) {
@ -1041,7 +1078,8 @@ public class Worker implements Runnable {
/** /**
* Set the executor service for processing records. * Set the executor service for processing records.
* *
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption) * @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder execService(ExecutorService execService) { public Builder execService(ExecutorService execService) {
@ -1075,8 +1113,7 @@ public class Worker implements Runnable {
"Kinesis Client Library configuration needs to be provided to build Worker"); "Kinesis Client Library configuration needs to be provided to build Worker");
} }
if (recordProcessorFactory == null) { if (recordProcessorFactory == null) {
throw new IllegalArgumentException( throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker");
"A Record Processor Factory needs to be provided to build Worker");
} }
if (execService == null) { if (execService == null) {
@ -1129,36 +1166,23 @@ public class Worker implements Runnable {
shardPrioritization = new ParentsFirstShardPrioritization(1); shardPrioritization = new ParentsFirstShardPrioritization(1);
} }
return new Worker(config.getApplicationName(), return new Worker(config.getApplicationName(), recordProcessorFactory, new StreamConfig(
recordProcessorFactory, new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), .getProxy(config.getStreamName()),
kinesisClient).getProxy(config.getStreamName()), config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
config.getMaxRecords(),
config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(), config.shouldValidateSequenceNumberBeforeCheckpointing(),
config.getInitialPositionInStreamExtended()), config.getInitialPositionInStreamExtended()), config.getInitialPositionInStreamExtended(),
config.getInitialPositionInStreamExtended(), config.getParentShardPollIntervalMillis(), config.getShardSyncIntervalMillis(),
config.getParentShardPollIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null,
config.getShardSyncIntervalMillis(), new KinesisClientLibLeaseCoordinator(
config.shouldCleanupLeasesUponShardCompletion(), new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient),
null, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(),
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(), config.getMaxLeasesForWorker(), config.getMaxLeasesToStealAtOneTime(), metricsFactory)
dynamoDBClient),
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),
config.getMaxLeasesForWorker(),
config.getMaxLeasesToStealAtOneTime(),
metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService, execService, metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(),
metricsFactory, config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization);
config.getTaskBackoffTimeMillis(),
config.getFailoverTimeMillis(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
shardPrioritization);
} }

View file

@ -1,10 +1,12 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -18,13 +20,11 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode; import org.mockito.verification.VerificationMode;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class RequestedShutdownCoordinatorTest { public class GracefulShutdownCoordinatorTest {
@Mock @Mock
private CountDownLatch shutdownCompleteLatch; private CountDownLatch shutdownCompleteLatch;
@ -33,6 +33,8 @@ public class RequestedShutdownCoordinatorTest {
@Mock @Mock
private Worker worker; private Worker worker;
@Mock @Mock
private Callable<GracefulShutdownContext> contextCallable;
@Mock
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoConsumerMap; private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoConsumerMap;
@Test @Test
@ -262,9 +264,18 @@ public class RequestedShutdownCoordinatorTest {
verify(worker).shutdown(); verify(worker).shutdown();
} }
@Test(expected = IllegalStateException.class)
public void testWorkerShutdownCallableThrows() throws Exception {
Callable<Boolean> requestedShutdownCallable = new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable);
when(contextCallable.call()).thenThrow(new IllegalStateException("Bad Shutdown"));
requestedShutdownCallable.call();
}
private void verifyLatchAwait(CountDownLatch latch) throws Exception { private void verifyLatchAwait(CountDownLatch latch) throws Exception {
verifyLatchAwait(latch, times(1)); verifyLatchAwait(latch, times(1));
} }
private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception { private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception {
verifyLatchAwait(latch, times(times)); verifyLatchAwait(latch, times(times));
} }
@ -277,9 +288,11 @@ public class RequestedShutdownCoordinatorTest {
when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, remaining); when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, remaining);
} }
private Callable<Boolean> buildRequestedShutdownCallable() { private Callable<Boolean> buildRequestedShutdownCallable() throws Exception {
return RequestedShutdownCoordinator.createRequestedShutdownCallable(shutdownCompleteLatch, GracefulShutdownContext context = new GracefulShutdownContext(shutdownCompleteLatch,
notificationCompleteLatch, worker); notificationCompleteLatch, worker);
when(contextCallable.call()).thenReturn(context);
return new GracefulShutdownCoordinator().createGracefulShutdownCallable(contextCallable);
} }
private void mockShardInfoConsumerMap(Integer initialItemCount, Integer... additionalItemCounts) { private void mockShardInfoConsumerMap(Integer initialItemCount, Integer... additionalItemCounts) {

View file

@ -762,7 +762,7 @@ public class WorkerTest {
verify(executorService, atLeastOnce()).submit(argThat( verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
worker.requestShutdown(); worker.createWorkerShutdownCallable().call();
worker.runProcessLoop(); worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
@ -781,6 +781,146 @@ public class WorkerTest {
} }
@Test(expected = IllegalStateException.class)
public void testShutdownCallableNotAllowedTwice() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
lease.getParentShardIds(), lease.getCheckpoint()));
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@Override
void postConstruct() {
this.gracefuleShutdownStarted = true;
}
};
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
assertThat(worker.hasGracefulShutdownStarted(), equalTo(true));
worker.createWorkerShutdownCallable().call();
}
@Test
public void testGracefulShutdownSingleFuture() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
lease.getParentShardIds(), lease.getCheckpoint()));
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
GracefulShutdownCoordinator coordinator = mock(GracefulShutdownCoordinator.class);
when(coordinator.createGracefulShutdownCallable(any(Callable.class))).thenReturn(() -> true);
Future<Boolean> gracefulShutdownFuture = mock(Future.class);
when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@Override
void postConstruct() {
this.gracefulShutdownCoordinator = coordinator;
}
};
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
Future<Boolean> firstFuture = worker.startGracefulShutdown();
Future<Boolean> secondFuture = worker.startGracefulShutdown();
assertThat(firstFuture, equalTo(secondFuture));
verify(coordinator).startGracefulShutdown(any(Callable.class));
}
@Test @Test
public void testRequestShutdownNoLeases() throws Exception { public void testRequestShutdownNoLeases() throws Exception {
@ -830,7 +970,7 @@ public class WorkerTest {
verify(executorService, never()).submit(argThat( verify(executorService, never()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
worker.requestShutdown(); worker.createWorkerShutdownCallable().call();
worker.runProcessLoop(); worker.runProcessLoop();
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
@ -909,7 +1049,7 @@ public class WorkerTest {
.withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2))))); .withField(InitializeTask.class, "shardInfo", equalTo(shardInfo2)))));
worker.getShardInfoShardConsumerMap().remove(shardInfo2); worker.getShardInfoShardConsumerMap().remove(shardInfo2);
worker.requestShutdown(); worker.createWorkerShutdownCallable().call();
leases.remove(1); leases.remove(1);
currentAssignments.remove(1); currentAssignments.remove(1);
worker.runProcessLoop(); worker.runProcessLoop();
@ -1194,6 +1334,24 @@ public class WorkerTest {
} }
private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis,
boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion,
checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis,
failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization);
postConstruct();
}
abstract void postConstruct();
}
private KinesisClientLease makeLease(ExtendedSequenceNumber checkpoint, int shardId) { private KinesisClientLease makeLease(ExtendedSequenceNumber checkpoint, int shardId) {
return new KinesisClientLeaseBuilder().withCheckpoint(checkpoint).withConcurrencyToken(UUID.randomUUID()) return new KinesisClientLeaseBuilder().withCheckpoint(checkpoint).withConcurrencyToken(UUID.randomUUID())
.withLastCounterIncrementNanos(0L).withLeaseCounter(0L).withOwnerSwitchesSinceCheckpoint(0L) .withLastCounterIncrementNanos(0L).withLeaseCounter(0L).withOwnerSwitchesSinceCheckpoint(0L)