From 26e67a33b1f653be5f150fe450514d27d2a54be7 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Wed, 26 Oct 2016 12:57:50 -0700 Subject: [PATCH] Allow for a Graceful Shutdown of the Worker (#109) Add a new method to the worker, requestShutdown, that allows the worker to gracefully shut down all record processors. The graceful shutdown gives the record processors a last chance to checkpoint before they're terminated. To use this new feature, the record processor must implement IShutdownNotificationAware. --- .../interfaces/IRecordProcessor.java | 2 +- .../v2/IShutdownNotificationAware.java | 19 + .../lib/worker/ConsumerStates.java | 602 ++++++++++++++++++ .../KinesisClientLibLeaseCoordinator.java | 35 +- .../MetricsCollectingTaskDecorator.java | 8 + .../worker/RecordProcessorCheckpointer.java | 2 +- .../lib/worker/ShardConsumer.java | 288 ++++----- .../ShardConsumerShutdownNotification.java | 71 +++ .../lib/worker/ShutdownFuture.java | 155 +++++ .../lib/worker/ShutdownNotification.java | 22 + .../lib/worker/ShutdownNotificationTask.java | 43 ++ .../{types => lib/worker}/ShutdownReason.java | 44 +- .../lib/worker/ShutdownTask.java | 7 +- .../clientlibrary/lib/worker/TaskType.java | 12 +- .../clientlibrary/lib/worker/Worker.java | 104 ++- .../clientlibrary/types/ShutdownInput.java | 1 + .../leases/impl/KinesisClientLease.java | 12 +- .../services/kinesis/leases/impl/Lease.java | 16 +- .../kinesis/leases/impl/LeaseCoordinator.java | 33 +- .../kinesis/leases/impl/LeaseRenewer.java | 9 + .../leases/interfaces/ILeaseRenewer.java | 7 + .../kinesis/multilang/MessageWriter.java | 2 +- .../kinesis/multilang/MultiLangProtocol.java | 2 +- .../multilang/MultiLangRecordProcessor.java | 2 +- .../multilang/messages/ShutdownMessage.java | 2 +- .../lib/worker/ConsumerStatesTest.java | 385 +++++++++++ .../lib/worker/ShardConsumerTest.java | 110 ++-- .../lib/worker/ShardSequenceVerifier.java | 1 - .../lib/worker/ShutdownFutureTest.java | 236 +++++++ .../lib/worker/ShutdownTaskTest.java | 7 +- .../lib/worker/TestStreamlet.java | 34 +- .../clientlibrary/lib/worker/WorkerTest.java | 379 ++++++++++- .../types/ShutdownReasonTest.java | 26 +- .../impl/KinesisClientLeaseBuilder.java | 63 ++ .../kinesis/multilang/MessageWriterTest.java | 2 +- .../multilang/MultiLangProtocolTest.java | 2 +- .../StreamingRecordProcessorTest.java | 2 +- .../multilang/messages/MessageTest.java | 3 +- 38 files changed, 2479 insertions(+), 271 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java rename src/main/java/com/amazonaws/services/kinesis/clientlibrary/{types => lib/worker}/ShutdownReason.java (52%) create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java create mode 100644
src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java index 481a5dbf..89cf092a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java @@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.interfaces; import java.util.List; import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; /** * The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java new file mode 100644 index 00000000..82a18a0e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java @@ -0,0 +1,19 @@ +package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; + +/** + * Allows a record processor to indicate it's aware of requested shutdowns, and handle the request. + */ +public interface IShutdownNotificationAware { + + /** + * Called when the worker has been requested to shutdown, and gives the record processor a chance to checkpoint. + * + * The record processor will still have shutdown called. + * + * @param checkpointer the checkpointer that can be used to save progress. + */ + void shutdownRequested(IRecordProcessorCheckpointer checkpointer); + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java new file mode 100644 index 00000000..c8678974 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -0,0 +1,602 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, + * and state transitions is contained within the {@link ConsumerState} objects. + * + *

+ * <h1>State Diagram</h1>
+ *
+ * <pre>
+ *       +-------------------+
+ *       | Waiting on Parent |                               +------------------+
+ *  +----+       Shard       |                               |     Shutdown     |
+ *  |    |                   |          +--------------------+   Notification   |
+ *  |    +----------+--------+          |  Shutdown:         |     Requested    |
+ *  |               | Success           |   Requested        +-+-------+--------+
+ *  |               |                   |                      |       |
+ *  |        +------+-------------+     |                      |       | Shutdown:
+ *  |        |    Initializing    +-----+                      |       |  Requested
+ *  |        |                    |     |                      |       |
+ *  |        |                    +-----+-------+              |       |
+ *  |        +---------+----------+     |       | Shutdown:    | +-----+-------------+
+ *  |                  | Success        |       |  Terminated  | |     Shutdown      |
+ *  |                  |                |       |  Zombie      | |   Notification    +-------------+
+ *  |           +------+-------------+  |       |              | |     Complete      |             |
+ *  |           |     Processing     +--+       |              | ++-----------+------+             |
+ *  |       +---+                    |          |              |  |           |                    |
+ *  |       |   |                    +----------+              |  |           | Shutdown:          |
+ *  |       |   +------+-------------+          |              \  /           |  Requested         |
+ *  |       |          |                        |               \/            +--------------------+
+ *  |       |          |                        |               ||
+ *  |       | Success  |                        |               || Shutdown:
+ *  |       +----------+                        |               ||  Terminated
+ *  |                                           |               ||  Zombie
+ *  |                                           |               ||
+ *  |                                           |               ||
+ *  |                                           |           +---++--------------+
+ *  |                                           |           |   Shutting Down   |
+ *  |                                           +-----------+                   |
+ *  |                                                       |                   |
+ *  |                                                       +--------+----------+
+ *  |                                                                |
+ *  |                                                                | Shutdown:
+ *  |                                                                |  All Reasons
+ *  |                                                                |
+ *  |                                                                |
+ *  |      Shutdown:                                        +--------+----------+
+ *  |        All Reasons                                    |     Shutdown      |
+ *  +-------------------------------------------------------+     Complete      |
+ *                                                          |                   |
+ *                                                          +-------------------+
+ * </pre>
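+ * <p>
+ * For illustration only (this snippet is not part of the change; {@code consumer} stands in for a configured
+ * {@link ShardConsumer}): a state object is asked for the task to run, and then for the next state to move to.
+ * </p>
+ * <pre>{@code
+ * ConsumerStates.ConsumerState state = ConsumerStates.INITIAL_STATE;
+ * ITask task = state.createTask(consumer);                    // may be null for idle states
+ * // ... run the task; if it completes without an exception:
+ * state = state.successTransition();
+ * // ... or, once a shutdown has been requested or forced:
+ * state = state.shutdownTransition(ShutdownReason.REQUESTED);
+ * }</pre>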
+ */ +class ConsumerStates { + + /** + * Enumerates processing states when working on a shard. + */ + enum ShardConsumerState { + // @formatter:off + WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()), + INITIALIZING(new InitializingState()), + PROCESSING(new ProcessingState()), + SHUTDOWN_REQUESTED(new ShutdownNotificationState()), + SHUTTING_DOWN(new ShuttingDownState()), + SHUTDOWN_COMPLETE(new ShutdownCompleteState()); + //@formatter:on + + private final ConsumerState consumerState; + + ShardConsumerState(ConsumerState consumerState) { + this.consumerState = consumerState; + } + + public ConsumerState getConsumerState() { + return consumerState; + } + } + + + /** + * Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to + * do when a transition occurs. + * + */ + interface ConsumerState { + /** + * Creates a new task for this state using the passed in consumer to build the task. If there is no task + * required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the + * consumer during the execution of this method. + * + * @param consumer + * the consumer to use build the task, or execute state. + * @return a valid task for this state or null if there is no task required. + */ + ITask createTask(ShardConsumer consumer); + + /** + * Provides the next state of the consumer upon success of the task return by + * {@link ConsumerState#createTask(ShardConsumer)}. + * + * @return the next state that the consumer should transition to, this may be the same object as the current + * state. + */ + ConsumerState successTransition(); + + /** + * Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent + * on the current state, and the shutdown reason. + * + * @param shutdownReason + * the reason that a shutdown was requested + * @return the next state that the consumer should transition to, this may be the same object as the current + * state. + */ + ConsumerState shutdownTransition(ShutdownReason shutdownReason); + + /** + * The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state + * even if createTask would return a null value. + * + * @return the type of task that this state represents. + */ + TaskType getTaskType(); + + /** + * An enumeration represent the type of this state. Different consumer states may return the same + * {@link ShardConsumerState}. + * + * @return the type of consumer state this represents. + */ + ShardConsumerState getState(); + + boolean isTerminal(); + + } + + /** + * The initial state that any {@link ShardConsumer} should start in. + */ + static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState(); + + private static ConsumerState shutdownStateFor(ShutdownReason reason) { + switch (reason) { + case REQUESTED: + return ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); + case TERMINATE: + case ZOMBIE: + return ShardConsumerState.SHUTTING_DOWN.getConsumerState(); + default: + throw new IllegalArgumentException("Unknown reason: " + reason); + } + } + + /** + * This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent + * shards have been completed. + * + *

+ * <h2>Valid Transitions</h2>
+ * <dl>
+ * <dt>Success</dt>
+ * <dd>Transition to the initializing state to allow the record processor to be initialized in preparation of
+ * processing.</dd>
+ * <dt>Shutdown</dt>
+ * <dd>
+ * <dl>
+ * <dt>All Reasons</dt>
+ * <dd>Transitions to {@link ShutdownCompleteState}. Since the record processor was never initialized it can't be
+ * informed of the shutdown.</dd>
+ * </dl>
+ * </dd>
+ * </dl>
+ */ + static class BlockedOnParentState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(), + consumer.getParentShardPollIntervalMillis()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.INITIALIZING.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.BLOCK_ON_PARENT_SHARDS; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.WAITING_ON_PARENT_SHARDS; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This state is responsible for initializing the record processor with the shard information. + *

+ * <h2>Valid Transitions</h2>
+ * <dl>
+ * <dt>Success</dt>
+ * <dd>Transitions to the processing state, which will begin to send records to the record processor</dd>
+ * <dt>Shutdown</dt>
+ * <dd>At this point the record processor has been initialized, but hasn't processed any records. This requires that
+ * the record processor be notified of the shutdown, even though there are almost no actions the record processor
+ * could take.
+ * <dl>
+ * <dt>{@link ShutdownReason#REQUESTED}</dt>
+ * <dd>Transitions to the {@link ShutdownNotificationState}</dd>
+ * <dt>{@link ShutdownReason#ZOMBIE}</dt>
+ * <dd>Transitions to the {@link ShuttingDownState}</dd>
+ * <dt>{@link ShutdownReason#TERMINATE}</dt>
+ * <dd>
+ * <p>
+ * This reason should not occur, since terminate is triggered after reaching the end of a shard. Initialize never
+ * makes any requests to Kinesis for records, so it can't reach the end of a shard.
+ * </p>
+ * <p>
+ * Transitions to the {@link ShuttingDownState}
+ * </p>
+ * </dd>
+ * </dl>
+ * </dd>
+ * </dl>
+ */ + static class InitializingState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(), + consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), consumer.getStreamConfig()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.PROCESSING.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return shutdownReason.getShutdownState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.INITIALIZE; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.INITIALIZING; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This state is responsible for retrieving records from Kinesis, and dispatching them to the record processor. + * While in this state the only way a transition will occur is if a shutdown has been triggered. + *

+ * <h2>Valid Transitions</h2>
+ * <dl>
+ * <dt>Success</dt>
+ * <dd>Doesn't actually transition, but instead returns the same state</dd>
+ * <dt>Shutdown</dt>
+ * <dd>At this point records are being retrieved, and processed. It's now possible for the consumer to reach the end
+ * of the shard triggering a {@link ShutdownReason#TERMINATE}.
+ * <dl>
+ * <dt>{@link ShutdownReason#REQUESTED}</dt>
+ * <dd>Transitions to the {@link ShutdownNotificationState}</dd>
+ * <dt>{@link ShutdownReason#ZOMBIE}</dt>
+ * <dd>Transitions to the {@link ShuttingDownState}</dd>
+ * <dt>{@link ShutdownReason#TERMINATE}</dt>
+ * <dd>Transitions to the {@link ShuttingDownState}</dd>
+ * </dl>
+ * </dd>
+ * </dl>
+ */ + static class ProcessingState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.PROCESSING.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return shutdownReason.getShutdownState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.PROCESS; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.PROCESSING; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + static final ConsumerState SHUTDOWN_REQUEST_COMPLETION_STATE = new ShutdownNotificationCompletionState(); + + /** + * This state occurs when a shutdown has been explicitly requested. This shutdown allows the record processor a + * chance to checkpoint and prepare to be shutdown via the normal method. This state can only be reached by a + * shutdown on the {@link InitializingState} or {@link ProcessingState}. + * + *

+ * <h2>Valid Transitions</h2>
+ * <dl>
+ * <dt>Success</dt>
+ * <dd>Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.</dd>
+ * <dt>Shutdown</dt>
+ * <dd>At this point records are being retrieved, and processed. An explicit shutdown will allow the record
+ * processor one last chance to checkpoint, and then the {@link ShardConsumer} will be held in an idle state.
+ * <dl>
+ * <dt>{@link ShutdownReason#REQUESTED}</dt>
+ * <dd>Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to
+ * {@link ShutdownNotificationCompletionState}</dd>
+ * <dt>{@link ShutdownReason#ZOMBIE}</dt>
+ * <dd>Transitions to the {@link ShuttingDownState}</dd>
+ * <dt>{@link ShutdownReason#TERMINATE}</dt>
+ * <dd>Transitions to the {@link ShuttingDownState}</dd>
+ * </dl>
+ * </dd>
+ * </dl>
+ */ + static class ShutdownNotificationState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), + consumer.getShutdownNotification()); + } + + @Override + public ConsumerState successTransition() { + return SHUTDOWN_REQUEST_COMPLETION_STATE; + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + if (shutdownReason == ShutdownReason.REQUESTED) { + return SHUTDOWN_REQUEST_COMPLETION_STATE; + } + return shutdownReason.getShutdownState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_NOTIFICATION; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTDOWN_REQUESTED; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * Once the {@link ShutdownNotificationState} has been completed the {@link ShardConsumer} must not re-enter any of the + * processing states. This state idles the {@link ShardConsumer} until the worker triggers the final shutdown state. + * + *

+ * <h2>Valid Transitions</h2>
+ * <dl>
+ * <dt>Success</dt>
+ * <dd>
+ * <p>
+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
+ * </p>
+ * <p>
+ * Remains in the {@link ShutdownNotificationCompletionState}
+ * </p>
+ * </dd>
+ * <dt>Shutdown</dt>
+ * <dd>At this point the {@link ShardConsumer} has notified the record processor of the impending shutdown, and is
+ * waiting for that notification to complete. While waiting for the notification no further processing should occur
+ * on the {@link ShardConsumer}.
+ * <dl>
+ * <dt>{@link ShutdownReason#REQUESTED}</dt>
+ * <dd>Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains
+ * {@link ShutdownNotificationCompletionState}</dd>
+ * <dt>{@link ShutdownReason#ZOMBIE}</dt>
+ * <dd>Transitions to the {@link ShuttingDownState}</dd>
+ * <dt>{@link ShutdownReason#TERMINATE}</dt>
+ * <dd>Transitions to the {@link ShuttingDownState}</dd>
+ * </dl>
+ * </dd>
+ * </dl>
+ */ + static class ShutdownNotificationCompletionState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return null; + } + + @Override + public ConsumerState successTransition() { + return this; + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + if (shutdownReason != ShutdownReason.REQUESTED) { + return shutdownReason.getShutdownState(); + } + return this; + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_NOTIFICATION; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTDOWN_REQUESTED; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This state is entered if the {@link ShardConsumer} loses its lease, or reaches the end of the shard. + * + *

+ * <h2>Valid Transitions</h2>
+ * <dl>
+ * <dt>Success</dt>
+ * <dd>
+ * <p>
+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
+ * </p>
+ * <p>
+ * Transitions to the {@link ShutdownCompleteState}
+ * </p>
+ * </dd>
+ * <dt>Shutdown</dt>
+ * <dd>At this point the record processor has processed the final shutdown indication and, depending on the shutdown
+ * reason, taken the correct course of action. From this point on there should be no more interactions with the
+ * record processor or {@link ShardConsumer}.
+ * <dl>
+ * <dt>{@link ShutdownReason#REQUESTED}</dt>
+ * <dd>
+ * <p>
+ * This should not occur, as all other {@link ShutdownReason}s take priority over it.
+ * </p>
+ * <p>
+ * Transitions to {@link ShutdownCompleteState}
+ * </p>
+ * </dd>
+ * <dt>{@link ShutdownReason#ZOMBIE}</dt>
+ * <dd>Transitions to the {@link ShutdownCompleteState}</dd>
+ * <dt>{@link ShutdownReason#TERMINATE}</dt>
+ * <dd>Transitions to the {@link ShutdownCompleteState}</dd>
+ * </dl>
+ * </dd>
+ * </dl>
+ */ + static class ShuttingDownState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return new ShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(), + consumer.getStreamConfig().getStreamProxy(), + consumer.getStreamConfig().getInitialPositionInStream(), + consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(), + consumer.getTaskBackoffTimeMillis()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTTING_DOWN; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This is the final state for the {@link ShardConsumer}. This occurs once all shutdown activities are completed. + * + *

+ * <h2>Valid Transitions</h2>
+ * <dl>
+ * <dt>Success</dt>
+ * <dd>
+ * <p>
+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
+ * </p>
+ * <p>
+ * Remains in the {@link ShutdownCompleteState}
+ * </p>
+ * </dd>
+ * <dt>Shutdown</dt>
+ * <dd>At this point all shutdown activities are completed, and the {@link ShardConsumer} should not take any
+ * further actions.
+ * <dl>
+ * <dt>{@link ShutdownReason#REQUESTED}</dt>
+ * <dd>
+ * <p>
+ * This should not occur, as all other {@link ShutdownReason}s take priority over it.
+ * </p>
+ * <p>
+ * Remains in {@link ShutdownCompleteState}
+ * </p>
+ * </dd>
+ * <dt>{@link ShutdownReason#ZOMBIE}</dt>
+ * <dd>Remains in {@link ShutdownCompleteState}</dd>
+ * <dt>{@link ShutdownReason#TERMINATE}</dt>
+ * <dd>Remains in {@link ShutdownCompleteState}</dd>
+ * </dl>
+ * </dd>
+ * </dl>
+ */ + static class ShutdownCompleteState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + if (consumer.getShutdownNotification() != null) { + consumer.getShutdownNotification().shutdownComplete(); + } + return null; + } + + @Override + public ConsumerState successTransition() { + return this; + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return this; + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_COMPLETE; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTDOWN_COMPLETE; + } + + @Override + public boolean isTerminal() { + return true; + } + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 0119dcc3..59de31be 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -14,8 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.UUID; @@ -33,8 +34,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; -import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; @@ -200,23 +201,29 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator getCurrentAssignments() { - List assignments = new LinkedList(); Collection leases = getAssignments(); - if ((leases != null) && (!leases.isEmpty())) { - for (KinesisClientLease lease : leases) { - Set parentShardIds = lease.getParentShardIds(); - ShardInfo assignment = - new ShardInfo( - lease.getLeaseKey(), - lease.getConcurrencyToken().toString(), - parentShardIds, - lease.getCheckpoint()); - assignments.add(assignment); - } + return convertLeasesToAssignments(leases); + + } + + public static List convertLeasesToAssignments(Collection leases) { + if (leases == null || leases.isEmpty()) { + return Collections.emptyList(); } + List assignments = new ArrayList<>(leases.size()); + for (KinesisClientLease lease : leases) { + assignments.add(convertLeaseToAssignment(lease)); + } + return assignments; } + public static ShardInfo convertLeaseToAssignment(KinesisClientLease lease) { + Set parentShardIds = lease.getParentShardIds(); + return new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), parentShardIds, + lease.getCheckpoint()); + } + /** * Initialize the lease coordinator (create the lease table if needed). 
* @throws DependencyException diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java index 615467a0..e61da491 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java @@ -63,4 +63,12 @@ class MetricsCollectingTaskDecorator implements ITask { return other.getTaskType(); } + @Override + public String toString() { + return this.getClass().getName() + "<" + other.getTaskType() + ">(" + other + ")"; + } + + ITask getOther() { + return other; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index 16daafc2..69922670 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -69,7 +69,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { */ @Override public synchronized void checkpoint() - throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { if (LOG.isDebugEnabled()) { LOG.debug("Checkpointing " + shardInfo.getShardId() + ", " + " token " + shardInfo.getConcurrencyToken() + " at largest permitted value " + this.largestPermittedCheckpointValue); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index b9e8d2df..63cce40d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -14,6 +14,7 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -24,10 +25,10 @@ import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.google.common.annotations.VisibleForTesting; /** * Responsible for consuming data records of a (specified) shard. @@ -36,13 +37,6 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; */ class ShardConsumer { - /** - * Enumerates processing states when working on a shard. 
- */ - enum ShardConsumerState { - WAITING_ON_PARENT_SHARDS, INITIALIZING, PROCESSING, SHUTTING_DOWN, SHUTDOWN_COMPLETE; - } - private static final Log LOG = LogFactory.getLog(ShardConsumer.class); private final StreamConfig streamConfig; @@ -68,13 +62,13 @@ class ShardConsumer { * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do * much coordination/synchronization to handle concurrent reads/updates. */ - private ShardConsumerState currentState = ShardConsumerState.WAITING_ON_PARENT_SHARDS; + private ConsumerStates.ConsumerState currentState = ConsumerStates.INITIAL_STATE; /* * Used to track if we lost the primary responsibility. Once set to true, we will start shutting down. * If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object. */ - private volatile boolean beginShutdown; private volatile ShutdownReason shutdownReason; + private volatile ShutdownNotification shutdownNotification; /** * @param shardInfo Shard information @@ -129,41 +123,19 @@ class ShardConsumer { return checkAndSubmitNextTask(); } - // CHECKSTYLE:OFF CyclomaticComplexity + private boolean readyForNextTask() { + return future == null || future.isCancelled() || future.isDone(); + } + private synchronized boolean checkAndSubmitNextTask() { - // Task completed successfully (without exceptions) - boolean taskCompletedSuccessfully = false; boolean submittedNewTask = false; - if ((future == null) || future.isCancelled() || future.isDone()) { - if ((future != null) && future.isDone()) { - try { - TaskResult result = future.get(); - if (result.getException() == null) { - taskCompletedSuccessfully = true; - if (result.isShardEndReached()) { - markForShutdown(ShutdownReason.TERMINATE); - } - } else { - if (LOG.isDebugEnabled()) { - Exception taskException = result.getException(); - if (taskException instanceof BlockedOnParentShardException) { - // No need to log the stack trace for this exception (it is very specific). 
- LOG.debug("Shard " + shardInfo.getShardId() - + " is blocked on completion of parent shard."); - } else { - LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ", - result.getException()); - } - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - // Setting future to null so we don't misinterpret task completion status in case of exceptions - future = null; - } + if (readyForNextTask()) { + TaskOutcome taskOutcome = TaskOutcome.NOT_COMPLETE; + if (future != null && future.isDone()) { + taskOutcome = determineTaskOutcome(); } - updateState(taskCompletedSuccessfully); + + updateState(taskOutcome); ITask nextTask = getNextTask(); if (nextTask != null) { currentTask = nextTask; @@ -196,7 +168,56 @@ class ShardConsumer { return submittedNewTask; } - // CHECKSTYLE:ON CyclomaticComplexity + public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() { + return skipShardSyncAtWorkerInitializationIfLeasesExist; + } + + private enum TaskOutcome { + SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE + } + + private TaskOutcome determineTaskOutcome() { + try { + TaskResult result = future.get(); + if (result.getException() == null) { + if (result.isShardEndReached()) { + return TaskOutcome.END_OF_SHARD; + } + return TaskOutcome.SUCCESSFUL; + } + logTaskException(result); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + // Setting future to null so we don't misinterpret task completion status in case of exceptions + future = null; + } + return TaskOutcome.FAILURE; + } + + private void logTaskException(TaskResult taskResult) { + if (LOG.isDebugEnabled()) { + Exception taskException = taskResult.getException(); + if (taskException instanceof BlockedOnParentShardException) { + // No need to log the stack trace for this exception (it is very specific). + LOG.debug("Shard " + shardInfo.getShardId() + " is blocked on completion of parent shard."); + } else { + LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ", + taskResult.getException()); + } + } + } + + /** + * Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint + * before being shutdown. + * + * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. + */ + void notifyShutdownRequested(ShutdownNotification shutdownNotification) { + this.shutdownNotification = shutdownNotification; + markForShutdown(ShutdownReason.REQUESTED); + } /** * Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API). 
@@ -205,17 +226,15 @@ class ShardConsumer { * @return true if shutdown is complete (false if shutdown is still in progress) */ synchronized boolean beginShutdown() { - if (currentState != ShardConsumerState.SHUTDOWN_COMPLETE) { - markForShutdown(ShutdownReason.ZOMBIE); - checkAndSubmitNextTask(); - } + markForShutdown(ShutdownReason.ZOMBIE); + checkAndSubmitNextTask(); + return isShutdown(); } synchronized void markForShutdown(ShutdownReason reason) { - beginShutdown = true; // ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard) - if ((shutdownReason == null) || (shutdownReason == ShutdownReason.TERMINATE)) { + if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { shutdownReason = reason; } } @@ -227,7 +246,7 @@ class ShardConsumer { * @return true if shutdown is complete */ boolean isShutdown() { - return currentState == ShardConsumerState.SHUTDOWN_COMPLETE; + return currentState.isTerminal(); } /** @@ -243,48 +262,7 @@ class ShardConsumer { * @return Return next task to run */ private ITask getNextTask() { - ITask nextTask = null; - switch (currentState) { - case WAITING_ON_PARENT_SHARDS: - nextTask = new BlockOnParentShardTask(shardInfo, leaseManager, parentShardPollIntervalMillis); - break; - case INITIALIZING: - nextTask = - new InitializeTask(shardInfo, - recordProcessor, - checkpoint, - recordProcessorCheckpointer, - dataFetcher, - taskBackoffTimeMillis, - streamConfig); - break; - case PROCESSING: - nextTask = - new ProcessTask(shardInfo, - streamConfig, - recordProcessor, - recordProcessorCheckpointer, - dataFetcher, - taskBackoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist); - break; - case SHUTTING_DOWN: - nextTask = - new ShutdownTask(shardInfo, - recordProcessor, - recordProcessorCheckpointer, - shutdownReason, - streamConfig.getStreamProxy(), - streamConfig.getInitialPositionInStream(), - cleanupLeasesOfCompletedShards, - leaseManager, - taskBackoffTimeMillis); - break; - case SHUTDOWN_COMPLETE: - break; - default: - break; - } + ITask nextTask = currentState.createTask(this); if (nextTask == null) { return null; @@ -297,71 +275,93 @@ class ShardConsumer { * Note: This is a private/internal method with package level access solely for testing purposes. * Update state based on information about: task success, current state, and shutdown info. * - * @param taskCompletedSuccessfully Whether (current) task completed successfully. + * @param taskOutcome The outcome of the last task */ - // CHECKSTYLE:OFF CyclomaticComplexity - void updateState(boolean taskCompletedSuccessfully) { - if (currentState == ShardConsumerState.SHUTDOWN_COMPLETE) { - // Shutdown was completed and there nothing we can do after that - return; + void updateState(TaskOutcome taskOutcome) { + if (taskOutcome == TaskOutcome.END_OF_SHARD) { + markForShutdown(ShutdownReason.TERMINATE); } - if ((currentTask == null) && beginShutdown) { - // Shard didn't start any tasks and can be shutdown fast - currentState = ShardConsumerState.SHUTDOWN_COMPLETE; - return; - } - if (beginShutdown && currentState != ShardConsumerState.SHUTTING_DOWN) { - // Shard received signal to start shutdown. 
- // Whatever task we were working on should be stopped and shutdown task should be executed - currentState = ShardConsumerState.SHUTTING_DOWN; - return; - } - switch (currentState) { - case WAITING_ON_PARENT_SHARDS: - if (taskCompletedSuccessfully && TaskType.BLOCK_ON_PARENT_SHARDS.equals(currentTask.getTaskType())) { - currentState = ShardConsumerState.INITIALIZING; - } - break; - case INITIALIZING: - if (taskCompletedSuccessfully && TaskType.INITIALIZE.equals(currentTask.getTaskType())) { - currentState = ShardConsumerState.PROCESSING; - } - break; - case PROCESSING: - if (taskCompletedSuccessfully && TaskType.PROCESS.equals(currentTask.getTaskType())) { - currentState = ShardConsumerState.PROCESSING; - } - break; - case SHUTTING_DOWN: - if (currentTask == null - || (taskCompletedSuccessfully && TaskType.SHUTDOWN.equals(currentTask.getTaskType()))) { - currentState = ShardConsumerState.SHUTDOWN_COMPLETE; - } - break; - default: - LOG.error("Unexpected state: " + currentState); - break; + if (isShutdownRequested()) { + currentState = currentState.shutdownTransition(shutdownReason); + } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { + if (currentState.getTaskType() == currentTask.getTaskType()) { + currentState = currentState.successTransition(); + } else { + LOG.error("Current State task type of '" + currentState.getTaskType() + + "' doesn't match the current tasks type of '" + currentTask.getTaskType() + + "'. This shouldn't happen, and indicates a programming error. " + + "Unable to safely transition to the next state."); + } } + // + // Don't change state otherwise + // + } - // CHECKSTYLE:ON CyclomaticComplexity + @VisibleForTesting + boolean isShutdownRequested() { + return shutdownReason != null; + } /** * Private/Internal method - has package level access solely for testing purposes. * * @return the currentState */ - ShardConsumerState getCurrentState() { - return currentState; + ConsumerStates.ShardConsumerState getCurrentState() { + return currentState.getState(); } - /** - * Private/Internal method - has package level access solely for testing purposes. 
- * - * @return the beginShutdown - */ - boolean isBeginShutdown() { - return beginShutdown; + StreamConfig getStreamConfig() { + return streamConfig; } + IRecordProcessor getRecordProcessor() { + return recordProcessor; + } + + RecordProcessorCheckpointer getRecordProcessorCheckpointer() { + return recordProcessorCheckpointer; + } + + ExecutorService getExecutorService() { + return executorService; + } + + ShardInfo getShardInfo() { + return shardInfo; + } + + KinesisDataFetcher getDataFetcher() { + return dataFetcher; + } + + ILeaseManager getLeaseManager() { + return leaseManager; + } + + ICheckpoint getCheckpoint() { + return checkpoint; + } + + long getParentShardPollIntervalMillis() { + return parentShardPollIntervalMillis; + } + + boolean isCleanupLeasesOfCompletedShards() { + return cleanupLeasesOfCompletedShards; + } + + long getTaskBackoffTimeMillis() { + return taskBackoffTimeMillis; + } + + Future getFuture() { + return future; + } + + ShutdownNotification getShutdownNotification() { + return shutdownNotification; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java new file mode 100644 index 00000000..b3792131 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java @@ -0,0 +1,71 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.concurrent.CountDownLatch; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; + +/** + * Contains callbacks for completion of stages in a requested record processor shutdown. + * + */ +class ShardConsumerShutdownNotification implements ShutdownNotification { + + private final LeaseCoordinator leaseCoordinator; + private final KinesisClientLease lease; + private final CountDownLatch shutdownCompleteLatch; + private final CountDownLatch notificationCompleteLatch; + + private boolean notificationComplete = false; + private boolean allNotificationCompleted = false; + + /** + * Creates a new shutdown request object. + * + * @param leaseCoordinator + * the lease coordinator used to drop leases from once the initial shutdown request is completed. + * @param lease + * the lease that this shutdown request will free once initial shutdown is complete + * @param notificationCompleteLatch + * used to inform the caller once the + * {@link IShutdownNotificationAware} object has been + * notified of the shutdown request. + * @param shutdownCompleteLatch + * used to inform the caller once the record processor is fully shutdown + */ + ShardConsumerShutdownNotification(LeaseCoordinator leaseCoordinator, KinesisClientLease lease, + CountDownLatch notificationCompleteLatch, CountDownLatch shutdownCompleteLatch) { + this.leaseCoordinator = leaseCoordinator; + this.lease = lease; + this.notificationCompleteLatch = notificationCompleteLatch; + this.shutdownCompleteLatch = shutdownCompleteLatch; + } + + @Override + public void shutdownNotificationComplete() { + if (notificationComplete) { + return; + } + // + // Once the notification has been completed, the lease needs to dropped to allow the worker to complete + // shutdown of the record processor. 
+ // + leaseCoordinator.dropLease(lease); + notificationCompleteLatch.countDown(); + notificationComplete = true; + } + + @Override + public void shutdownComplete() { + if (allNotificationCompleted) { + return; + } + if (!notificationComplete) { + notificationCompleteLatch.countDown(); + } + shutdownCompleteLatch.countDown(); + allNotificationCompleted = true; + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java new file mode 100644 index 00000000..8ee96537 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java @@ -0,0 +1,155 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Used as a response from the {@link Worker#requestShutdown()} to allow callers to wait until shutdown is complete. + */ +class ShutdownFuture implements Future { + + private static final Log log = LogFactory.getLog(ShutdownFuture.class); + + private final CountDownLatch shutdownCompleteLatch; + private final CountDownLatch notificationCompleteLatch; + private final Worker worker; + + ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) { + this.shutdownCompleteLatch = shutdownCompleteLatch; + this.notificationCompleteLatch = notificationCompleteLatch; + this.worker = worker; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException("Cannot cancel a shutdown process"); + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return isWorkerShutdownComplete(); + } + + private boolean isWorkerShutdownComplete() { + return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty(); + } + + private long outstandingRecordProcessors(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + + final long startNanos = System.nanoTime(); + + // + // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested. + // There is the possibility of a race condition where a lease is terminated after the shutdown request + // notification is started, but before the ShardConsumer is sent the notification. In this case the + // ShardConsumer would start the lease loss shutdown, and may never call the notification methods. 
+ // + if (!notificationCompleteLatch.await(timeout, unit)) { + long awaitingNotification = notificationCompleteLatch.getCount(); + long awaitingFinalShutdown = shutdownCompleteLatch.getCount(); + log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification, and " + + awaitingFinalShutdown + " awaiting final shutdown"); + if (awaitingFinalShutdown != 0) { + // + // The number of record processor awaiting final shutdown should be a superset of the those awaiting + // notification + // + return checkWorkerShutdownMiss(awaitingFinalShutdown); + } + } + + long remaining = remainingTimeout(timeout, unit, startNanos); + throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time."); + + // + // Once all record processors have been notified of the shutdown it is safe to allow the worker to + // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases. + // + worker.shutdown(); + remaining = remainingTimeout(timeout, unit, startNanos); + throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time."); + + // + // Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown + // processing. This should really be a no-op since as part of the notification completion the lease for + // ShardConsumer is terminated. + // + if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) { + long outstanding = shutdownCompleteLatch.getCount(); + log.info("Awaiting " + outstanding + " record processors to complete final shutdown"); + + return checkWorkerShutdownMiss(outstanding); + } + return 0; + } + + private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) { + long checkNanos = System.nanoTime() - startNanos; + return unit.toNanos(timeout) - checkNanos; + } + + private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException { + if (remainingNanos <= 0) { + throw new TimeoutException(message); + } + } + + /** + * This checks to see if the worker has already hit it's shutdown target, while there is outstanding record + * processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the + * latch should be decremented before the shutdown completion. + * + * @param outstanding + * the number of record processor still awaiting shutdown. + * @return the number of record processors awaiting shutdown, or 0 if the worker believes it's shutdown already. + */ + private long checkWorkerShutdownMiss(long outstanding) { + if (isWorkerShutdownComplete()) { + if (outstanding != 0) { + log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding + + " with a current value of " + shutdownCompleteLatch.getCount() + ". 
shutdownComplete: " + + worker.isShutdownComplete() + " -- Consumer Map: " + + worker.getShardInfoShardConsumerMap().size()); + } + return 0; + } + return outstanding; + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + boolean complete = false; + do { + try { + long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS); + complete = outstanding == 0; + log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown."); + } catch (TimeoutException te) { + log.info("Timeout while waiting for completion: " + te.getMessage()); + } + + } while(!complete); + return null; + } + + @Override + public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long outstanding = outstandingRecordProcessors(timeout, unit); + if (outstanding != 0) { + throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown."); + } + return null; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java new file mode 100644 index 00000000..928e6900 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java @@ -0,0 +1,22 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; + +/** + * A shutdown request to the ShardConsumer + */ +public interface ShutdownNotification { + /** + * Used to indicate that the record processor has been notified of a requested shutdown, and given the chance to + * checkpoint. + * + */ + void shutdownNotificationComplete(); + + /** + * Used to indicate that the record processor has completed the call to + * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)} has + * completed. + */ + void shutdownComplete(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java new file mode 100644 index 00000000..13711f23 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java @@ -0,0 +1,43 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; + +/** + * Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint. 
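+ * <p>
+ * For illustration only (the class name below is hypothetical, not part of this change): a record processor opts in
+ * to this notification by implementing {@link IShutdownNotificationAware} and checkpointing when asked.
+ * </p>
+ * <pre>{@code
+ * public class MyRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
+ *     // ... initialize/processRecords/shutdown as usual ...
+ *
+ *     public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
+ *         try {
+ *             checkpointer.checkpoint();  // last chance to record progress before shutdown(ShutdownInput) runs
+ *         } catch (Exception e) {
+ *             // best effort: the normal shutdown callback will still be invoked
+ *         }
+ *     }
+ * }
+ * }</pre>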
+ */ +class ShutdownNotificationTask implements ITask { + + private final IRecordProcessor recordProcessor; + private final IRecordProcessorCheckpointer recordProcessorCheckpointer; + private final ShutdownNotification shutdownNotification; + + ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification) { + this.recordProcessor = recordProcessor; + this.recordProcessorCheckpointer = recordProcessorCheckpointer; + this.shutdownNotification = shutdownNotification; + } + + @Override + public TaskResult call() { + try { + if (recordProcessor instanceof IShutdownNotificationAware) { + IShutdownNotificationAware shutdownNotificationAware = (IShutdownNotificationAware) recordProcessor; + try { + shutdownNotificationAware.shutdownRequested(recordProcessorCheckpointer); + } catch (Exception ex) { + return new TaskResult(ex); + } + } + return new TaskResult(null); + } finally { + shutdownNotification.shutdownNotificationComplete(); + } + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_NOTIFICATION; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReason.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java similarity index 52% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReason.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java index 473b488c..8d0dfc80 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReason.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java @@ -12,7 +12,12 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.types; +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; + /** * Reason the RecordProcessor is being shutdown. @@ -28,7 +33,7 @@ public enum ShutdownReason { * Applications SHOULD NOT checkpoint their progress (as another record processor may have already started * processing data). */ - ZOMBIE, + ZOMBIE(3, ShardConsumerState.SHUTTING_DOWN.getConsumerState()), /** * Terminate processing for this RecordProcessor (resharding use case). @@ -36,5 +41,38 @@ public enum ShutdownReason { * Applications SHOULD checkpoint their progress to indicate that they have successfully processed all records * from this shard and processing of child shards can be started. */ - TERMINATE + TERMINATE(2, ShardConsumerState.SHUTTING_DOWN.getConsumerState()), + + /** + * Indicates that the entire application is being shutdown, and if desired the record processor will be given a + * final chance to checkpoint. This state will not trigger a direct call to + * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)}, but + * instead depend on a different interface for backward compatibility. 
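+ * <p>
+ * For example (rank precedence as defined by {@code canTransitionTo} below):
+ * </p>
+ * <pre>{@code
+ * ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.TERMINATE); // true  (TERMINATE outranks REQUESTED)
+ * ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.REQUESTED); // false (lower rank cannot replace higher)
+ * ShutdownReason.ZOMBIE.canTransitionTo(ShutdownReason.TERMINATE);    // false (ZOMBIE has the highest rank)
+ * }</pre>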
+ */ + REQUESTED(1, ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()); + + private final int rank; + private final ConsumerState shutdownState; + + ShutdownReason(int rank, ConsumerState shutdownState) { + this.rank = rank; + this.shutdownState = shutdownState; + } + + /** + * Indicates whether the given reason can override the current reason. + * + * @param reason the reason to transition to + * @return true if the transition is allowed, false if it's not. + */ + public boolean canTransitionTo(ShutdownReason reason) { + if (reason == null) { + return false; + } + return reason.rank > this.rank; + } + + ConsumerState getShutdownState() { + return shutdownState; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index 3ce05203..d40fbb0e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -21,11 +21,11 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import com.google.common.annotations.VisibleForTesting; /** * Task for invoking the RecordProcessor shutdown() callback. @@ -155,4 +155,9 @@ class ShutdownTask implements ITask { return taskType; } + @VisibleForTesting + ShutdownReason getReason() { + return reason; + } + } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java index 426162d4..32fd1cd2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java @@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; /** * Enumerates types of tasks executed as part of processing a shard. */ -enum TaskType { +public enum TaskType { /** * Polls and waits until parent shard(s) have been fully processed. */ @@ -34,8 +34,16 @@ enum TaskType { * Shutdown of RecordProcessor. */ SHUTDOWN, + /** + * Graceful shutdown has been requested, and notification of the record processor will occur. + */ + SHUTDOWN_NOTIFICATION, + /** + * Occurs once the shutdown has been completed + */ + SHUTDOWN_COMPLETE, /** * Sync leases/activities corresponding to Kinesis shards. 
*/ - SHARDSYNC; + SHARDSYNC } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index b644a790..2a1e5484 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -14,13 +14,17 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -38,15 +42,18 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Worker is the high level class that Kinesis applications use to start @@ -85,6 +92,7 @@ public class Worker implements Runnable { private volatile boolean shutdown; private volatile long shutdownStartTimeMillis; + private volatile boolean shutdownComplete = false; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -499,11 +507,89 @@ public class Worker implements Runnable { } /** - * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that - * if executor services were passed to the worker by the user, worker will not attempt to shutdown - * those resources. + * Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware}, + * of the impending shutdown. This gives the record processor a final chance to checkpoint. + * + * It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is + * lost after requesting shutdown, but before the notification is dispatched. + * + *

+ * <h2>Requested Shutdown Process</h2>
+ * When a shutdown process is requested it operates slightly differently, to allow the record processors a chance to
+ * checkpoint a final time (a usage sketch follows the list).
+ * <ol>
+ * <li>Call to request shutdown invoked.</li>
+ * <li>Worker stops attempting to acquire new leases.</li>
+ * <li>Record processor shutdown begins:
+ * <ol>
+ * <li>Record processor is notified of the impending shutdown, and given a final chance to checkpoint.</li>
+ * <li>The lease for the record processor is then dropped.</li>
+ * <li>The record processor enters an idle state, waiting for the worker to complete final termination.</li>
+ * <li>The worker will detect a record processor that has lost its lease, and will terminate the record processor
+ * with {@link ShutdownReason#ZOMBIE}.</li>
+ * </ol>
+ * </li>
+ * <li>The worker will shut down all record processors.</li>
+ * <li>Once all record processors have been terminated, the worker will terminate all owned resources.</li>
+ * <li>Once the worker shutdown is complete, the returned future is completed.</li>
+ * </ol>
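+ * <p>
+ * A minimal usage sketch of the process above (the {@code MyProcessor} class, the {@code worker} variable, and the
+ * 30 second wait are illustrative only):
+ * <pre>{@code
+ * class MyProcessor implements IRecordProcessor, IShutdownNotificationAware {
+ *     // initialize / processRecords / shutdown omitted for brevity
+ *
+ *     public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
+ *         try {
+ *             checkpointer.checkpoint(); // last chance to record progress before the lease is dropped
+ *         } catch (Exception e) {
+ *             // log and continue; shutdown(...) will still be invoked for this processor
+ *         }
+ *     }
+ * }
+ *
+ * // caller side:
+ * Future<?> done = worker.requestShutdown();
+ * done.get(30, TimeUnit.SECONDS); // wait for notification, lease drop, and worker termination
+ * }</pre>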
+ * + * + * + * @return a Future that will be set once the shutdown is complete. + */ + public Future requestShutdown() { + + leaseCoordinator.stopLeaseTaker(); + // + // Stop accepting new leases + // + Collection leases = leaseCoordinator.getAssignments(); + if (leases == null || leases.isEmpty()) { + // + // If there are no leases shutdown is already completed. + // + return Futures.immediateFuture(null); + } + CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size()); + CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size()); + for (KinesisClientLease lease : leases) { + ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, + notificationCompleteLatch, shutdownCompleteLatch); + ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); + shardInfoShardConsumerMap.get(shardInfo).notifyShutdownRequested(shutdownNotification); + } + + return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this); + + } + + boolean isShutdownComplete() { + return shutdownComplete; + } + + ConcurrentMap getShardInfoShardConsumerMap() { + return shardInfoShardConsumerMap; + } + + /** + * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor + * services were passed to the worker by the user, worker will not attempt to shutdown those resources. + * + *

+ * <h2>Shutdown Process</h2>
+ * When called this will start shutdown of the record processors, and eventually shut down the worker itself.
+ * <ol>
+ * <li>Call to start shutdown invoked.</li>
+ * <li>Lease coordinator told to stop taking leases, and to drop existing leases.</li>
+ * <li>Worker discovers record processors that no longer have leases.</li>
+ * <li>Worker triggers shutdown with state {@link ShutdownReason#ZOMBIE}.</li>
+ * <li>Once all record processors are shutdown, worker terminates owned resources.</li>
+ * <li>Shutdown complete.</li>
+ * </ol>
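+ * <p>
+ * A minimal sketch of wiring this into a JVM shutdown hook (the {@code recordProcessorFactory} and {@code config}
+ * variables are illustrative only):
+ * <pre>{@code
+ * final Worker worker = new Worker(recordProcessorFactory, config);
+ * Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ *     public void run() {
+ *         worker.shutdown(); // stop lease renewal; remaining processors are shut down with ShutdownReason.ZOMBIE
+ *     }
+ * }));
+ * worker.run(); // blocks until shutdown completes
+ * }</pre>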
*/ public void shutdown() { + if (shutdown) { + LOG.warn("Shutdown requested a second time."); + return; + } LOG.info("Worker shutdown requested."); // Set shutdown flag, so Worker.run can start shutdown process. @@ -530,6 +616,7 @@ public class Worker implements Runnable { if (metricsFactory instanceof WorkerCWMetricsFactory) { ((CWMetricsFactory) metricsFactory).shutdown(); } + shutdownComplete = true; } /** @@ -757,7 +844,8 @@ public class Worker implements Runnable { * @return Default executor service that should be used by the worker. */ private static ExecutorService getExecutorService() { - return new WorkerThreadPoolExecutor(); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build(); + return new WorkerThreadPoolExecutor(threadFactory); } /** @@ -786,10 +874,10 @@ public class Worker implements Runnable { static class WorkerThreadPoolExecutor extends ThreadPoolExecutor { private static final long DEFAULT_KEEP_ALIVE_TIME = 60L; - WorkerThreadPoolExecutor() { + WorkerThreadPoolExecutor(ThreadFactory threadFactory) { // Defaults are based on Executors.newCachedThreadPool() - super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, - new SynchronousQueue()); + super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(), + threadFactory); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java index 9ea2c654..c533a4da 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.types; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; /** * Container for the parameters to the IRecordProcessor's diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java index b6c26d3d..b3a0ce6c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java @@ -15,9 +15,9 @@ package com.amazonaws.services.kinesis.leases.impl; import java.util.Collection; - import java.util.HashSet; import java.util.Set; +import java.util.UUID; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; @@ -41,6 +41,16 @@ public class KinesisClientLease extends Lease { this.parentShardIds.addAll(other.getParentShardIds()); } + KinesisClientLease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken, + Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, Long ownerSwitchesSinceCheckpoint, + Set parentShardIds) { + super(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos); + + this.checkpoint = checkpoint; + this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint; + this.parentShardIds.addAll(parentShardIds); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java index 0638e4db..32234e35 100644 --- 
a/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java @@ -63,11 +63,17 @@ public class Lease { * @param lease lease to copy */ protected Lease(Lease lease) { - this.leaseKey = lease.getLeaseKey(); - this.leaseOwner = lease.getLeaseOwner(); - this.leaseCounter = lease.getLeaseCounter(); - this.concurrencyToken = lease.getConcurrencyToken(); - this.lastCounterIncrementNanos = lease.getLastCounterIncrementNanos(); + this(lease.getLeaseKey(), lease.getLeaseOwner(), lease.getLeaseCounter(), lease.getConcurrencyToken(), + lease.getLastCounterIncrementNanos()); + } + + protected Lease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken, + Long lastCounterIncrementNanos) { + this.leaseKey = leaseKey; + this.leaseOwner = leaseOwner; + this.leaseCounter = leaseCounter; + this.concurrencyToken = concurrencyToken; + this.lastCounterIncrementNanos = lastCounterIncrementNanos; } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index 55c26674..921cfbc4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -83,6 +84,7 @@ public class LeaseCoordinator { private ScheduledExecutorService leaseCoordinatorThreadPool; private final ExecutorService leaseRenewalThreadpool; private volatile boolean running = false; + private ScheduledFuture takerFuture; /** * Constructor. @@ -199,9 +201,15 @@ public class LeaseCoordinator { leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY); // Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation. - leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS); + takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(), + 0L, + takerIntervalMillis, + TimeUnit.MILLISECONDS); // Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degredation. - leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS); + leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(), + 0L, + renewerIntervalMillis, + TimeUnit.MILLISECONDS); running = true; } @@ -309,6 +317,27 @@ public class LeaseCoordinator { } } + /** + * Requests the cancellation of the lease taker. + */ + public void stopLeaseTaker() { + takerFuture.cancel(false); + + } + + /** + * Requests that renewals for the given lease are stopped. + * + * @param lease the lease to stop renewing. 
+ */ + public void dropLease(T lease) { + synchronized (shutdownLock) { + if (lease != null) { + leaseRenewer.dropLease(lease); + } + } + } + /** * @return true if this LeaseCoordinator is running */ diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java index ae8040a5..b10ee1a3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java @@ -369,6 +369,15 @@ public class LeaseRenewer implements ILeaseRenewer { ownedLeases.clear(); } + /** + * {@inheritDoc} + * @param lease the lease to drop. + */ + @Override + public void dropLease(T lease) { + ownedLeases.remove(lease.getLeaseKey()); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java index 6c2a8527..87e9182a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java @@ -73,6 +73,13 @@ public interface ILeaseRenewer { */ public void clearCurrentlyHeldLeases(); + /** + * Stops the lease renewer from continunig to maintain the given lease. + * + * @param lease the lease to drop. + */ + void dropLease(T lease); + /** * Update application-specific fields in a currently held lease. Cannot be used to update internal fields such as * leaseCounter, leaseOwner, etc. Fails if we do not hold the lease, or if the concurrency token does not match diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java index d4ef9b20..2cda0632 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java @@ -26,7 +26,7 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 4b15a532..bce1793c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; diff --git 
a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index a33e8ebf..babf6ac2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java index 524b481e..82ed5458 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java @@ -14,7 +14,7 @@ */ package com.amazonaws.services.kinesis.multilang.messages; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; /** * A message to indicate to the client's process that it should shutdown and then terminate. diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java new file mode 100644 index 00000000..c0a778e9 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -0,0 +1,385 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.hamcrest.Condition; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; + +@RunWith(MockitoJUnitRunner.class) +public class 
ConsumerStatesTest { + + @Mock + private ShardConsumer consumer; + @Mock + private StreamConfig streamConfig; + @Mock + private IRecordProcessor recordProcessor; + @Mock + private RecordProcessorCheckpointer recordProcessorCheckpointer; + @Mock + private ExecutorService executorService; + @Mock + private ShardInfo shardInfo; + @Mock + private KinesisDataFetcher dataFetcher; + @Mock + private ILeaseManager leaseManager; + @Mock + private ICheckpoint checkpoint; + @Mock + private Future future; + @Mock + private ShutdownNotification shutdownNotification; + @Mock + private IKinesisProxy kinesisProxy; + @Mock + private InitialPositionInStreamExtended initialPositionInStream; + + private long parentShardPollIntervalMillis = 0xCAFE; + private boolean cleanupLeasesOfCompletedShards = true; + private long taskBackoffTimeMillis = 0xBEEF; + private ShutdownReason reason = ShutdownReason.TERMINATE; + + @Before + public void setup() { + when(consumer.getStreamConfig()).thenReturn(streamConfig); + when(consumer.getRecordProcessor()).thenReturn(recordProcessor); + when(consumer.getRecordProcessorCheckpointer()).thenReturn(recordProcessorCheckpointer); + when(consumer.getExecutorService()).thenReturn(executorService); + when(consumer.getShardInfo()).thenReturn(shardInfo); + when(consumer.getDataFetcher()).thenReturn(dataFetcher); + when(consumer.getLeaseManager()).thenReturn(leaseManager); + when(consumer.getCheckpoint()).thenReturn(checkpoint); + when(consumer.getFuture()).thenReturn(future); + when(consumer.getShutdownNotification()).thenReturn(shutdownNotification); + when(consumer.getParentShardPollIntervalMillis()).thenReturn(parentShardPollIntervalMillis); + when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards); + when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis); + when(consumer.getShutdownReason()).thenReturn(reason); + + } + + private static final Class> LEASE_MANAGER_CLASS = (Class>) (Class) ILeaseManager.class; + + @Test + public void blockOnParentStateTest() { + ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState(); + + ITask task = state.createTask(consumer); + + assertThat(task, taskWith(BlockOnParentShardTask.class, ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, + taskWith(BlockOnParentShardTask.class, LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager))); + assertThat(task, taskWith(BlockOnParentShardTask.class, Long.class, "parentShardPollIntervalMillis", + equalTo(parentShardPollIntervalMillis))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.getConsumerState())); + for (ShutdownReason shutdownReason : ShutdownReason.values()) { + assertThat(state.shutdownTransition(shutdownReason), + equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState())); + } + + assertThat(state.getState(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)); + assertThat(state.getTaskType(), equalTo(TaskType.BLOCK_ON_PARENT_SHARDS)); + + } + + @Test + public void initializingStateTest() { + ConsumerState state = ShardConsumerState.INITIALIZING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, initTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, initTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, initTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, initTask(ICheckpoint.class, "checkpoint", 
equalTo(checkpoint))); + assertThat(task, initTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, initTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + assertThat(task, initTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.INITIALIZING)); + assertThat(state.getTaskType(), equalTo(TaskType.INITIALIZE)); + } + + @Test + public void processingStateTest() { + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING)); + assertThat(state.getTaskType(), equalTo(TaskType.PROCESS)); + + } + + @Test + public void shutdownRequestState() { + ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); + + ITask task = state.createTask(consumer); + + assertThat(task, shutdownReqTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, shutdownReqTask(IRecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo((IRecordProcessorCheckpointer) recordProcessorCheckpointer))); + assertThat(task, shutdownReqTask(ShutdownNotification.class, "shutdownNotification", equalTo(shutdownNotification))); + + assertThat(state.successTransition(), equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_REQUESTED)); + 
assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_NOTIFICATION)); + + } + + @Test + public void shutdownRequestCompleteStateTest() { + ConsumerState state = ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE; + + assertThat(state.createTask(consumer), nullValue()); + + assertThat(state.successTransition(), equalTo(state)); + + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), equalTo(state)); + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_REQUESTED)); + assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_NOTIFICATION)); + + } + + @Test + public void shuttingDownStateTest() { + ConsumerState state = ShardConsumerState.SHUTTING_DOWN.getConsumerState(); + + when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy); + when(streamConfig.getInitialPositionInStream()).thenReturn(initialPositionInStream); + + ITask task = state.createTask(consumer); + + assertThat(task, shutdownTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, shutdownTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, shutdownTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason))); + assertThat(task, shutdownTask(IKinesisProxy.class, "kinesisProxy", equalTo(kinesisProxy))); + assertThat(task, shutdownTask(LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager))); + assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream", + equalTo(initialPositionInStream))); + assertThat(task, + shutdownTask(Boolean.class, "cleanupLeasesOfCompletedShards", equalTo(cleanupLeasesOfCompletedShards))); + assertThat(task, shutdownTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState())); + + for (ShutdownReason reason : ShutdownReason.values()) { + assertThat(state.shutdownTransition(reason), + equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState())); + } + + assertThat(state.getState(), equalTo(ShardConsumerState.SHUTTING_DOWN)); + assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN)); + + } + + @Test + public void shutdownCompleteStateTest() { + ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + + assertThat(state.createTask(consumer), nullValue()); + verify(consumer, times(2)).getShutdownNotification(); + verify(shutdownNotification).shutdownComplete(); + + assertThat(state.successTransition(), equalTo(state)); + for(ShutdownReason reason : ShutdownReason.values()) { + assertThat(state.shutdownTransition(reason), equalTo(state)); + } + + assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE)); + assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_COMPLETE)); + } + + @Test + public void shutdownCompleteStateNullNotificationTest() { + ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + + when(consumer.getShutdownNotification()).thenReturn(null); + assertThat(state.createTask(consumer), nullValue()); + + verify(consumer).getShutdownNotification(); + verify(shutdownNotification, 
never()).shutdownComplete(); + } + + static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, + String propertyName, Matcher matcher) { + return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); + } + + static ReflectionPropertyMatcher shutdownReqTask( + Class valueTypeClass, String propertyName, Matcher matcher) { + return taskWith(ShutdownNotificationTask.class, valueTypeClass, propertyName, matcher); + } + + static ReflectionPropertyMatcher procTask(Class valueTypeClass, + String propertyName, Matcher matcher) { + return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher); + } + + static ReflectionPropertyMatcher initTask(Class valueTypeClass, + String propertyName, Matcher matcher) { + return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher); + } + + static ReflectionPropertyMatcher taskWith(Class taskTypeClass, + Class valueTypeClass, String propertyName, Matcher matcher) { + return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName); + } + + private static class ReflectionPropertyMatcher extends TypeSafeDiagnosingMatcher { + + private final Class taskTypeClass; + private final Class valueTypeClazz; + private final Matcher matcher; + private final String propertyName; + private final Field matchingField; + + private ReflectionPropertyMatcher(Class taskTypeClass, Class valueTypeClass, + Matcher matcher, String propertyName) { + this.taskTypeClass = taskTypeClass; + this.valueTypeClazz = valueTypeClass; + this.matcher = matcher; + this.propertyName = propertyName; + + Field[] fields = taskTypeClass.getDeclaredFields(); + Field matching = null; + for (Field field : fields) { + if (propertyName.equals(field.getName())) { + matching = field; + } + } + this.matchingField = matching; + + } + + @Override + protected boolean matchesSafely(ITask item, Description mismatchDescription) { + + return Condition.matched(item, mismatchDescription).and(new Condition.Step() { + @Override + public Condition apply(ITask value, Description mismatch) { + if (taskTypeClass.equals(value.getClass())) { + return Condition.matched(taskTypeClass.cast(value), mismatch); + } + mismatch.appendText("Expected task type of ").appendText(taskTypeClass.getName()) + .appendText(" but was ").appendText(value.getClass().getName()); + return Condition.notMatched(); + } + }).and(new Condition.Step() { + @Override + public Condition apply(TaskType value, Description mismatch) { + if (matchingField == null) { + mismatch.appendText("Field ").appendText(propertyName).appendText(" not present in ") + .appendText(taskTypeClass.getName()); + return Condition.notMatched(); + } + + try { + return Condition.matched(getValue(value), mismatch); + } catch (RuntimeException re) { + mismatch.appendText("Failure while retrieving value for ").appendText(propertyName); + return Condition.notMatched(); + } + + } + }).and(new Condition.Step() { + @Override + public Condition apply(Object value, Description mismatch) { + if (value != null && !valueTypeClazz.isAssignableFrom(value.getClass())) { + mismatch.appendText("Expected a value of type ").appendText(valueTypeClazz.getName()) + .appendText(" but was ").appendText(value.getClass().getName()); + return Condition.notMatched(); + } + return Condition.matched(valueTypeClazz.cast(value), mismatch); + } + }).matching(matcher); + } + + @Override + public void describeTo(Description description) { + description + .appendText( + "A " + taskTypeClass.getName() + " task with the property " + propertyName + 
" matching ") + .appendDescriptionOf(matcher); + } + + private Object getValue(TaskType task) { + + matchingField.setAccessible(true); + try { + return matchingField.get(task); + } catch (IllegalAccessException e) { + throw new RuntimeException("Failed to retrieve the value for " + matchingField.getName()); + } + } + } + +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 755d08a4..893f64ed 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -23,9 +23,9 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -46,17 +46,18 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.ShardConsumerState; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; @@ -68,6 +69,7 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType; /** * Unit tests of {@link ShardConsumer}. */ +@RunWith(MockitoJUnitRunner.class) public class ShardConsumerTest { private static final Log LOG = LogFactory.getLog(ShardConsumerTest.class); @@ -86,6 +88,17 @@ public class ShardConsumerTest { // ... a non-final public class, and so can be mocked and spied. private final ExecutorService executorService = Executors.newFixedThreadPool(1); + @Mock + private IRecordProcessor processor; + @Mock + private IKinesisProxy streamProxy; + @Mock + private ILeaseManager leaseManager; + @Mock + private ICheckpoint checkpoint; + @Mock + private ShutdownNotification shutdownNotification; + /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. 
*/ @@ -93,12 +106,9 @@ public class ShardConsumerTest { @Test public final void testInitializationStateUponFailure() throws Exception { ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); - ICheckpoint checkpoint = mock(ICheckpoint.class); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); - IRecordProcessor processor = mock(IRecordProcessor.class); - IKinesisProxy streamProxy = mock(IKinesisProxy.class); - ILeaseManager leaseManager = mock(ILeaseManager.class); + when(leaseManager.getLease(anyString())).thenReturn(null); StreamConfig streamConfig = new StreamConfig(streamProxy, @@ -120,19 +130,19 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); } @@ -143,13 +153,9 @@ public class ShardConsumerTest { @Test public final void testInitializationStateUponSubmissionFailure() throws Exception { ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); - ICheckpoint checkpoint = mock(ICheckpoint.class); ExecutorService spyExecutorService = spy(executorService); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); - IRecordProcessor processor = mock(IRecordProcessor.class); - IKinesisProxy streamProxy = mock(IKinesisProxy.class); - ILeaseManager leaseManager = mock(ILeaseManager.class); when(leaseManager.getLease(anyString())).thenReturn(null); StreamConfig streamConfig = new StreamConfig(streamProxy, @@ -171,31 +177,27 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); doThrow(new RejectedExecutionException()).when(spyExecutorService).submit(any(InitializeTask.class)); 
consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); } @SuppressWarnings("unchecked") @Test public final void testRecordProcessorThrowable() throws Exception { ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); - ICheckpoint checkpoint = mock(ICheckpoint.class); - IRecordProcessor processor = mock(IRecordProcessor.class); - IKinesisProxy streamProxy = mock(IKinesisProxy.class); - ILeaseManager leaseManager = mock(ILeaseManager.class); StreamConfig streamConfig = new StreamConfig(streamProxy, 1, @@ -219,10 +221,10 @@ public class ShardConsumerTest { when(leaseManager.getLease(anyString())).thenReturn(null); when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // submit BlockOnParentShardTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(processor, times(0)).initialize(any(InitializationInput.class)); // Throw Error when IRecordProcessor.initialize() is invoked. @@ -230,7 +232,7 @@ public class ShardConsumerTest { consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(any(InitializationInput.class)); try { @@ -241,24 +243,24 @@ public class ShardConsumerTest { assertThat(e.getCause(), instanceOf(ExecutionException.class)); } Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(any(InitializationInput.class)); doNothing().when(processor).initialize(any(InitializationInput.class)); consumer.consumeShard(); // submit InitializeTask again. Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(2)).initialize(any(InitializationInput.class)); // Checking the status of submitted InitializeTask from above should pass. 
consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); } /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer#consumeShard()} + * Test method for {@link ShardConsumer#consumeShard()} */ @Test public final void testConsumeShard() throws Exception { @@ -279,8 +281,6 @@ public class ShardConsumerTest { final int idleTimeMS = 0; // keep unit tests fast ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); - @SuppressWarnings("unchecked") - ILeaseManager leaseManager = mock(ILeaseManager.class); when(leaseManager.getLease(anyString())).thenReturn(null); TestStreamlet processor = new TestStreamlet(); @@ -306,20 +306,20 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize - Thread.sleep(50L); + processor.getInitializeLatch().await(5, TimeUnit.SECONDS); // We expect to process all records in numRecs calls for (int i = 0; i < numRecs;) { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -327,11 +327,26 @@ public class ShardConsumerTest { } assertThat(processor.getShutdownReason(), nullValue()); + consumer.notifyShutdownRequested(shutdownNotification); + consumer.consumeShard(); + assertThat(processor.getNotifyShutdownLatch().await(1, TimeUnit.SECONDS), is(true)); + Thread.sleep(50); + assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED)); + assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); + verify(shutdownNotification).shutdownNotificationComplete(); + assertThat(processor.isShutdownNotificationCalled(), equalTo(true)); + consumer.consumeShard(); + Thread.sleep(50); + assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); + consumer.beginShutdown(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.ZOMBIE)); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.beginShutdown(); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTDOWN_COMPLETE))); + consumer.consumeShard(); + verify(shutdownNotification, atLeastOnce()).shutdownComplete(); + 
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); executorService.shutdown(); @@ -344,8 +359,7 @@ public class ShardConsumerTest { } /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer#consumeShard()} - * that starts from initial position of type AT_TIMESTAMP. + * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */ @Test public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception { @@ -369,8 +383,6 @@ public class ShardConsumerTest { final int idleTimeMS = 0; // keep unit tests fast ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); - @SuppressWarnings("unchecked") - ILeaseManager leaseManager = mock(ILeaseManager.class); when(leaseManager.getLease(anyString())).thenReturn(null); TestStreamlet processor = new TestStreamlet(); @@ -397,11 +409,11 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); @@ -410,7 +422,7 @@ public class ShardConsumerTest { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -420,9 +432,9 @@ public class ShardConsumerTest { assertThat(processor.getShutdownReason(), nullValue()); consumer.beginShutdown(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.beginShutdown(); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); executorService.shutdown(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java index 5ad42359..314974b0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; /** * Helper class to verify shard lineage in unit tests that use TestStreamlet. diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java new file mode 100644 index 00000000..cccbc9a1 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java @@ -0,0 +1,236 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.mockito.stubbing.OngoingStubbing; + +@RunWith(MockitoJUnitRunner.class) +public class ShutdownFutureTest { + + @Mock + private CountDownLatch shutdownCompleteLatch; + @Mock + private CountDownLatch notificationCompleteLatch; + @Mock + private Worker worker; + @Mock + private ConcurrentMap shardInfoConsumerMap; + + @Test + public void testSimpleGetAlreadyCompleted() throws Exception { + ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); + + mockNotificationComplete(true); + mockShutdownComplete(true); + + future.get(); + + verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(worker).shutdown(); + verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class)); + } + + @Test + public void testNotificationNotCompleted() throws Exception { + ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); + + mockNotificationComplete(false, true); + mockShutdownComplete(true); + + when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap); + when(shardInfoConsumerMap.isEmpty()).thenReturn(false); + when(worker.isShutdownComplete()).thenReturn(false); + + when(notificationCompleteLatch.getCount()).thenReturn(1L); + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + + expectedTimeoutException(future); + + verify(worker, never()).shutdown(); + + awaitFuture(future); + + verify(notificationCompleteLatch).getCount(); + verifyLatchAwait(notificationCompleteLatch, 2); + + verify(shutdownCompleteLatch).getCount(); + verifyLatchAwait(shutdownCompleteLatch); + + verify(worker).shutdown(); + + } + + @Test + public void testShutdownNotCompleted() throws Exception { + ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); + mockNotificationComplete(true); + mockShutdownComplete(false, true); + + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + when(worker.isShutdownComplete()).thenReturn(false); + + mockShardInfoConsumerMap(1); + + expectedTimeoutException(future); + 
verify(worker).shutdown(); + awaitFuture(future); + + verifyLatchAwait(notificationCompleteLatch, 2); + verifyLatchAwait(shutdownCompleteLatch, 2); + + verify(worker).isShutdownComplete(); + verify(worker).getShardInfoShardConsumerMap(); + + } + + @Test + public void testShutdownNotCompleteButWorkerShutdown() throws Exception { + ShutdownFuture future = create(); + + mockNotificationComplete(true); + mockShutdownComplete(false); + + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + when(worker.isShutdownComplete()).thenReturn(true); + mockShardInfoConsumerMap(1); + + awaitFuture(future); + verify(worker).shutdown(); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch); + + verify(worker, times(2)).isShutdownComplete(); + verify(worker).getShardInfoShardConsumerMap(); + verify(shardInfoConsumerMap).size(); + } + + @Test + public void testShutdownNotCompleteButShardConsumerEmpty() throws Exception { + ShutdownFuture future = create(); + mockNotificationComplete(true); + mockShutdownComplete(false); + + mockOutstanding(shutdownCompleteLatch, 1L); + + when(worker.isShutdownComplete()).thenReturn(false); + mockShardInfoConsumerMap(0); + + awaitFuture(future); + verify(worker).shutdown(); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch); + + verify(worker, times(2)).isShutdownComplete(); + verify(worker, times(2)).getShardInfoShardConsumerMap(); + + verify(shardInfoConsumerMap).isEmpty(); + verify(shardInfoConsumerMap).size(); + } + + @Test + public void testNotificationNotCompleteButShardConsumerEmpty() throws Exception { + ShutdownFuture future = create(); + mockNotificationComplete(false); + mockShutdownComplete(false); + + mockOutstanding(notificationCompleteLatch, 1L); + mockOutstanding(shutdownCompleteLatch, 1L); + + when(worker.isShutdownComplete()).thenReturn(false); + mockShardInfoConsumerMap(0); + + awaitFuture(future); + verify(worker, never()).shutdown(); + verifyLatchAwait(notificationCompleteLatch); + verify(shutdownCompleteLatch, never()).await(); + + verify(worker, times(2)).isShutdownComplete(); + verify(worker, times(2)).getShardInfoShardConsumerMap(); + + verify(shardInfoConsumerMap).isEmpty(); + verify(shardInfoConsumerMap).size(); + } + + @Test(expected = TimeoutException.class) + public void testTimeExceededException() throws Exception { + ShutdownFuture future = create(); + mockNotificationComplete(false); + mockOutstanding(notificationCompleteLatch, 1L); + when(worker.isShutdownComplete()).thenReturn(false); + mockShardInfoConsumerMap(1); + + future.get(1, TimeUnit.NANOSECONDS); + } + + private ShutdownFuture create() { + return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); + } + + private void mockShardInfoConsumerMap(Integer initialItemCount, Integer ... 
additionalItemCounts) { + when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap); + Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length]; + for(int i = 0; i < additionalItemCounts.length; ++i) { + additionalEmptyStates[i] = additionalItemCounts[i] == 0; + } + when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts); + when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates); + } + + private void verifyLatchAwait(CountDownLatch latch) throws Exception { + verifyLatchAwait(latch, 1); + } + + private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception { + verify(latch, times(times)).await(anyLong(), any(TimeUnit.class)); + } + + private void expectedTimeoutException(ShutdownFuture future) throws Exception { + boolean gotTimeout = false; + try { + awaitFuture(future); + } catch (TimeoutException te) { + gotTimeout = true; + } + assertThat("Expected a timeout exception to occur", gotTimeout); + } + + private void awaitFuture(ShutdownFuture future) throws Exception { + future.get(1, TimeUnit.SECONDS); + } + + private void mockNotificationComplete(Boolean initial, Boolean... states) throws Exception { + mockLatch(notificationCompleteLatch, initial, states); + + } + + private void mockShutdownComplete(Boolean initial, Boolean... states) throws Exception { + mockLatch(shutdownCompleteLatch, initial, states); + } + + private void mockLatch(CountDownLatch latch, Boolean initial, Boolean... states) throws Exception { + when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, states); + } + + private void mockOutstanding(CountDownLatch latch, Long remaining, Long ... additionalRemaining) throws Exception { + when(latch.getCount()).thenReturn(remaining, additionalRemaining); + } + +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 67b42a0e..9eaf7e8e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -31,7 +31,6 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisC import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; @@ -82,7 +81,7 @@ public class ShutdownTaskTest { } /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#call()}. + * Test method for {@link ShutdownTask#call()}. */ @Test public final void testCallWhenApplicationDoesNotCheckpoint() { @@ -106,7 +105,7 @@ public class ShutdownTaskTest { } /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#call()}. + * Test method for {@link ShutdownTask#call()}. 
*/ @Test public final void testCallWhenSyncingShardsThrows() { @@ -131,7 +130,7 @@ public class ShutdownTaskTest { } /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#getTaskType()}. + * Test method for {@link ShutdownTask#getTaskType()}. */ @Test public final void testGetTaskType() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java index d9391e8a..174410e7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java @@ -18,8 +18,10 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,12 +36,11 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; /** * Streamlet that tracks records it's seen - useful for testing. */ -class TestStreamlet implements IRecordProcessor { +class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware { private static final Log LOG = LogFactory.getLog(TestStreamlet.class); @@ -55,6 +56,11 @@ class TestStreamlet implements IRecordProcessor { private ShutdownReason shutdownReason; private ShardSequenceVerifier shardSequenceVerifier; private long numProcessRecordsCallsWithEmptyRecordList; + private boolean shutdownNotificationCalled; + + private final CountDownLatch initializeLatch = new CountDownLatch(1); + private final CountDownLatch notifyShutdownLatch = new CountDownLatch(1); + private final CountDownLatch shutdownLatch = new CountDownLatch(1); public TestStreamlet() { @@ -76,6 +82,7 @@ class TestStreamlet implements IRecordProcessor { if (shardSequenceVerifier != null) { shardSequenceVerifier.registerInitialization(shardId); } + initializeLatch.countDown(); } @Override @@ -125,6 +132,8 @@ class TestStreamlet implements IRecordProcessor { throw new RuntimeException(e); } } + + shutdownLatch.countDown(); } /** @@ -148,4 +157,25 @@ class TestStreamlet implements IRecordProcessor { return numProcessRecordsCallsWithEmptyRecordList; } + boolean isShutdownNotificationCalled() { + return shutdownNotificationCalled; + } + + @Override + public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + shutdownNotificationCalled = true; + notifyShutdownLatch.countDown(); + } + + public CountDownLatch getInitializeLatch() { + return initializeLatch; + } + + public CountDownLatch getNotifyShutdownLatch() { + return notifyShutdownLatch; + } + + public CountDownLatch getShutdownLatch() { + return shutdownLatch; + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 53d89dc2..baafa447 100644 --- 
a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -14,21 +14,16 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import static org.hamcrest.CoreMatchers.both; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import java.io.File; import java.lang.Thread.State; @@ -42,21 +37,29 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.hamcrest.Condition; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.Assert; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.Timeout; import org.junit.runner.RunWith; +import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; @@ -78,8 +81,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; @@ -90,6 +93,8 @@ import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Unit tests of Worker. 
@@ -99,8 +104,8 @@ public class WorkerTest { private static final Log LOG = LogFactory.getLog(WorkerTest.class); - @Rule - public Timeout timeout = new Timeout((int)TimeUnit.SECONDS.toMillis(30)); + // @Rule + // public Timeout timeout = new Timeout((int)TimeUnit.SECONDS.toMillis(30)); private final NullMetricsFactory nullMetricsFactory = new NullMetricsFactory(); private final long taskBackoffTimeMillis = 1L; @@ -140,6 +145,10 @@ public class WorkerTest { private IRecordProcessor v2RecordProcessor; @Mock private ShardConsumer shardConsumer; + @Mock + private Future taskFuture; + @Mock + private TaskResult taskResult; // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = @@ -347,10 +356,10 @@ public class WorkerTest { worker.cleanupShardConsumers(assignedShards); // verify shard consumer not present in assignedShards is shut down - Assert.assertTrue(consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken.isBeginShutdown()); + Assert.assertTrue(consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken.isShutdownRequested()); // verify shard consumers present in assignedShards aren't shut down - Assert.assertFalse(consumerOfShardInfo1.isBeginShutdown()); - Assert.assertFalse(consumerOfShardInfo2.isBeginShutdown()); + Assert.assertFalse(consumerOfShardInfo1.isShutdownRequested()); + Assert.assertFalse(consumerOfShardInfo2.isShutdownRequested()); } @Test @@ -690,6 +699,339 @@ public class WorkerTest { assertThat(recordProcessorInterrupted.get(), equalTo(true)); } + @Test + public void testRequestShutdown() throws Exception { + + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new 
ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + + worker.requestShutdown(); + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)))); + + worker.runProcessLoop(); + + verify(leaseCoordinator, atLeastOnce()).dropLease(eq(lease)); + leases.clear(); + currentAssignments.clear(); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)))); + + } + + @Test + public void testLeaseCancelledAfterShutdownRequest() throws Exception { + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + 
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + + worker.requestShutdown(); + leases.clear(); + currentAssignments.clear(); + worker.runProcessLoop(); + + verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()) + .submit(argThat(ShutdownReasonMatcher.hasReason(equalTo(ShutdownReason.ZOMBIE)))); + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)))); + + } + + @Test + public void testEndOfShardAfterShutdownRequest() throws Exception { + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + when(taskResult.isShardEndReached()).thenReturn(true); + + worker.requestShutdown(); + worker.runProcessLoop(); + + verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)))); + + verify(executorService).submit(argThat(ShutdownReasonMatcher.hasReason(equalTo(ShutdownReason.TERMINATE)))); + + 
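+ // End of shard takes precedence over the pending graceful shutdown request: no
+ // SHUTDOWN_NOTIFICATION task was submitted and the shutdown reason is TERMINATE.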
worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)))); + + } + + private static class ShutdownReasonMatcher extends TypeSafeDiagnosingMatcher { + + private final Matcher matcher; + + public ShutdownReasonMatcher(Matcher matcher) { + this.matcher = matcher; + } + + @Override + public void describeTo(Description description) { + description.appendText("hasShutdownReason(").appendDescriptionOf(matcher).appendText(")"); + } + + @Override + protected boolean matchesSafely(MetricsCollectingTaskDecorator item, Description mismatchDescription) { + return Condition.matched(item, mismatchDescription) + .and(new Condition.Step() { + @Override + public Condition apply(MetricsCollectingTaskDecorator value, + Description mismatch) { + if (!(value.getOther() instanceof ShutdownTask)) { + mismatch.appendText("Wrapped task isn't a shutdown task"); + return Condition.notMatched(); + } + return Condition.matched((ShutdownTask) value.getOther(), mismatch); + } + }).and(new Condition.Step() { + @Override + public Condition apply(ShutdownTask value, Description mismatch) { + return Condition.matched(value.getReason(), mismatch); + } + }).matching(matcher); + } + + public static ShutdownReasonMatcher hasReason(Matcher matcher) { + return new ShutdownReasonMatcher(matcher); + } + } + + private static class ShutdownHandlingAnswer implements Answer> { + + final Future defaultFuture; + + public ShutdownHandlingAnswer(Future defaultFuture) { + this.defaultFuture = defaultFuture; + } + + @Override + public Future answer(InvocationOnMock invocation) throws Throwable { + ITask rootTask = (ITask) invocation.getArguments()[0]; + if (rootTask instanceof MetricsCollectingTaskDecorator + && ((MetricsCollectingTaskDecorator) rootTask).getOther() instanceof ShutdownNotificationTask) { + ShutdownNotificationTask task = (ShutdownNotificationTask) ((MetricsCollectingTaskDecorator) rootTask).getOther(); + return Futures.immediateFuture(task.call()); + } + return defaultFuture; + } + } + + private static class TaskTypeMatcher extends TypeSafeMatcher { + + final Matcher expectedTaskType; + + TaskTypeMatcher(TaskType expectedTaskType) { + this(equalTo(expectedTaskType)); + } + + TaskTypeMatcher(Matcher expectedTaskType) { + this.expectedTaskType = expectedTaskType; + } + + @Override + protected boolean matchesSafely(MetricsCollectingTaskDecorator item) { + return expectedTaskType.matches(item.getTaskType()); + } + + @Override + public void describeTo(Description description) { + description.appendText("taskType matches"); + expectedTaskType.describeTo(description); + } + + static TaskTypeMatcher isOfType(TaskType taskType) { + return new TaskTypeMatcher(taskType); + } + + static TaskTypeMatcher matchesType(Matcher matcher) { + return new TaskTypeMatcher(matcher); + } + } + + private static class InnerTaskMatcher extends TypeSafeMatcher { + + final Matcher matcher; + + InnerTaskMatcher(Matcher matcher) { + this.matcher = matcher; + } + + @Override + protected boolean matchesSafely(MetricsCollectingTaskDecorator item) { + return matcher.matches(item.getOther()); + } + + @Override + public void describeTo(Description description) { + matcher.describeTo(description); + } + + static InnerTaskMatcher taskWith(Class clazz, Matcher matcher) { + return new InnerTaskMatcher<>(matcher); + } + } /** * Returns executor service that will be owned by the worker. 
This is useful to test the scenario * where worker shuts down the executor service also during shutdown flow. @@ -697,7 +1039,8 @@ public class WorkerTest { * @return Executor service that will be owned by the worker. */ private WorkerThreadPoolExecutor getWorkerThreadPoolExecutor() { - return new WorkerThreadPoolExecutor(); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build(); + return new WorkerThreadPoolExecutor(threadFactory); } private List createShardListWithOneShard() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java index 2cb117ef..011e0721 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java @@ -1,18 +1,32 @@ package com.amazonaws.services.kinesis.clientlibrary.types; -import org.junit.Assert; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import org.junit.Test; /** * Unit tests of ShutdownReason enum class. */ public class ShutdownReasonTest { + @Test - public void testToString() { - Assert.assertEquals("ZOMBIE", String.valueOf(ShutdownReason.ZOMBIE)); - Assert.assertEquals("TERMINATE", String.valueOf(ShutdownReason.TERMINATE)); - Assert.assertEquals("ZOMBIE", ShutdownReason.ZOMBIE.toString()); - Assert.assertEquals("TERMINATE", ShutdownReason.TERMINATE.toString()); + public void testTransitionZombie() { + assertThat(ShutdownReason.ZOMBIE.canTransitionTo(ShutdownReason.TERMINATE), equalTo(false)); + assertThat(ShutdownReason.ZOMBIE.canTransitionTo(ShutdownReason.REQUESTED), equalTo(false)); + } + + @Test + public void testTransitionTerminate() { + assertThat(ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.ZOMBIE), equalTo(true)); + assertThat(ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.REQUESTED), equalTo(false)); + } + + @Test + public void testTransitionRequested() { + assertThat(ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.ZOMBIE), equalTo(true)); + assertThat(ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.TERMINATE), equalTo(true)); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java new file mode 100644 index 00000000..df39b9f2 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java @@ -0,0 +1,63 @@ +package com.amazonaws.services.kinesis.leases.impl; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; + +public class KinesisClientLeaseBuilder { + private String leaseKey; + private String leaseOwner; + private Long leaseCounter = 0L; + private UUID concurrencyToken; + private Long lastCounterIncrementNanos; + private ExtendedSequenceNumber checkpoint; + private Long ownerSwitchesSinceCheckpoint = 0L; + private Set parentShardIds = new HashSet<>(); + + public KinesisClientLeaseBuilder withLeaseKey(String leaseKey) { + this.leaseKey = leaseKey; + return this; + } + + public KinesisClientLeaseBuilder withLeaseOwner(String leaseOwner) { + this.leaseOwner = leaseOwner; + return 
this; + } + + public KinesisClientLeaseBuilder withLeaseCounter(Long leaseCounter) { + this.leaseCounter = leaseCounter; + return this; + } + + public KinesisClientLeaseBuilder withConcurrencyToken(UUID concurrencyToken) { + this.concurrencyToken = concurrencyToken; + return this; + } + + public KinesisClientLeaseBuilder withLastCounterIncrementNanos(Long lastCounterIncrementNanos) { + this.lastCounterIncrementNanos = lastCounterIncrementNanos; + return this; + } + + public KinesisClientLeaseBuilder withCheckpoint(ExtendedSequenceNumber checkpoint) { + this.checkpoint = checkpoint; + return this; + } + + public KinesisClientLeaseBuilder withOwnerSwitchesSinceCheckpoint(Long ownerSwitchesSinceCheckpoint) { + this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint; + return this; + } + + public KinesisClientLeaseBuilder withParentShardIds(Set parentShardIds) { + this.parentShardIds = parentShardIds; + return this; + } + + public KinesisClientLease build() { + return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, + checkpoint, ownerSwitchesSinceCheckpoint, parentShardIds); + } +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java index 9461b1cc..749a1dc8 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java @@ -28,7 +28,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index 55eb5dce..580b8ad4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -33,7 +33,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 941b4582..a03b164d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -36,7 +36,7 @@ import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java index ff7bc84e..9b90fe60 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java @@ -14,14 +14,13 @@ */ package com.amazonaws.services.kinesis.multilang.messages; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import org.junit.Assert; import org.junit.Test; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper;
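
Usage sketch (not part of the diff above): the snippet below illustrates how an application might wire into the graceful shutdown this patch introduces, based on the interfaces and tests in this change. The class name CheckpointingProcessor and the assumption of an already-constructed, running Worker are hypothetical; IRecordProcessor (v2), IShutdownNotificationAware.shutdownRequested(...), and Worker.requestShutdown() are the pieces added or reworked by the patch.

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

/** Hypothetical record processor that opts into shutdown notifications. */
public class CheckpointingProcessor implements IRecordProcessor, IShutdownNotificationAware {

    @Override
    public void initialize(InitializationInput initializationInput) {
        // no-op for this sketch
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // normal record handling goes here
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        // Last chance to record progress before the worker gives up its leases;
        // a regular shutdown(...) call still follows (see TestStreamlet and WorkerTest above).
        try {
            checkpointer.checkpoint();
        } catch (Exception e) {
            // a real processor would log and decide whether the lost checkpoint matters
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // final cleanup; the reason comes from the relocated ShutdownReason enum
    }

    /** Driver side: ask an already-running worker to shut down gracefully. */
    static void shutdownGracefully(Worker worker) {
        // Queues SHUTDOWN_NOTIFICATION tasks for the shard consumers and then lets the normal
        // SHUTDOWN flow run, as exercised by WorkerTest.testRequestShutdown().
        worker.requestShutdown();
        // If the handle returned by requestShutdown() is captured (see ShutdownFuture), it can be
        // awaited with a timeout as in ShutdownFutureTest; its exact type is not relied on here.
    }
}

A note on the design: keeping the notification on a separate interface (IShutdownNotificationAware) means existing IRecordProcessor implementations keep working unchanged, and only processors that need a final checkpoint opt in.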