From 33655bdedb7a09cec33dd950bd586bf20eea7c13 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Fri, 30 Sep 2016 08:01:36 -0700 Subject: [PATCH] Allow for a Graceful Shutdown of the Worker Add a new method to the worker requestShutdown that allows the worker to gracefully shutdown all record processors. The graceful shutdown gives the record processors a last chance to checkpoint before they're terminated. To use these new features the record processor must implement IShutdownnotificationaware. Some cleanup to try and make the state transitions of the record processor more clear. --- .../interfaces/IRecordProcessor.java | 2 +- .../v2/IShutdownNotificationAware.java | 19 + .../lib/worker/ConsumerStates.java | 602 ++++++++++++++++++ .../KinesisClientLibLeaseCoordinator.java | 35 +- .../MetricsCollectingTaskDecorator.java | 8 + .../worker/RecordProcessorCheckpointer.java | 2 +- .../lib/worker/ShardConsumer.java | 287 ++++----- .../ShardConsumerShutdownNotification.java | 67 ++ .../lib/worker/ShutdownFuture.java | 113 ++++ .../lib/worker/ShutdownNotification.java | 22 + .../lib/worker/ShutdownNotificationTask.java | 43 ++ .../{types => lib/worker}/ShutdownReason.java | 44 +- .../lib/worker/ShutdownTask.java | 7 +- .../clientlibrary/lib/worker/TaskType.java | 12 +- .../clientlibrary/lib/worker/Worker.java | 98 ++- .../clientlibrary/types/ShutdownInput.java | 1 + .../leases/impl/KinesisClientLease.java | 12 +- .../services/kinesis/leases/impl/Lease.java | 16 +- .../kinesis/leases/impl/LeaseCoordinator.java | 33 +- .../kinesis/leases/impl/LeaseRenewer.java | 9 + .../leases/interfaces/ILeaseRenewer.java | 7 + .../kinesis/multilang/MessageWriter.java | 2 +- .../kinesis/multilang/MultiLangProtocol.java | 2 +- .../multilang/MultiLangRecordProcessor.java | 2 +- .../multilang/messages/ShutdownMessage.java | 2 +- .../lib/worker/ConsumerStatesTest.java | 385 +++++++++++ .../lib/worker/ShardConsumerTest.java | 110 ++-- .../lib/worker/ShardSequenceVerifier.java | 1 - .../lib/worker/ShutdownFutureTest.java | 195 ++++++ .../lib/worker/ShutdownTaskTest.java | 7 +- .../lib/worker/TestStreamlet.java | 34 +- .../clientlibrary/lib/worker/WorkerTest.java | 379 ++++++++++- .../types/ShutdownReasonTest.java | 26 +- .../impl/KinesisClientLeaseBuilder.java | 63 ++ .../kinesis/multilang/MessageWriterTest.java | 2 +- .../multilang/MultiLangProtocolTest.java | 2 +- .../StreamingRecordProcessorTest.java | 2 +- .../multilang/messages/MessageTest.java | 3 +- 38 files changed, 2387 insertions(+), 269 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java rename src/main/java/com/amazonaws/services/kinesis/clientlibrary/{types => lib/worker}/ShutdownReason.java (52%) create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java index 481a5dbf..89cf092a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java @@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.interfaces; import java.util.List; import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; /** * The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java new file mode 100644 index 00000000..82a18a0e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java @@ -0,0 +1,19 @@ +package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; + +/** + * Allows a record processor to indicate it's aware of requested shutdowns, and handle the request. + */ +public interface IShutdownNotificationAware { + + /** + * Called when the worker has been requested to shutdown, and gives the record processor a chance to checkpoint. + * + * The record processor will still have shutdown called. + * + * @param checkpointer the checkpointer that can be used to save progress. + */ + void shutdownRequested(IRecordProcessorCheckpointer checkpointer); + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java new file mode 100644 index 00000000..da8dce4e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -0,0 +1,602 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, + * and state transitions is contained within the {@link ConsumerState} objects. + * + *

State Diagram

+ * + *
+ *       +-------------------+
+ *       | Waiting on Parent |                               +------------------+
+ *  +----+       Shard       |                               |     Shutdown     |
+ *  |    |                   |          +--------------------+   Notification   |
+ *  |    +----------+--------+          |  Shutdown:         |     Requested    |
+ *  |               | Success           |   Requested        +-+-------+--------+
+ *  |               |                   |                      |       |
+ *  |        +------+-------------+     |                      |       | Shutdown:
+ *  |        |    Initializing    +-----+                      |       |  Requested
+ *  |        |                    |     |                      |       |
+ *  |        |                    +-----+-------+              |       |
+ *  |        +---------+----------+     |       | Shutdown:    | +-----+-------------+
+ *  |                  | Success        |       |  Terminated  | |     Shutdown      |
+ *  |                  |                |       |  Zombie      | |   Notification    +-------------+
+ *  |           +------+-------------+  |       |              | |     Complete      |             |
+ *  |           |     Processing     +--+       |              | ++-----------+------+             |
+ *  |       +---+                    |          |              |  |           |                    |
+ *  |       |   |                    +----------+              |  |           | Shutdown:          |
+ *  |       |   +------+-------------+          |              \  /           |  Requested         |
+ *  |       |          |                        |               \/            +--------------------+
+ *  |       |          |                        |               ||
+ *  |       | Success  |                        |               || Shutdown:
+ *  |       +----------+                        |               ||  Terminated
+ *  |                                           |               ||  Zombie
+ *  |                                           |               ||
+ *  |                                           |               ||
+ *  |                                           |           +---++--------------+
+ *  |                                           |           |   Shutting Down   |
+ *  |                                           +-----------+                   |
+ *  |                                                       |                   |
+ *  |                                                       +--------+----------+
+ *  |                                                                |
+ *  |                                                                | Shutdown:
+ *  |                                                                |  All Reasons
+ *  |                                                                |
+ *  |                                                                |
+ *  |      Shutdown:                                        +--------+----------+
+ *  |        All Reasons                                    |     Shutdown      |
+ *  +-------------------------------------------------------+     Complete      |
+ *                                                          |                   |
+ *                                                          +-------------------+
+ * 
+ */ +class ConsumerStates { + + /** + * Enumerates processing states when working on a shard. + */ + enum ShardConsumerState { + // @formatter:off + WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()), + INITIALIZING(new InitializingState()), + PROCESSING(new ProcessingState()), + SHUTDOWN_REQUESTED(new ShutdownNotificationState()), + SHUTTING_DOWN(new ShuttingDownState()), + SHUTDOWN_COMPLETE(new ShutdownCompleteState()); + //@formatter:on + + private final ConsumerState consumerState; + + ShardConsumerState(ConsumerState consumerState) { + this.consumerState = consumerState; + } + + public ConsumerState getConsumerState() { + return consumerState; + } + } + + + /** + * Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to + * do when a transition occurs. + * + */ + interface ConsumerState { + /** + * Creates a new task for this state using the passed in consumer to build the task. If there is no task + * required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the + * consumer during the execution of this method. + * + * @param consumer + * the consumer to use build the task, or execute state. + * @return a valid task for this state or null if there is no task required. + */ + ITask createTask(ShardConsumer consumer); + + /** + * Provides the next state of the consumer upon success of the task return by + * {@link ConsumerState#createTask(ShardConsumer)}. + * + * @return the next state that the consumer should transition to, this may be the same object as the current + * state. + */ + ConsumerState successTransition(); + + /** + * Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent + * on the current state, and the shutdown reason. + * + * @param shutdownReason + * the reason that a shutdown was requested + * @return the next state that the consumer should transition to, this may be the same object as the current + * state. + */ + ConsumerState shutdownTransition(ShutdownReason shutdownReason); + + /** + * The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state + * even if createTask would return a null value. + * + * @return the type of task that this state represents. + */ + TaskType getTaskType(); + + /** + * An enumeration represent the type of this state. Different consumer states may return the same + * {@link ShardConsumerState}. + * + * @return the type of consumer state this represents. + */ + ShardConsumerState getState(); + + boolean isTerminal(); + + } + + /** + * The initial state that any {@link ShardConsumer} should start in. + */ + static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState(); + + private static ConsumerState shutdownStateFor(ShutdownReason reason) { + switch (reason) { + case REQUESTED: + return ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); + case TERMINATE: + case ZOMBIE: + return ShardConsumerState.SHUTTING_DOWN.getConsumerState(); + default: + throw new IllegalArgumentException("Unknown reason: " + reason); + } + } + + /** + * This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent + * shards have been completed. + * + *

Valid Transitions

+ *
+ *
Success
+ *
Transition to the initializing state to allow the record processor to be initialized in preparation of + * processing.
+ *
Shutdown
+ *
+ *
+ *
All Reasons
+ *
Transitions to {@link ShutdownCompleteState}. Since the record processor was never initialized it can't be + * informed of the shutdown.
+ *
+ *
+ *
+ */ + static class BlockedOnParentState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(), + consumer.getParentShardPollIntervalMillis()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.INITIALIZING.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.BLOCK_ON_PARENT_SHARDS; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.WAITING_ON_PARENT_SHARDS; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This state is responsible for initializing the record processor with the shard information. + *

Valid Transitions

+ *
+ *
Success
+ *
Transitions to the processing state which will begin to send records to the record processor
+ *
Shutdown
+ *
At this point the record processor has been initialized, but hasn't processed any records. This requires that + * the record processor be notified of the shutdown, even though there is almost no actions the record processor + * could take. + *
+ *
{@link ShutdownReason#REQUESTED}
+ *
Transitions to the {@link ShutdownNotificationState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
+ *

+ * This reason should not occur, since terminate is triggered after reaching the end of a shard. Initialize never + * makes an requests to Kinesis for records, so it can't reach the end of a shard. + *

+ *

+ * Transitions to the {@link ShuttingDownState} + *

+ *
+ *
+ *
+ *
+ */ + static class InitializingState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(), + consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), consumer.getStreamConfig()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.PROCESSING.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return shutdownReason.getShutdownState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.INITIALIZE; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.INITIALIZING; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This state is responsible for retrieving records from Kinesis, and dispatching them to the record processor. + * While in this state the only way a transition will occur is if a shutdown has been triggered. + *

Valid Transitions

+ *
+ *
Success
+ *
Doesn't actually transition, but instead returns the same state
+ *
Shutdown
+ *
At this point records are being retrieved, and processed. It's now possible for the consumer to reach the end + * of the shard triggering a {@link ShutdownReason#TERMINATE}. + *
+ *
{@link ShutdownReason#REQUESTED}
+ *
Transitions to the {@link ShutdownNotificationState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
+ *
+ *
+ */ + static class ProcessingState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.PROCESSING.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return shutdownReason.getShutdownState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.PROCESS; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.PROCESSING; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + static final ConsumerState SHUTDOWN_REQUEST_COMPLETION_STATE = new ShutdownNotificationCompletionState(); + + /** + * This state occurs when a shutdown has been explicitly requested. This shutdown allows the record processor a + * chance to checkpoint and prepare to be shutdown via the normal method. This state can only be reached by a + * shutdown on the {@link InitializingState} or {@link ProcessingState}. + * + *

Valid Transitions

+ *
+ *
Success
+ *
Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
+ *
Shutdown
+ *
At this point records are being retrieved, and processed. An explicit shutdown will allow the record + * processor one last chance to checkpoint, and then the {@link ShardConsumer} will be held in an idle state. + *
+ *
{@link ShutdownReason#REQUESTED}
+ *
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to + * {@link ShutdownNotificationCompletionState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
+ *
+ *
+ */ + static class ShutdownNotificationState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), + consumer.getShutdownNotification()); + } + + @Override + public ConsumerState successTransition() { + return SHUTDOWN_REQUEST_COMPLETION_STATE; + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + if (shutdownReason == ShutdownReason.REQUESTED) { + return SHUTDOWN_REQUEST_COMPLETION_STATE; + } + return shutdownReason.getShutdownState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_NOTIFICATION; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTDOWN_REQUESTED; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * Once the {@link ShutdownNotificationState} has been completed the {@link ShardConsumer} must not re-enter any of the + * processing states. This state idles the {@link ShardConsumer} until the worker triggers the final shutdown state. + * + *

Valid Transitions

+ *
+ *
Success
+ *
+ *

+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + *

+ *

+ * Remains in the {@link ShutdownNotificationCompletionState} + *

+ *
+ *
Shutdown
+ *
At this point the {@link ShardConsumer} has notified the record processor of the impending shutdown, and is + * waiting that notification. While waiting for the notification no further processing should occur on the + * {@link ShardConsumer}. + *
+ *
{@link ShutdownReason#REQUESTED}
+ *
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains + * {@link ShutdownNotificationCompletionState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
+ *
+ *
+ */ + static class ShutdownNotificationCompletionState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return null; + } + + @Override + public ConsumerState successTransition() { + return this; + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + if (shutdownReason != ShutdownReason.REQUESTED) { + return shutdownReason.getShutdownState(); + } + return this; + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_NOTIFICATION; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTDOWN_REQUESTED; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This state is entered if the {@link ShardConsumer} loses its lease, or reaches the end of the shard. + * + *

Valid Transitions

+ *
+ *
Success
+ *
+ *

+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + *

+ *

+ * Transitions to the {@link ShutdownCompleteState} + *

+ *
+ *
Shutdown
+ *
At this point the record processor has processed the final shutdown indication, and depending on the shutdown + * reason taken the correct course of action. From this point on there should be no more interactions with the + * record processor or {@link ShardConsumer}. + *
+ *
{@link ShutdownReason#REQUESTED}
+ *
+ *

+ * This should not occur as all other {@link ShutdownReason}s take priority over it. + *

+ *

+ * Transitions to {@link ShutdownCompleteState} + *

+ *
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShutdownCompleteState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShutdownCompleteState}
+ *
+ *
+ *
+ */ + static class ShuttingDownState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + return new ShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(), + consumer.getStreamConfig().getStreamProxy(), + consumer.getStreamConfig().getInitialPositionInStream(), + consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(), + consumer.getTaskBackoffTimeMillis()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTTING_DOWN; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This is the final state for the {@link ShardConsumer}. This occurs once all shutdown activities are completed. + * + *

Valid Transitions

+ *
+ *
Success
+ *
+ *

+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + *

+ *

+ * Remains in the {@link ShutdownCompleteState} + *

+ *
+ *
Shutdown
+ *
At this point the all shutdown activites are completed, and the {@link ShardConsumer} should not take any + * further actions. + *
+ *
{@link ShutdownReason#REQUESTED}
+ *
+ *

+ * This should not occur as all other {@link ShutdownReason}s take priority over it. + *

+ *

+ * Remains in {@link ShutdownCompleteState} + *

+ *
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Remains in {@link ShutdownCompleteState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Remains in {@link ShutdownCompleteState}
+ *
+ *
+ *
+ */ + static class ShutdownCompleteState implements ConsumerState { + + @Override + public ITask createTask(ShardConsumer consumer) { + if (consumer.getShutdownNotification() != null) { + consumer.getShutdownNotification().shutdownComplete(); + } + return null; + } + + @Override + public ConsumerState successTransition() { + return this; + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return this; + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_COMPLETE; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTDOWN_COMPLETE; + } + + @Override + public boolean isTerminal() { + return true; + } + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 0119dcc3..59de31be 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -14,8 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.UUID; @@ -33,8 +34,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; -import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; @@ -200,23 +201,29 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator getCurrentAssignments() { - List assignments = new LinkedList(); Collection leases = getAssignments(); - if ((leases != null) && (!leases.isEmpty())) { - for (KinesisClientLease lease : leases) { - Set parentShardIds = lease.getParentShardIds(); - ShardInfo assignment = - new ShardInfo( - lease.getLeaseKey(), - lease.getConcurrencyToken().toString(), - parentShardIds, - lease.getCheckpoint()); - assignments.add(assignment); - } + return convertLeasesToAssignments(leases); + + } + + public static List convertLeasesToAssignments(Collection leases) { + if (leases == null || leases.isEmpty()) { + return Collections.emptyList(); } + List assignments = new ArrayList<>(leases.size()); + for (KinesisClientLease lease : leases) { + assignments.add(convertLeaseToAssignment(lease)); + } + return assignments; } + public static ShardInfo convertLeaseToAssignment(KinesisClientLease lease) { + Set parentShardIds = lease.getParentShardIds(); + return new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), parentShardIds, + lease.getCheckpoint()); + } + /** * Initialize the lease coordinator (create the lease table if needed). * @throws DependencyException diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java index 615467a0..e61da491 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java @@ -63,4 +63,12 @@ class MetricsCollectingTaskDecorator implements ITask { return other.getTaskType(); } + @Override + public String toString() { + return this.getClass().getName() + "<" + other.getTaskType() + ">(" + other + ")"; + } + + ITask getOther() { + return other; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index 16daafc2..69922670 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -69,7 +69,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { */ @Override public synchronized void checkpoint() - throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { + throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException { if (LOG.isDebugEnabled()) { LOG.debug("Checkpointing " + shardInfo.getShardId() + ", " + " token " + shardInfo.getConcurrencyToken() + " at largest permitted value " + this.largestPermittedCheckpointValue); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index b6cc76aa..f63ec59d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -14,6 +14,9 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.Collections; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -24,10 +27,10 @@ import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.google.common.annotations.VisibleForTesting; /** * Responsible for consuming data records of a (specified) shard. @@ -36,15 +39,12 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; */ class ShardConsumer { - /** - * Enumerates processing states when working on a shard. - */ - enum ShardConsumerState { - WAITING_ON_PARENT_SHARDS, INITIALIZING, PROCESSING, SHUTTING_DOWN, SHUTDOWN_COMPLETE; - } + private static final Log LOG = LogFactory.getLog(ShardConsumer.class); + private static final Set EMPTY_DISALLOWED_SET = Collections.emptySet(); + private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; private final RecordProcessorCheckpointer recordProcessorCheckpointer; @@ -67,13 +67,13 @@ class ShardConsumer { * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do * much coordination/synchronization to handle concurrent reads/updates. */ - private ShardConsumerState currentState = ShardConsumerState.WAITING_ON_PARENT_SHARDS; + private ConsumerStates.ConsumerState currentState = ConsumerStates.INITIAL_STATE; /* * Used to track if we lost the primary responsibility. Once set to true, we will start shutting down. * If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object. */ - private volatile boolean beginShutdown; private volatile ShutdownReason shutdownReason; + private volatile ShutdownNotification shutdownNotification; /** * @param shardInfo Shard information @@ -126,41 +126,19 @@ class ShardConsumer { return checkAndSubmitNextTask(); } - // CHECKSTYLE:OFF CyclomaticComplexity + private boolean readyForNextTask() { + return future == null || future.isCancelled() || future.isDone(); + } + private synchronized boolean checkAndSubmitNextTask() { - // Task completed successfully (without exceptions) - boolean taskCompletedSuccessfully = false; boolean submittedNewTask = false; - if ((future == null) || future.isCancelled() || future.isDone()) { - if ((future != null) && future.isDone()) { - try { - TaskResult result = future.get(); - if (result.getException() == null) { - taskCompletedSuccessfully = true; - if (result.isShardEndReached()) { - markForShutdown(ShutdownReason.TERMINATE); - } - } else { - if (LOG.isDebugEnabled()) { - Exception taskException = result.getException(); - if (taskException instanceof BlockedOnParentShardException) { - // No need to log the stack trace for this exception (it is very specific). - LOG.debug("Shard " + shardInfo.getShardId() - + " is blocked on completion of parent shard."); - } else { - LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ", - result.getException()); - } - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - // Setting future to null so we don't misinterpret task completion status in case of exceptions - future = null; - } + if (readyForNextTask()) { + TaskOutcome taskOutcome = TaskOutcome.NOT_COMPLETE; + if (future != null && future.isDone()) { + taskOutcome = determineTaskOutcome(); } - updateState(taskCompletedSuccessfully); + + updateState(taskOutcome); ITask nextTask = getNextTask(); if (nextTask != null) { currentTask = nextTask; @@ -193,7 +171,52 @@ class ShardConsumer { return submittedNewTask; } - // CHECKSTYLE:ON CyclomaticComplexity + private enum TaskOutcome { + SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE + } + + private TaskOutcome determineTaskOutcome() { + try { + TaskResult result = future.get(); + if (result.getException() == null) { + if (result.isShardEndReached()) { + return TaskOutcome.END_OF_SHARD; + } + return TaskOutcome.SUCCESSFUL; + } + logTaskException(result); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + // Setting future to null so we don't misinterpret task completion status in case of exceptions + future = null; + } + return TaskOutcome.FAILURE; + } + + private void logTaskException(TaskResult taskResult) { + if (LOG.isDebugEnabled()) { + Exception taskException = taskResult.getException(); + if (taskException instanceof BlockedOnParentShardException) { + // No need to log the stack trace for this exception (it is very specific). + LOG.debug("Shard " + shardInfo.getShardId() + " is blocked on completion of parent shard."); + } else { + LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ", + taskResult.getException()); + } + } + } + + /** + * Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint + * before being shutdown. + * + * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. + */ + void notifyShutdownRequested(ShutdownNotification shutdownNotification) { + this.shutdownNotification = shutdownNotification; + markForShutdown(ShutdownReason.REQUESTED); + } /** * Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API). @@ -202,17 +225,15 @@ class ShardConsumer { * @return true if shutdown is complete (false if shutdown is still in progress) */ synchronized boolean beginShutdown() { - if (currentState != ShardConsumerState.SHUTDOWN_COMPLETE) { - markForShutdown(ShutdownReason.ZOMBIE); - checkAndSubmitNextTask(); - } + markForShutdown(ShutdownReason.ZOMBIE); + checkAndSubmitNextTask(); + return isShutdown(); } synchronized void markForShutdown(ShutdownReason reason) { - beginShutdown = true; // ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard) - if ((shutdownReason == null) || (shutdownReason == ShutdownReason.TERMINATE)) { + if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { shutdownReason = reason; } } @@ -224,7 +245,7 @@ class ShardConsumer { * @return true if shutdown is complete */ boolean isShutdown() { - return currentState == ShardConsumerState.SHUTDOWN_COMPLETE; + return currentState.isTerminal(); } /** @@ -240,47 +261,7 @@ class ShardConsumer { * @return Return next task to run */ private ITask getNextTask() { - ITask nextTask = null; - switch (currentState) { - case WAITING_ON_PARENT_SHARDS: - nextTask = new BlockOnParentShardTask(shardInfo, leaseManager, parentShardPollIntervalMillis); - break; - case INITIALIZING: - nextTask = - new InitializeTask(shardInfo, - recordProcessor, - checkpoint, - recordProcessorCheckpointer, - dataFetcher, - taskBackoffTimeMillis, - streamConfig); - break; - case PROCESSING: - nextTask = - new ProcessTask(shardInfo, - streamConfig, - recordProcessor, - recordProcessorCheckpointer, - dataFetcher, - taskBackoffTimeMillis); - break; - case SHUTTING_DOWN: - nextTask = - new ShutdownTask(shardInfo, - recordProcessor, - recordProcessorCheckpointer, - shutdownReason, - streamConfig.getStreamProxy(), - streamConfig.getInitialPositionInStream(), - cleanupLeasesOfCompletedShards, - leaseManager, - taskBackoffTimeMillis); - break; - case SHUTDOWN_COMPLETE: - break; - default: - break; - } + ITask nextTask = currentState.createTask(this); if (nextTask == null) { return null; @@ -293,71 +274,93 @@ class ShardConsumer { * Note: This is a private/internal method with package level access solely for testing purposes. * Update state based on information about: task success, current state, and shutdown info. * - * @param taskCompletedSuccessfully Whether (current) task completed successfully. + * @param taskOutcome The outcome of the last task */ - // CHECKSTYLE:OFF CyclomaticComplexity - void updateState(boolean taskCompletedSuccessfully) { - if (currentState == ShardConsumerState.SHUTDOWN_COMPLETE) { - // Shutdown was completed and there nothing we can do after that - return; + void updateState(TaskOutcome taskOutcome) { + if (taskOutcome == TaskOutcome.END_OF_SHARD) { + markForShutdown(ShutdownReason.TERMINATE); } - if ((currentTask == null) && beginShutdown) { - // Shard didn't start any tasks and can be shutdown fast - currentState = ShardConsumerState.SHUTDOWN_COMPLETE; - return; - } - if (beginShutdown && currentState != ShardConsumerState.SHUTTING_DOWN) { - // Shard received signal to start shutdown. - // Whatever task we were working on should be stopped and shutdown task should be executed - currentState = ShardConsumerState.SHUTTING_DOWN; - return; - } - switch (currentState) { - case WAITING_ON_PARENT_SHARDS: - if (taskCompletedSuccessfully && TaskType.BLOCK_ON_PARENT_SHARDS.equals(currentTask.getTaskType())) { - currentState = ShardConsumerState.INITIALIZING; - } - break; - case INITIALIZING: - if (taskCompletedSuccessfully && TaskType.INITIALIZE.equals(currentTask.getTaskType())) { - currentState = ShardConsumerState.PROCESSING; - } - break; - case PROCESSING: - if (taskCompletedSuccessfully && TaskType.PROCESS.equals(currentTask.getTaskType())) { - currentState = ShardConsumerState.PROCESSING; - } - break; - case SHUTTING_DOWN: - if (currentTask == null - || (taskCompletedSuccessfully && TaskType.SHUTDOWN.equals(currentTask.getTaskType()))) { - currentState = ShardConsumerState.SHUTDOWN_COMPLETE; - } - break; - default: - LOG.error("Unexpected state: " + currentState); - break; + if (isShutdownRequested()) { + currentState = currentState.shutdownTransition(shutdownReason); + } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { + if (currentState.getTaskType() == currentTask.getTaskType()) { + currentState = currentState.successTransition(); + } else { + LOG.error("Current State task type of '" + currentState.getTaskType() + + "' doesn't match the current tasks type of '" + currentTask.getTaskType() + + "'. This shouldn't happen, and indicates a programming error. " + + "Unable to safely transition to the next state."); + } } + // + // Don't change state otherwise + // + } - // CHECKSTYLE:ON CyclomaticComplexity + @VisibleForTesting + boolean isShutdownRequested() { + return shutdownReason != null; + } /** * Private/Internal method - has package level access solely for testing purposes. * * @return the currentState */ - ShardConsumerState getCurrentState() { - return currentState; + ConsumerStates.ShardConsumerState getCurrentState() { + return currentState.getState(); } - /** - * Private/Internal method - has package level access solely for testing purposes. - * - * @return the beginShutdown - */ - boolean isBeginShutdown() { - return beginShutdown; + StreamConfig getStreamConfig() { + return streamConfig; } + IRecordProcessor getRecordProcessor() { + return recordProcessor; + } + + RecordProcessorCheckpointer getRecordProcessorCheckpointer() { + return recordProcessorCheckpointer; + } + + ExecutorService getExecutorService() { + return executorService; + } + + ShardInfo getShardInfo() { + return shardInfo; + } + + KinesisDataFetcher getDataFetcher() { + return dataFetcher; + } + + ILeaseManager getLeaseManager() { + return leaseManager; + } + + ICheckpoint getCheckpoint() { + return checkpoint; + } + + long getParentShardPollIntervalMillis() { + return parentShardPollIntervalMillis; + } + + boolean isCleanupLeasesOfCompletedShards() { + return cleanupLeasesOfCompletedShards; + } + + long getTaskBackoffTimeMillis() { + return taskBackoffTimeMillis; + } + + Future getFuture() { + return future; + } + + ShutdownNotification getShutdownNotification() { + return shutdownNotification; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java new file mode 100644 index 00000000..3ee21c04 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java @@ -0,0 +1,67 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.concurrent.CountDownLatch; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; + +/** + * Contains callbacks for completion of stages in a requested record processor shutdown. + * + */ +class ShardConsumerShutdownNotification implements ShutdownNotification { + + private final LeaseCoordinator leaseCoordinator; + private final KinesisClientLease lease; + private final CountDownLatch shutdownCompleteLatch; + private final CountDownLatch notificationCompleteLatch; + + private boolean notificationComplete = false; + private boolean allNotificationCompleted = false; + + /** + * Creates a new shutdown request object. + * + * @param leaseCoordinator + * the lease coordinator used to drop leases from once the initial shutdown request is completed. + * @param lease + * the lease that this shutdown request will free once initial shutdown is complete + * @param notificationCompleteLatch + * used to inform the caller once the + * {@link IShutdownNotificationAware} object has been + * notified of the shutdown request. + * @param shutdownCompleteLatch + * used to inform the caller once the record processor is fully shutdown + */ + ShardConsumerShutdownNotification(LeaseCoordinator leaseCoordinator, KinesisClientLease lease, + CountDownLatch notificationCompleteLatch, CountDownLatch shutdownCompleteLatch) { + this.leaseCoordinator = leaseCoordinator; + this.lease = lease; + this.notificationCompleteLatch = notificationCompleteLatch; + this.shutdownCompleteLatch = shutdownCompleteLatch; + } + + @Override + public void shutdownNotificationComplete() { + if (notificationComplete) { + return; + } + leaseCoordinator.dropLease(lease); + notificationCompleteLatch.countDown(); + notificationComplete = true; + } + + @Override + public void shutdownComplete() { + if (allNotificationCompleted) { + return; + } + if (!notificationComplete) { + notificationCompleteLatch.countDown(); + } + shutdownCompleteLatch.countDown(); + allNotificationCompleted = true; + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java new file mode 100644 index 00000000..21c004c1 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java @@ -0,0 +1,113 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Used as a response from the {@link Worker#requestShutdown()} to allow callers to wait until shutdown is complete. + */ +class ShutdownFuture implements Future { + + private static final Log log = LogFactory.getLog(ShutdownFuture.class); + + private final CountDownLatch shutdownCompleteLatch; + private final CountDownLatch notificationCompleteLatch; + private final Worker worker; + + private boolean workerShutdownCalled = false; + + ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) { + this.shutdownCompleteLatch = shutdownCompleteLatch; + this.notificationCompleteLatch = notificationCompleteLatch; + this.worker = worker; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException("Cannot cancel a shutdown process"); + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return isWorkerShutdownComplete(); + } + + private boolean isWorkerShutdownComplete() { + return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty(); + } + + private long outstandingRecordProcessors(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException { + // + // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested. + // + if (!notificationCompleteLatch.await(timeout, unit)) { + long awaitingNotification = notificationCompleteLatch.getCount(); + log.info("Awaiting " + awaitingNotification + " record processors to complete initial shutdown"); + long awaitingFinalShutdown = shutdownCompleteLatch.getCount(); + if (awaitingFinalShutdown != 0) { + return awaitingFinalShutdown; + } + } + // + // Once all record processors have been notified of the shutdown it is safe to allow the worker to + // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases. + // + if (!workerShutdownCalled) { + // + // Unfortunately Worker#shutdown() doesn't appear to be idempotent. + // + worker.shutdown(); + } + // + // Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown + // processing. This should really be a no-op since as part of the notification completion the lease for + // ShardConsumer is terminated. + // + if (!shutdownCompleteLatch.await(timeout, unit)) { + long outstanding = shutdownCompleteLatch.getCount(); + log.info("Awaiting " + outstanding + " record processors to complete final shutdown"); + if (isWorkerShutdownComplete()) { + if (outstanding != 0) { + log.warn("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding + + " with a current value of " + shutdownCompleteLatch.getCount() + + ". shutdownComplete: " + worker.isShutdownComplete() + " -- Consumer Map: " + + worker.getShardInfoShardConsumerMap().size()); + } + return 0; + } + return outstanding; + } + return 0; + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + long outstanding; + do { + outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS); + log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown."); + } while(outstanding != 0); + return null; + } + + @Override + public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long outstanding = outstandingRecordProcessors(timeout, unit); + if (outstanding != 0) { + throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown."); + } + return null; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java new file mode 100644 index 00000000..928e6900 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java @@ -0,0 +1,22 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; + +/** + * A shutdown request to the ShardConsumer + */ +public interface ShutdownNotification { + /** + * Used to indicate that the record processor has been notified of a requested shutdown, and given the chance to + * checkpoint. + * + */ + void shutdownNotificationComplete(); + + /** + * Used to indicate that the record processor has completed the call to + * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)} has + * completed. + */ + void shutdownComplete(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java new file mode 100644 index 00000000..13711f23 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java @@ -0,0 +1,43 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; + +/** + * Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint. + */ +class ShutdownNotificationTask implements ITask { + + private final IRecordProcessor recordProcessor; + private final IRecordProcessorCheckpointer recordProcessorCheckpointer; + private final ShutdownNotification shutdownNotification; + + ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification) { + this.recordProcessor = recordProcessor; + this.recordProcessorCheckpointer = recordProcessorCheckpointer; + this.shutdownNotification = shutdownNotification; + } + + @Override + public TaskResult call() { + try { + if (recordProcessor instanceof IShutdownNotificationAware) { + IShutdownNotificationAware shutdownNotificationAware = (IShutdownNotificationAware) recordProcessor; + try { + shutdownNotificationAware.shutdownRequested(recordProcessorCheckpointer); + } catch (Exception ex) { + return new TaskResult(ex); + } + } + return new TaskResult(null); + } finally { + shutdownNotification.shutdownNotificationComplete(); + } + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_NOTIFICATION; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReason.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java similarity index 52% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReason.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java index 473b488c..8d0dfc80 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReason.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java @@ -12,7 +12,12 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.services.kinesis.clientlibrary.types; +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; + /** * Reason the RecordProcessor is being shutdown. @@ -28,7 +33,7 @@ public enum ShutdownReason { * Applications SHOULD NOT checkpoint their progress (as another record processor may have already started * processing data). */ - ZOMBIE, + ZOMBIE(3, ShardConsumerState.SHUTTING_DOWN.getConsumerState()), /** * Terminate processing for this RecordProcessor (resharding use case). @@ -36,5 +41,38 @@ public enum ShutdownReason { * Applications SHOULD checkpoint their progress to indicate that they have successfully processed all records * from this shard and processing of child shards can be started. */ - TERMINATE + TERMINATE(2, ShardConsumerState.SHUTTING_DOWN.getConsumerState()), + + /** + * Indicates that the entire application is being shutdown, and if desired the record processor will be given a + * final chance to checkpoint. This state will not trigger a direct call to + * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)}, but + * instead depend on a different interface for backward compatibility. + */ + REQUESTED(1, ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()); + + private final int rank; + private final ConsumerState shutdownState; + + ShutdownReason(int rank, ConsumerState shutdownState) { + this.rank = rank; + this.shutdownState = shutdownState; + } + + /** + * Indicates whether the given reason can override the current reason. + * + * @param reason the reason to transition to + * @return true if the transition is allowed, false if it's not. + */ + public boolean canTransitionTo(ShutdownReason reason) { + if (reason == null) { + return false; + } + return reason.rank > this.rank; + } + + ConsumerState getShutdownState() { + return shutdownState; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index 3ce05203..d40fbb0e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -21,11 +21,11 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import com.google.common.annotations.VisibleForTesting; /** * Task for invoking the RecordProcessor shutdown() callback. @@ -155,4 +155,9 @@ class ShutdownTask implements ITask { return taskType; } + @VisibleForTesting + ShutdownReason getReason() { + return reason; + } + } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java index 426162d4..32fd1cd2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java @@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; /** * Enumerates types of tasks executed as part of processing a shard. */ -enum TaskType { +public enum TaskType { /** * Polls and waits until parent shard(s) have been fully processed. */ @@ -34,8 +34,16 @@ enum TaskType { * Shutdown of RecordProcessor. */ SHUTDOWN, + /** + * Graceful shutdown has been requested, and notification of the record processor will occur. + */ + SHUTDOWN_NOTIFICATION, + /** + * Occurs once the shutdown has been completed + */ + SHUTDOWN_COMPLETE, /** * Sync leases/activities corresponding to Kinesis shards. */ - SHARDSYNC; + SHARDSYNC } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 32efa442..76b6116f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -14,16 +14,21 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,14 +44,16 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Worker is the high level class that Kinesis applications use to start @@ -85,6 +92,7 @@ public class Worker implements Runnable { private volatile boolean shutdown; private volatile long shutdownStartTimeMillis; + private volatile boolean shutdownComplete = false; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -482,9 +490,81 @@ public class Worker implements Runnable { } /** - * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that - * if executor services were passed to the worker by the user, worker will not attempt to shutdown - * those resources. + * Requests shutdown of the worker, notifying record processors, that implement + * {@link IShutdownNotificationAware}, of the impending shutdown. + * This gives the record processor a final chance to checkpoint. + * + *

Requested Shutdown Process

When a shutdown process is requested it operates slightly differently to + * allow the record processors a chance to checkpoint a final time. + *
    + *
  1. Call to request shutdown invoked.
  2. + *
  3. Worker stops attempting to acquire new leases
  4. + *
  5. Record Processor Shutdown Begins + *
      + *
    1. Record processor is notified of the impending shutdown, and given a final chance to checkpoint
    2. + *
    3. The lease for the record processor is then dropped.
    4. + *
    5. The record processor enters into an idle state waiting for the worker to complete final termination
    6. + *
    7. The worker will detect a record processor that has lost it's lease, and will terminate the record processor + * with {@link ShutdownReason#ZOMBIE}
    8. + *
    + *
  6. + *
  7. The worker will shutdown all record processors.
  8. + *
  9. Once all record processors have been terminated, the worker will terminate all owned resources.
  10. + *
  11. Once the worker shutdown is complete, the returned future is completed.
  12. + *
+ * + * + * + * @return a Future that will be set once the shutdown is complete. + */ + public Future requestShutdown() { + + leaseCoordinator.stopLeaseTaker(); + // + // Stop accepting new leases + // + Collection leases = leaseCoordinator.getAssignments(); + if (leases == null || leases.isEmpty()) { + // + // If there are no leases shutdown is already completed. + // + return Futures.immediateFuture(null); + } + CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size()); + CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size()); + for (KinesisClientLease lease : leases) { + ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, + notificationCompleteLatch, shutdownCompleteLatch); + ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); + shardInfoShardConsumerMap.get(shardInfo).notifyShutdownRequested(shutdownNotification); + } + + return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this); + + } + + boolean isShutdownComplete() { + return shutdownComplete; + } + + ConcurrentMap getShardInfoShardConsumerMap() { + return shardInfoShardConsumerMap; + } + + /** + * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor + * services were passed to the worker by the user, worker will not attempt to shutdown those resources. + * + *

Shutdown Process

When called this will start shutdown of the record processor, and eventually shutdown + * the worker itself. + *
    + *
  1. Call to start shutdown invoked
  2. + *
  3. Lease coordinator told to stop taking leases, and to drop existing leases.
  4. + *
  5. Worker discovers record processors that no longer have leases.
  6. + *
  7. Worker triggers shutdown with state {@link ShutdownReason#ZOMBIE}.
  8. + *
  9. Once all record processors are shutdown, worker terminates owned resources.
  10. + *
  11. Shutdown complete.
  12. + *
*/ public void shutdown() { LOG.info("Worker shutdown requested."); @@ -513,6 +593,7 @@ public class Worker implements Runnable { if (metricsFactory instanceof WorkerCWMetricsFactory) { ((CWMetricsFactory) metricsFactory).shutdown(); } + shutdownComplete = true; } /** @@ -740,7 +821,8 @@ public class Worker implements Runnable { * @return Default executor service that should be used by the worker. */ private static ExecutorService getExecutorService() { - return new WorkerThreadPoolExecutor(); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build(); + return new WorkerThreadPoolExecutor(threadFactory); } /** @@ -769,10 +851,10 @@ public class Worker implements Runnable { static class WorkerThreadPoolExecutor extends ThreadPoolExecutor { private static final long DEFAULT_KEEP_ALIVE_TIME = 60L; - WorkerThreadPoolExecutor() { + WorkerThreadPoolExecutor(ThreadFactory threadFactory) { // Defaults are based on Executors.newCachedThreadPool() - super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, - new SynchronousQueue()); + super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(), + threadFactory); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java index 9ea2c654..c533a4da 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.types; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; /** * Container for the parameters to the IRecordProcessor's diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java index b6c26d3d..b3a0ce6c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java @@ -15,9 +15,9 @@ package com.amazonaws.services.kinesis.leases.impl; import java.util.Collection; - import java.util.HashSet; import java.util.Set; +import java.util.UUID; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; @@ -41,6 +41,16 @@ public class KinesisClientLease extends Lease { this.parentShardIds.addAll(other.getParentShardIds()); } + KinesisClientLease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken, + Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, Long ownerSwitchesSinceCheckpoint, + Set parentShardIds) { + super(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos); + + this.checkpoint = checkpoint; + this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint; + this.parentShardIds.addAll(parentShardIds); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java index 0638e4db..32234e35 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java @@ -63,11 +63,17 @@ public class Lease { * @param lease lease to copy */ protected Lease(Lease lease) { - this.leaseKey = lease.getLeaseKey(); - this.leaseOwner = lease.getLeaseOwner(); - this.leaseCounter = lease.getLeaseCounter(); - this.concurrencyToken = lease.getConcurrencyToken(); - this.lastCounterIncrementNanos = lease.getLastCounterIncrementNanos(); + this(lease.getLeaseKey(), lease.getLeaseOwner(), lease.getLeaseCounter(), lease.getConcurrencyToken(), + lease.getLastCounterIncrementNanos()); + } + + protected Lease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken, + Long lastCounterIncrementNanos) { + this.leaseKey = leaseKey; + this.leaseOwner = leaseOwner; + this.leaseCounter = leaseCounter; + this.concurrencyToken = concurrencyToken; + this.lastCounterIncrementNanos = lastCounterIncrementNanos; } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index e356ac49..7629ad1d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -82,6 +83,7 @@ public class LeaseCoordinator { private ScheduledExecutorService leaseCoordinatorThreadPool; private final ExecutorService leaseRenewalThreadpool; private volatile boolean running = false; + private ScheduledFuture takerFuture; /** * Constructor. @@ -198,9 +200,15 @@ public class LeaseCoordinator { leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY); // Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation. - leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS); + takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(), + 0L, + takerIntervalMillis, + TimeUnit.MILLISECONDS); // Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degredation. - leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS); + leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(), + 0L, + renewerIntervalMillis, + TimeUnit.MILLISECONDS); running = true; } @@ -308,6 +316,27 @@ public class LeaseCoordinator { } } + /** + * Requests the cancellation of the lease taker. + */ + public void stopLeaseTaker() { + takerFuture.cancel(false); + + } + + /** + * Requests that renewals for the given lease are stopped. + * + * @param lease the lease to stop renewing. + */ + public void dropLease(T lease) { + synchronized (shutdownLock) { + if (lease != null) { + leaseRenewer.dropLease(lease); + } + } + } + /** * @return true if this LeaseCoordinator is running */ diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java index ae8040a5..b10ee1a3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java @@ -369,6 +369,15 @@ public class LeaseRenewer implements ILeaseRenewer { ownedLeases.clear(); } + /** + * {@inheritDoc} + * @param lease the lease to drop. + */ + @Override + public void dropLease(T lease) { + ownedLeases.remove(lease.getLeaseKey()); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java index 6c2a8527..87e9182a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java @@ -73,6 +73,13 @@ public interface ILeaseRenewer { */ public void clearCurrentlyHeldLeases(); + /** + * Stops the lease renewer from continunig to maintain the given lease. + * + * @param lease the lease to drop. + */ + void dropLease(T lease); + /** * Update application-specific fields in a currently held lease. Cannot be used to update internal fields such as * leaseCounter, leaseOwner, etc. Fails if we do not hold the lease, or if the concurrency token does not match diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java index d4ef9b20..2cda0632 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java @@ -26,7 +26,7 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java index 4b15a532..bce1793c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index a33e8ebf..babf6ac2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java index 524b481e..82ed5458 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java @@ -14,7 +14,7 @@ */ package com.amazonaws.services.kinesis.multilang.messages; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; /** * A message to indicate to the client's process that it should shutdown and then terminate. diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java new file mode 100644 index 00000000..c0a778e9 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -0,0 +1,385 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.hamcrest.Condition; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; + +@RunWith(MockitoJUnitRunner.class) +public class ConsumerStatesTest { + + @Mock + private ShardConsumer consumer; + @Mock + private StreamConfig streamConfig; + @Mock + private IRecordProcessor recordProcessor; + @Mock + private RecordProcessorCheckpointer recordProcessorCheckpointer; + @Mock + private ExecutorService executorService; + @Mock + private ShardInfo shardInfo; + @Mock + private KinesisDataFetcher dataFetcher; + @Mock + private ILeaseManager leaseManager; + @Mock + private ICheckpoint checkpoint; + @Mock + private Future future; + @Mock + private ShutdownNotification shutdownNotification; + @Mock + private IKinesisProxy kinesisProxy; + @Mock + private InitialPositionInStreamExtended initialPositionInStream; + + private long parentShardPollIntervalMillis = 0xCAFE; + private boolean cleanupLeasesOfCompletedShards = true; + private long taskBackoffTimeMillis = 0xBEEF; + private ShutdownReason reason = ShutdownReason.TERMINATE; + + @Before + public void setup() { + when(consumer.getStreamConfig()).thenReturn(streamConfig); + when(consumer.getRecordProcessor()).thenReturn(recordProcessor); + when(consumer.getRecordProcessorCheckpointer()).thenReturn(recordProcessorCheckpointer); + when(consumer.getExecutorService()).thenReturn(executorService); + when(consumer.getShardInfo()).thenReturn(shardInfo); + when(consumer.getDataFetcher()).thenReturn(dataFetcher); + when(consumer.getLeaseManager()).thenReturn(leaseManager); + when(consumer.getCheckpoint()).thenReturn(checkpoint); + when(consumer.getFuture()).thenReturn(future); + when(consumer.getShutdownNotification()).thenReturn(shutdownNotification); + when(consumer.getParentShardPollIntervalMillis()).thenReturn(parentShardPollIntervalMillis); + when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards); + when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis); + when(consumer.getShutdownReason()).thenReturn(reason); + + } + + private static final Class> LEASE_MANAGER_CLASS = (Class>) (Class) ILeaseManager.class; + + @Test + public void blockOnParentStateTest() { + ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState(); + + ITask task = state.createTask(consumer); + + assertThat(task, taskWith(BlockOnParentShardTask.class, ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, + taskWith(BlockOnParentShardTask.class, LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager))); + assertThat(task, taskWith(BlockOnParentShardTask.class, Long.class, "parentShardPollIntervalMillis", + equalTo(parentShardPollIntervalMillis))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.getConsumerState())); + for (ShutdownReason shutdownReason : ShutdownReason.values()) { + assertThat(state.shutdownTransition(shutdownReason), + equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState())); + } + + assertThat(state.getState(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)); + assertThat(state.getTaskType(), equalTo(TaskType.BLOCK_ON_PARENT_SHARDS)); + + } + + @Test + public void initializingStateTest() { + ConsumerState state = ShardConsumerState.INITIALIZING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, initTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, initTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, initTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, initTask(ICheckpoint.class, "checkpoint", equalTo(checkpoint))); + assertThat(task, initTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, initTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + assertThat(task, initTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.INITIALIZING)); + assertThat(state.getTaskType(), equalTo(TaskType.INITIALIZE)); + } + + @Test + public void processingStateTest() { + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING)); + assertThat(state.getTaskType(), equalTo(TaskType.PROCESS)); + + } + + @Test + public void shutdownRequestState() { + ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); + + ITask task = state.createTask(consumer); + + assertThat(task, shutdownReqTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, shutdownReqTask(IRecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo((IRecordProcessorCheckpointer) recordProcessorCheckpointer))); + assertThat(task, shutdownReqTask(ShutdownNotification.class, "shutdownNotification", equalTo(shutdownNotification))); + + assertThat(state.successTransition(), equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_REQUESTED)); + assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_NOTIFICATION)); + + } + + @Test + public void shutdownRequestCompleteStateTest() { + ConsumerState state = ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE; + + assertThat(state.createTask(consumer), nullValue()); + + assertThat(state.successTransition(), equalTo(state)); + + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), equalTo(state)); + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_REQUESTED)); + assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_NOTIFICATION)); + + } + + @Test + public void shuttingDownStateTest() { + ConsumerState state = ShardConsumerState.SHUTTING_DOWN.getConsumerState(); + + when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy); + when(streamConfig.getInitialPositionInStream()).thenReturn(initialPositionInStream); + + ITask task = state.createTask(consumer); + + assertThat(task, shutdownTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, shutdownTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, shutdownTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason))); + assertThat(task, shutdownTask(IKinesisProxy.class, "kinesisProxy", equalTo(kinesisProxy))); + assertThat(task, shutdownTask(LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager))); + assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream", + equalTo(initialPositionInStream))); + assertThat(task, + shutdownTask(Boolean.class, "cleanupLeasesOfCompletedShards", equalTo(cleanupLeasesOfCompletedShards))); + assertThat(task, shutdownTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState())); + + for (ShutdownReason reason : ShutdownReason.values()) { + assertThat(state.shutdownTransition(reason), + equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState())); + } + + assertThat(state.getState(), equalTo(ShardConsumerState.SHUTTING_DOWN)); + assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN)); + + } + + @Test + public void shutdownCompleteStateTest() { + ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + + assertThat(state.createTask(consumer), nullValue()); + verify(consumer, times(2)).getShutdownNotification(); + verify(shutdownNotification).shutdownComplete(); + + assertThat(state.successTransition(), equalTo(state)); + for(ShutdownReason reason : ShutdownReason.values()) { + assertThat(state.shutdownTransition(reason), equalTo(state)); + } + + assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE)); + assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_COMPLETE)); + } + + @Test + public void shutdownCompleteStateNullNotificationTest() { + ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + + when(consumer.getShutdownNotification()).thenReturn(null); + assertThat(state.createTask(consumer), nullValue()); + + verify(consumer).getShutdownNotification(); + verify(shutdownNotification, never()).shutdownComplete(); + } + + static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, + String propertyName, Matcher matcher) { + return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); + } + + static ReflectionPropertyMatcher shutdownReqTask( + Class valueTypeClass, String propertyName, Matcher matcher) { + return taskWith(ShutdownNotificationTask.class, valueTypeClass, propertyName, matcher); + } + + static ReflectionPropertyMatcher procTask(Class valueTypeClass, + String propertyName, Matcher matcher) { + return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher); + } + + static ReflectionPropertyMatcher initTask(Class valueTypeClass, + String propertyName, Matcher matcher) { + return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher); + } + + static ReflectionPropertyMatcher taskWith(Class taskTypeClass, + Class valueTypeClass, String propertyName, Matcher matcher) { + return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName); + } + + private static class ReflectionPropertyMatcher extends TypeSafeDiagnosingMatcher { + + private final Class taskTypeClass; + private final Class valueTypeClazz; + private final Matcher matcher; + private final String propertyName; + private final Field matchingField; + + private ReflectionPropertyMatcher(Class taskTypeClass, Class valueTypeClass, + Matcher matcher, String propertyName) { + this.taskTypeClass = taskTypeClass; + this.valueTypeClazz = valueTypeClass; + this.matcher = matcher; + this.propertyName = propertyName; + + Field[] fields = taskTypeClass.getDeclaredFields(); + Field matching = null; + for (Field field : fields) { + if (propertyName.equals(field.getName())) { + matching = field; + } + } + this.matchingField = matching; + + } + + @Override + protected boolean matchesSafely(ITask item, Description mismatchDescription) { + + return Condition.matched(item, mismatchDescription).and(new Condition.Step() { + @Override + public Condition apply(ITask value, Description mismatch) { + if (taskTypeClass.equals(value.getClass())) { + return Condition.matched(taskTypeClass.cast(value), mismatch); + } + mismatch.appendText("Expected task type of ").appendText(taskTypeClass.getName()) + .appendText(" but was ").appendText(value.getClass().getName()); + return Condition.notMatched(); + } + }).and(new Condition.Step() { + @Override + public Condition apply(TaskType value, Description mismatch) { + if (matchingField == null) { + mismatch.appendText("Field ").appendText(propertyName).appendText(" not present in ") + .appendText(taskTypeClass.getName()); + return Condition.notMatched(); + } + + try { + return Condition.matched(getValue(value), mismatch); + } catch (RuntimeException re) { + mismatch.appendText("Failure while retrieving value for ").appendText(propertyName); + return Condition.notMatched(); + } + + } + }).and(new Condition.Step() { + @Override + public Condition apply(Object value, Description mismatch) { + if (value != null && !valueTypeClazz.isAssignableFrom(value.getClass())) { + mismatch.appendText("Expected a value of type ").appendText(valueTypeClazz.getName()) + .appendText(" but was ").appendText(value.getClass().getName()); + return Condition.notMatched(); + } + return Condition.matched(valueTypeClazz.cast(value), mismatch); + } + }).matching(matcher); + } + + @Override + public void describeTo(Description description) { + description + .appendText( + "A " + taskTypeClass.getName() + " task with the property " + propertyName + " matching ") + .appendDescriptionOf(matcher); + } + + private Object getValue(TaskType task) { + + matchingField.setAccessible(true); + try { + return matchingField.get(task); + } catch (IllegalAccessException e) { + throw new RuntimeException("Failed to retrieve the value for " + matchingField.getName()); + } + } + } + +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 26337381..347d44b4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -23,9 +23,9 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -46,17 +46,18 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.ShardConsumerState; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; @@ -68,6 +69,7 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType; /** * Unit tests of {@link ShardConsumer}. */ +@RunWith(MockitoJUnitRunner.class) public class ShardConsumerTest { private static final Log LOG = LogFactory.getLog(ShardConsumerTest.class); @@ -86,6 +88,17 @@ public class ShardConsumerTest { // ... a non-final public class, and so can be mocked and spied. private final ExecutorService executorService = Executors.newFixedThreadPool(1); + @Mock + private IRecordProcessor processor; + @Mock + private IKinesisProxy streamProxy; + @Mock + private ILeaseManager leaseManager; + @Mock + private ICheckpoint checkpoint; + @Mock + private ShutdownNotification shutdownNotification; + /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. */ @@ -93,12 +106,9 @@ public class ShardConsumerTest { @Test public final void testInitializationStateUponFailure() throws Exception { ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); - ICheckpoint checkpoint = mock(ICheckpoint.class); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); - IRecordProcessor processor = mock(IRecordProcessor.class); - IKinesisProxy streamProxy = mock(IKinesisProxy.class); - ILeaseManager leaseManager = mock(ILeaseManager.class); + when(leaseManager.getLease(anyString())).thenReturn(null); StreamConfig streamConfig = new StreamConfig(streamProxy, @@ -119,19 +129,19 @@ public class ShardConsumerTest { metricsFactory, taskBackoffTimeMillis); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); } @@ -142,13 +152,9 @@ public class ShardConsumerTest { @Test public final void testInitializationStateUponSubmissionFailure() throws Exception { ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); - ICheckpoint checkpoint = mock(ICheckpoint.class); ExecutorService spyExecutorService = spy(executorService); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); - IRecordProcessor processor = mock(IRecordProcessor.class); - IKinesisProxy streamProxy = mock(IKinesisProxy.class); - ILeaseManager leaseManager = mock(ILeaseManager.class); when(leaseManager.getLease(anyString())).thenReturn(null); StreamConfig streamConfig = new StreamConfig(streamProxy, @@ -169,31 +175,27 @@ public class ShardConsumerTest { metricsFactory, taskBackoffTimeMillis); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); doThrow(new RejectedExecutionException()).when(spyExecutorService).submit(any(InitializeTask.class)); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); } @SuppressWarnings("unchecked") @Test public final void testRecordProcessorThrowable() throws Exception { ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); - ICheckpoint checkpoint = mock(ICheckpoint.class); - IRecordProcessor processor = mock(IRecordProcessor.class); - IKinesisProxy streamProxy = mock(IKinesisProxy.class); - ILeaseManager leaseManager = mock(ILeaseManager.class); StreamConfig streamConfig = new StreamConfig(streamProxy, 1, @@ -216,10 +218,10 @@ public class ShardConsumerTest { when(leaseManager.getLease(anyString())).thenReturn(null); when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // submit BlockOnParentShardTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(processor, times(0)).initialize(any(InitializationInput.class)); // Throw Error when IRecordProcessor.initialize() is invoked. @@ -227,7 +229,7 @@ public class ShardConsumerTest { consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(any(InitializationInput.class)); try { @@ -238,24 +240,24 @@ public class ShardConsumerTest { assertThat(e.getCause(), instanceOf(ExecutionException.class)); } Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(any(InitializationInput.class)); doNothing().when(processor).initialize(any(InitializationInput.class)); consumer.consumeShard(); // submit InitializeTask again. Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(2)).initialize(any(InitializationInput.class)); // Checking the status of submitted InitializeTask from above should pass. consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); } /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer#consumeShard()} + * Test method for {@link ShardConsumer#consumeShard()} */ @Test public final void testConsumeShard() throws Exception { @@ -276,8 +278,6 @@ public class ShardConsumerTest { final int idleTimeMS = 0; // keep unit tests fast ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); - @SuppressWarnings("unchecked") - ILeaseManager leaseManager = mock(ILeaseManager.class); when(leaseManager.getLease(anyString())).thenReturn(null); TestStreamlet processor = new TestStreamlet(); @@ -302,20 +302,20 @@ public class ShardConsumerTest { metricsFactory, taskBackoffTimeMillis); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize - Thread.sleep(50L); + processor.getInitializeLatch().await(5, TimeUnit.SECONDS); // We expect to process all records in numRecs calls for (int i = 0; i < numRecs;) { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -323,11 +323,26 @@ public class ShardConsumerTest { } assertThat(processor.getShutdownReason(), nullValue()); + consumer.notifyShutdownRequested(shutdownNotification); + consumer.consumeShard(); + assertThat(processor.getNotifyShutdownLatch().await(1, TimeUnit.SECONDS), is(true)); + Thread.sleep(50); + assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED)); + assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); + verify(shutdownNotification).shutdownNotificationComplete(); + assertThat(processor.isShutdownNotificationCalled(), equalTo(true)); + consumer.consumeShard(); + Thread.sleep(50); + assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); + consumer.beginShutdown(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.ZOMBIE)); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.beginShutdown(); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTDOWN_COMPLETE))); + consumer.consumeShard(); + verify(shutdownNotification, atLeastOnce()).shutdownComplete(); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); executorService.shutdown(); @@ -340,8 +355,7 @@ public class ShardConsumerTest { } /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer#consumeShard()} - * that starts from initial position of type AT_TIMESTAMP. + * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */ @Test public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception { @@ -365,8 +379,6 @@ public class ShardConsumerTest { final int idleTimeMS = 0; // keep unit tests fast ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); - @SuppressWarnings("unchecked") - ILeaseManager leaseManager = mock(ILeaseManager.class); when(leaseManager.getLease(anyString())).thenReturn(null); TestStreamlet processor = new TestStreamlet(); @@ -392,11 +404,11 @@ public class ShardConsumerTest { metricsFactory, taskBackoffTimeMillis); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); @@ -405,7 +417,7 @@ public class ShardConsumerTest { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -415,9 +427,9 @@ public class ShardConsumerTest { assertThat(processor.getShutdownReason(), nullValue()); consumer.beginShutdown(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.beginShutdown(); - assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); executorService.shutdown(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java index 5ad42359..314974b0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSequenceVerifier.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; /** * Helper class to verify shard lineage in unit tests that use TestStreamlet. diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java new file mode 100644 index 00000000..124850c9 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java @@ -0,0 +1,195 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ShutdownFutureTest { + + @Mock + private CountDownLatch shutdownCompleteLatch; + @Mock + private CountDownLatch notificationCompleteLatch; + @Mock + private Worker worker; + @Mock + private ConcurrentMap shardInfoConsumerMap; + + @Test + public void testSimpleGetAlreadyCompleted() throws Exception { + ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); + + mockNotificationComplete(true); + mockShutdownComplete(true); + + future.get(); + + verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(worker).shutdown(); + verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class)); + } + + @Test + public void testNotificationNotCompleted() throws Exception { + ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); + + mockNotificationComplete(false, true); + mockShutdownComplete(true); + + when(notificationCompleteLatch.getCount()).thenReturn(1L); + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + + expectedTimeoutException(future); + + verify(worker, never()).shutdown(); + + awaitFuture(future); + + verify(notificationCompleteLatch).getCount(); + verifyLatchAwait(notificationCompleteLatch, 2); + + verify(shutdownCompleteLatch).getCount(); + verifyLatchAwait(shutdownCompleteLatch); + + verify(worker).shutdown(); + + } + + @Test + public void testShutdownNotCompleted() throws Exception { + ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); + mockNotificationComplete(true); + mockShutdownComplete(false, true); + + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + when(worker.isShutdownComplete()).thenReturn(false); + + mockShardInfoConsumerMap(1); + + expectedTimeoutException(future); + verify(worker).shutdown(); + awaitFuture(future); + + verifyLatchAwait(notificationCompleteLatch, 2); + verifyLatchAwait(shutdownCompleteLatch, 2); + + verify(worker).isShutdownComplete(); + verify(worker).getShardInfoShardConsumerMap(); + + } + + @Test + public void testShutdownNotCompleteButWorkerShutdown() throws Exception { + ShutdownFuture future = create(); + + mockNotificationComplete(true); + mockShutdownComplete(false); + + when(shutdownCompleteLatch.getCount()).thenReturn(1L); + when(worker.isShutdownComplete()).thenReturn(true); + mockShardInfoConsumerMap(1); + + awaitFuture(future); + verify(worker).shutdown(); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch); + + verify(worker, times(2)).isShutdownComplete(); + verify(worker).getShardInfoShardConsumerMap(); + verify(shardInfoConsumerMap).size(); + } + + @Test + public void testShutdownNotCompleteButShardConsumerEmpty() throws Exception { + ShutdownFuture future = create(); + mockNotificationComplete(true); + mockShutdownComplete(false); + + mockOutstanding(shutdownCompleteLatch, 1L); + + when(worker.isShutdownComplete()).thenReturn(false); + mockShardInfoConsumerMap(0); + + awaitFuture(future); + verify(worker).shutdown(); + verifyLatchAwait(notificationCompleteLatch); + verifyLatchAwait(shutdownCompleteLatch); + + verify(worker, times(2)).isShutdownComplete(); + verify(worker, times(2)).getShardInfoShardConsumerMap(); + + verify(shardInfoConsumerMap).isEmpty(); + verify(shardInfoConsumerMap).size(); + + + } + + private ShutdownFuture create() { + return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); + } + + private void mockShardInfoConsumerMap(Integer initialItemCount, Integer ... additionalItemCounts) { + when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap); + Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length]; + for(int i = 0; i < additionalItemCounts.length; ++i) { + additionalEmptyStates[i] = additionalItemCounts[i] == 0; + } + when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts); + when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates); + } + + private void verifyLatchAwait(CountDownLatch latch) throws Exception { + verifyLatchAwait(latch, 1); + } + + private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception { + verify(latch, times(times)).await(anyLong(), any(TimeUnit.class)); + } + + private void expectedTimeoutException(ShutdownFuture future) throws Exception { + boolean gotTimeout = false; + try { + awaitFuture(future); + } catch (TimeoutException te) { + gotTimeout = true; + } + assertThat("Expected a timeout exception to occur", gotTimeout); + } + + private void awaitFuture(ShutdownFuture future) throws Exception { + future.get(1, TimeUnit.MILLISECONDS); + } + + private void mockNotificationComplete(Boolean initial, Boolean... states) throws Exception { + mockLatch(notificationCompleteLatch, initial, states); + + } + + private void mockShutdownComplete(Boolean initial, Boolean... states) throws Exception { + mockLatch(shutdownCompleteLatch, initial, states); + } + + private void mockLatch(CountDownLatch latch, Boolean initial, Boolean... states) throws Exception { + when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, states); + } + + private void mockOutstanding(CountDownLatch latch, Long remaining, Long ... additionalRemaining) throws Exception { + when(latch.getCount()).thenReturn(remaining, additionalRemaining); + } + +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 67b42a0e..9eaf7e8e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -31,7 +31,6 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisC import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; @@ -82,7 +81,7 @@ public class ShutdownTaskTest { } /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#call()}. + * Test method for {@link ShutdownTask#call()}. */ @Test public final void testCallWhenApplicationDoesNotCheckpoint() { @@ -106,7 +105,7 @@ public class ShutdownTaskTest { } /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#call()}. + * Test method for {@link ShutdownTask#call()}. */ @Test public final void testCallWhenSyncingShardsThrows() { @@ -131,7 +130,7 @@ public class ShutdownTaskTest { } /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#getTaskType()}. + * Test method for {@link ShutdownTask#getTaskType()}. */ @Test public final void testGetTaskType() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java index d9391e8a..174410e7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TestStreamlet.java @@ -18,8 +18,10 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,12 +36,11 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; /** * Streamlet that tracks records it's seen - useful for testing. */ -class TestStreamlet implements IRecordProcessor { +class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware { private static final Log LOG = LogFactory.getLog(TestStreamlet.class); @@ -55,6 +56,11 @@ class TestStreamlet implements IRecordProcessor { private ShutdownReason shutdownReason; private ShardSequenceVerifier shardSequenceVerifier; private long numProcessRecordsCallsWithEmptyRecordList; + private boolean shutdownNotificationCalled; + + private final CountDownLatch initializeLatch = new CountDownLatch(1); + private final CountDownLatch notifyShutdownLatch = new CountDownLatch(1); + private final CountDownLatch shutdownLatch = new CountDownLatch(1); public TestStreamlet() { @@ -76,6 +82,7 @@ class TestStreamlet implements IRecordProcessor { if (shardSequenceVerifier != null) { shardSequenceVerifier.registerInitialization(shardId); } + initializeLatch.countDown(); } @Override @@ -125,6 +132,8 @@ class TestStreamlet implements IRecordProcessor { throw new RuntimeException(e); } } + + shutdownLatch.countDown(); } /** @@ -148,4 +157,25 @@ class TestStreamlet implements IRecordProcessor { return numProcessRecordsCallsWithEmptyRecordList; } + boolean isShutdownNotificationCalled() { + return shutdownNotificationCalled; + } + + @Override + public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { + shutdownNotificationCalled = true; + notifyShutdownLatch.countDown(); + } + + public CountDownLatch getInitializeLatch() { + return initializeLatch; + } + + public CountDownLatch getNotifyShutdownLatch() { + return notifyShutdownLatch; + } + + public CountDownLatch getShutdownLatch() { + return shutdownLatch; + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 0747f83d..f05a58ff 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -14,21 +14,16 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import static org.hamcrest.CoreMatchers.both; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import java.io.File; import java.lang.Thread.State; @@ -42,21 +37,29 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.hamcrest.Condition; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.Assert; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.Timeout; import org.junit.runner.RunWith; +import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; @@ -78,8 +81,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; @@ -90,6 +93,8 @@ import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.Shard; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Unit tests of Worker. @@ -99,8 +104,8 @@ public class WorkerTest { private static final Log LOG = LogFactory.getLog(WorkerTest.class); - @Rule - public Timeout timeout = new Timeout((int)TimeUnit.SECONDS.toMillis(30)); + // @Rule + // public Timeout timeout = new Timeout((int)TimeUnit.SECONDS.toMillis(30)); private final NullMetricsFactory nullMetricsFactory = new NullMetricsFactory(); private final long taskBackoffTimeMillis = 1L; @@ -140,6 +145,10 @@ public class WorkerTest { private IRecordProcessor v2RecordProcessor; @Mock private ShardConsumer shardConsumer; + @Mock + private Future taskFuture; + @Mock + private TaskResult taskResult; // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = @@ -345,10 +354,10 @@ public class WorkerTest { worker.cleanupShardConsumers(assignedShards); // verify shard consumer not present in assignedShards is shut down - Assert.assertTrue(consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken.isBeginShutdown()); + Assert.assertTrue(consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken.isShutdownRequested()); // verify shard consumers present in assignedShards aren't shut down - Assert.assertFalse(consumerOfShardInfo1.isBeginShutdown()); - Assert.assertFalse(consumerOfShardInfo2.isBeginShutdown()); + Assert.assertFalse(consumerOfShardInfo1.isShutdownRequested()); + Assert.assertFalse(consumerOfShardInfo2.isShutdownRequested()); } @Test @@ -687,6 +696,339 @@ public class WorkerTest { assertThat(recordProcessorInterrupted.get(), equalTo(true)); } + @Test + public void testRequestShutdown() throws Exception { + + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization); + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + + worker.requestShutdown(); + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)))); + + worker.runProcessLoop(); + + verify(leaseCoordinator, atLeastOnce()).dropLease(eq(lease)); + leases.clear(); + currentAssignments.clear(); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)))); + + } + + @Test + public void testLeaseCancelledAfterShutdownRequest() throws Exception { + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization); + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + + worker.requestShutdown(); + leases.clear(); + currentAssignments.clear(); + worker.runProcessLoop(); + + verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()) + .submit(argThat(ShutdownReasonMatcher.hasReason(equalTo(ShutdownReason.ZOMBIE)))); + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)))); + + } + + @Test + public void testEndOfShardAfterShutdownRequest() throws Exception { + + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + StreamConfig streamConfig = mock(StreamConfig.class); + IMetricsFactory metricsFactory = mock(IMetricsFactory.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + + final List leases = new ArrayList<>(); + final List currentAssignments = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), + lease.getParentShardIds(), lease.getCheckpoint())); + + when(leaseCoordinator.getAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return leases; + } + }); + when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return currentAssignments; + } + }); + + IRecordProcessor processor = mock(IRecordProcessor.class); + when(recordProcessorFactory.createProcessor()).thenReturn(processor); + + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, + taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization); + + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE)))); + when(taskResult.isShardEndReached()).thenReturn(true); + + worker.requestShutdown(); + worker.runProcessLoop(); + + verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) + .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION)))); + + verify(executorService).submit(argThat(ShutdownReasonMatcher.hasReason(equalTo(ShutdownReason.TERMINATE)))); + + worker.runProcessLoop(); + + verify(executorService, atLeastOnce()).submit(argThat( + both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)))); + + } + + private static class ShutdownReasonMatcher extends TypeSafeDiagnosingMatcher { + + private final Matcher matcher; + + public ShutdownReasonMatcher(Matcher matcher) { + this.matcher = matcher; + } + + @Override + public void describeTo(Description description) { + description.appendText("hasShutdownReason(").appendDescriptionOf(matcher).appendText(")"); + } + + @Override + protected boolean matchesSafely(MetricsCollectingTaskDecorator item, Description mismatchDescription) { + return Condition.matched(item, mismatchDescription) + .and(new Condition.Step() { + @Override + public Condition apply(MetricsCollectingTaskDecorator value, + Description mismatch) { + if (!(value.getOther() instanceof ShutdownTask)) { + mismatch.appendText("Wrapped task isn't a shutdown task"); + return Condition.notMatched(); + } + return Condition.matched((ShutdownTask) value.getOther(), mismatch); + } + }).and(new Condition.Step() { + @Override + public Condition apply(ShutdownTask value, Description mismatch) { + return Condition.matched(value.getReason(), mismatch); + } + }).matching(matcher); + } + + public static ShutdownReasonMatcher hasReason(Matcher matcher) { + return new ShutdownReasonMatcher(matcher); + } + } + + private static class ShutdownHandlingAnswer implements Answer> { + + final Future defaultFuture; + + public ShutdownHandlingAnswer(Future defaultFuture) { + this.defaultFuture = defaultFuture; + } + + @Override + public Future answer(InvocationOnMock invocation) throws Throwable { + ITask rootTask = (ITask) invocation.getArguments()[0]; + if (rootTask instanceof MetricsCollectingTaskDecorator + && ((MetricsCollectingTaskDecorator) rootTask).getOther() instanceof ShutdownNotificationTask) { + ShutdownNotificationTask task = (ShutdownNotificationTask) ((MetricsCollectingTaskDecorator) rootTask).getOther(); + return Futures.immediateFuture(task.call()); + } + return defaultFuture; + } + } + + private static class TaskTypeMatcher extends TypeSafeMatcher { + + final Matcher expectedTaskType; + + TaskTypeMatcher(TaskType expectedTaskType) { + this(equalTo(expectedTaskType)); + } + + TaskTypeMatcher(Matcher expectedTaskType) { + this.expectedTaskType = expectedTaskType; + } + + @Override + protected boolean matchesSafely(MetricsCollectingTaskDecorator item) { + return expectedTaskType.matches(item.getTaskType()); + } + + @Override + public void describeTo(Description description) { + description.appendText("taskType matches"); + expectedTaskType.describeTo(description); + } + + static TaskTypeMatcher isOfType(TaskType taskType) { + return new TaskTypeMatcher(taskType); + } + + static TaskTypeMatcher matchesType(Matcher matcher) { + return new TaskTypeMatcher(matcher); + } + } + + private static class InnerTaskMatcher extends TypeSafeMatcher { + + final Matcher matcher; + + InnerTaskMatcher(Matcher matcher) { + this.matcher = matcher; + } + + @Override + protected boolean matchesSafely(MetricsCollectingTaskDecorator item) { + return matcher.matches(item.getOther()); + } + + @Override + public void describeTo(Description description) { + matcher.describeTo(description); + } + + static InnerTaskMatcher taskWith(Class clazz, Matcher matcher) { + return new InnerTaskMatcher<>(matcher); + } + } /** * Returns executor service that will be owned by the worker. This is useful to test the scenario * where worker shuts down the executor service also during shutdown flow. @@ -694,7 +1036,8 @@ public class WorkerTest { * @return Executor service that will be owned by the worker. */ private WorkerThreadPoolExecutor getWorkerThreadPoolExecutor() { - return new WorkerThreadPoolExecutor(); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build(); + return new WorkerThreadPoolExecutor(threadFactory); } private List createShardListWithOneShard() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java index 2cb117ef..011e0721 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReasonTest.java @@ -1,18 +1,32 @@ package com.amazonaws.services.kinesis.clientlibrary.types; -import org.junit.Assert; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import org.junit.Test; /** * Unit tests of ShutdownReason enum class. */ public class ShutdownReasonTest { + @Test - public void testToString() { - Assert.assertEquals("ZOMBIE", String.valueOf(ShutdownReason.ZOMBIE)); - Assert.assertEquals("TERMINATE", String.valueOf(ShutdownReason.TERMINATE)); - Assert.assertEquals("ZOMBIE", ShutdownReason.ZOMBIE.toString()); - Assert.assertEquals("TERMINATE", ShutdownReason.TERMINATE.toString()); + public void testTransitionZombie() { + assertThat(ShutdownReason.ZOMBIE.canTransitionTo(ShutdownReason.TERMINATE), equalTo(false)); + assertThat(ShutdownReason.ZOMBIE.canTransitionTo(ShutdownReason.REQUESTED), equalTo(false)); + } + + @Test + public void testTransitionTerminate() { + assertThat(ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.ZOMBIE), equalTo(true)); + assertThat(ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.REQUESTED), equalTo(false)); + } + + @Test + public void testTransitionRequested() { + assertThat(ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.ZOMBIE), equalTo(true)); + assertThat(ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.TERMINATE), equalTo(true)); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java new file mode 100644 index 00000000..df39b9f2 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLeaseBuilder.java @@ -0,0 +1,63 @@ +package com.amazonaws.services.kinesis.leases.impl; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; + +public class KinesisClientLeaseBuilder { + private String leaseKey; + private String leaseOwner; + private Long leaseCounter = 0L; + private UUID concurrencyToken; + private Long lastCounterIncrementNanos; + private ExtendedSequenceNumber checkpoint; + private Long ownerSwitchesSinceCheckpoint = 0L; + private Set parentShardIds = new HashSet<>(); + + public KinesisClientLeaseBuilder withLeaseKey(String leaseKey) { + this.leaseKey = leaseKey; + return this; + } + + public KinesisClientLeaseBuilder withLeaseOwner(String leaseOwner) { + this.leaseOwner = leaseOwner; + return this; + } + + public KinesisClientLeaseBuilder withLeaseCounter(Long leaseCounter) { + this.leaseCounter = leaseCounter; + return this; + } + + public KinesisClientLeaseBuilder withConcurrencyToken(UUID concurrencyToken) { + this.concurrencyToken = concurrencyToken; + return this; + } + + public KinesisClientLeaseBuilder withLastCounterIncrementNanos(Long lastCounterIncrementNanos) { + this.lastCounterIncrementNanos = lastCounterIncrementNanos; + return this; + } + + public KinesisClientLeaseBuilder withCheckpoint(ExtendedSequenceNumber checkpoint) { + this.checkpoint = checkpoint; + return this; + } + + public KinesisClientLeaseBuilder withOwnerSwitchesSinceCheckpoint(Long ownerSwitchesSinceCheckpoint) { + this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint; + return this; + } + + public KinesisClientLeaseBuilder withParentShardIds(Set parentShardIds) { + this.parentShardIds = parentShardIds; + return this; + } + + public KinesisClientLease build() { + return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos, + checkpoint, ownerSwitchesSinceCheckpoint, parentShardIds); + } +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java index 9461b1cc..749a1dc8 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MessageWriterTest.java @@ -28,7 +28,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.Message; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java index 55eb5dce..580b8ad4 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocolTest.java @@ -33,7 +33,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.Message; diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index 941b4582..a03b164d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -36,7 +36,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java index ff7bc84e..9b90fe60 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/messages/MessageTest.java @@ -14,14 +14,13 @@ */ package com.amazonaws.services.kinesis.multilang.messages; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import org.junit.Assert; import org.junit.Test; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper;