+ * +-------------------+ + * | Waiting on Parent | +------------------+ + * +----+ Shard | | Shutdown | + * | | | +--------------------+ Notification | + * | +----------+--------+ | Shutdown: | Requested | + * | | Success | Requested +-+-------+--------+ + * | | | | | + * | +------+-------------+ | | | Shutdown: + * | | Initializing +-----+ | | Requested + * | | | | | | + * | | +-----+-------+ | | + * | +---------+----------+ | | Shutdown: | +-----+-------------+ + * | | Success | | Terminated | | Shutdown | + * | | | | Zombie | | Notification +-------------+ + * | +------+-------------+ | | | | Complete | | + * | | Processing +--+ | | ++-----------+------+ | + * | +---+ | | | | | | + * | | | +----------+ | | | Shutdown: | + * | | +------+-------------+ | \ / | Requested | + * | | | | \/ +--------------------+ + * | | | | || + * | | Success | | || Shutdown: + * | +----------+ | || Terminated + * | | || Zombie + * | | || + * | | || + * | | +---++--------------+ + * | | | Shutting Down | + * | +-----------+ | + * | | | + * | +--------+----------+ + * | | + * | | Shutdown: + * | | All Reasons + * | | + * | | + * | Shutdown: +--------+----------+ + * | All Reasons | Shutdown | + * +-------------------------------------------------------+ Complete | + * | | + * +-------------------+ + *+ */ +class ConsumerStates { + + /** + * Enumerates processing states when working on a shard. + */ + enum ShardConsumerState { + // @formatter:off + WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()), + INITIALIZING(new InitializingState()), + PROCESSING(new ProcessingState()), + SHUTDOWN_REQUESTED(new ShutdownNotificationState()), + SHUTTING_DOWN(new ShuttingDownState()), + SHUTDOWN_COMPLETE(new ShutdownCompleteState()); + //@formatter:on + + private final ConsumerState consumerState; + + ShardConsumerState(ConsumerState consumerState) { + this.consumerState = consumerState; + } + + public ConsumerState getConsumerState() { + return consumerState; + } + } + + + /** + * Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to + * do when a transition occurs. + * + */ + interface ConsumerState { + /** + * Creates a new task for this state using the passed in consumer to build the task. If there is no task + * required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the + * consumer during the execution of this method. + * + * @param consumer + * the consumer to use build the task, or execute state. + * @return a valid task for this state or null if there is no task required. + */ + ITask createTask(ShardConsumer consumer); + + /** + * Provides the next state of the consumer upon success of the task return by + * {@link ConsumerState#createTask(ShardConsumer)}. + * + * @return the next state that the consumer should transition to, this may be the same object as the current + * state. + */ + ConsumerState successTransition(); + + /** + * Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent + * on the current state, and the shutdown reason. + * + * @param shutdownReason + * the reason that a shutdown was requested + * @return the next state that the consumer should transition to, this may be the same object as the current + * state. + */ + ConsumerState shutdownTransition(ShutdownReason shutdownReason); + + /** + * The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state + * even if createTask would return a null value. + * + * @return the type of task that this state represents. + */ + TaskType getTaskType(); + + /** + * An enumeration represent the type of this state. Different consumer states may return the same + * {@link ShardConsumerState}. + * + * @return the type of consumer state this represents. + */ + ShardConsumerState getState(); + + boolean isTerminal(); + + } + + /** + * The initial state that any {@link ShardConsumer} should start in. + */ + static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState(); + + private static ConsumerState shutdownStateFor(ShutdownReason reason) { + switch (reason) { + case REQUESTED: + return ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); + case TERMINATE: + case ZOMBIE: + return ShardConsumerState.SHUTTING_DOWN.getConsumerState(); + default: + throw new IllegalArgumentException("Unknown reason: " + reason); + } + } + + /** + * This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent + * shards have been completed. + * + *
+ * This reason should not occur, since terminate is triggered after reaching the end of a shard. Initialize never + * makes an requests to Kinesis for records, so it can't reach the end of a shard. + *
+ *+ * Transitions to the {@link ShuttingDownState} + *
+ *+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + *
+ *+ * Remains in the {@link ShutdownNotificationCompletionState} + *
+ *+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + *
+ *+ * Transitions to the {@link ShutdownCompleteState} + *
+ *+ * This should not occur as all other {@link ShutdownReason}s take priority over it. + *
+ *+ * Transitions to {@link ShutdownCompleteState} + *
+ *+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + *
+ *+ * Remains in the {@link ShutdownCompleteState} + *
+ *+ * This should not occur as all other {@link ShutdownReason}s take priority over it. + *
+ *+ * Remains in {@link ShutdownCompleteState} + *
+ *