Allow for a Graceful Shutdown of the Worker

Add a new method to the worker requestShutdown that allows the worker to
gracefully shutdown all record processors.  The graceful shutdown gives
the record processors a last chance to checkpoint before they're
terminated.

To use these new features the record processor must implement
IShutdownnotificationaware.

Some cleanup to try and make the state transitions of the record
processor more clear.
This commit is contained in:
Pfifer, Justin 2016-09-30 08:01:36 -07:00
parent 51663f96c7
commit 33655bdedb
38 changed files with 2387 additions and 269 deletions

View file

@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.interfaces;
import java.util.List;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
/**
* The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon

View file

@ -0,0 +1,19 @@
package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
/**
* Allows a record processor to indicate it's aware of requested shutdowns, and handle the request.
*/
public interface IShutdownNotificationAware {
/**
* Called when the worker has been requested to shutdown, and gives the record processor a chance to checkpoint.
*
* The record processor will still have shutdown called.
*
* @param checkpointer the checkpointer that can be used to save progress.
*/
void shutdownRequested(IRecordProcessorCheckpointer checkpointer);
}

View file

@ -0,0 +1,602 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks,
* and state transitions is contained within the {@link ConsumerState} objects.
*
* <h2>State Diagram</h2>
*
* <pre>
* +-------------------+
* | Waiting on Parent | +------------------+
* +----+ Shard | | Shutdown |
* | | | +--------------------+ Notification |
* | +----------+--------+ | Shutdown: | Requested |
* | | Success | Requested +-+-------+--------+
* | | | | |
* | +------+-------------+ | | | Shutdown:
* | | Initializing +-----+ | | Requested
* | | | | | |
* | | +-----+-------+ | |
* | +---------+----------+ | | Shutdown: | +-----+-------------+
* | | Success | | Terminated | | Shutdown |
* | | | | Zombie | | Notification +-------------+
* | +------+-------------+ | | | | Complete | |
* | | Processing +--+ | | ++-----------+------+ |
* | +---+ | | | | | |
* | | | +----------+ | | | Shutdown: |
* | | +------+-------------+ | \ / | Requested |
* | | | | \/ +--------------------+
* | | | | ||
* | | Success | | || Shutdown:
* | +----------+ | || Terminated
* | | || Zombie
* | | ||
* | | ||
* | | +---++--------------+
* | | | Shutting Down |
* | +-----------+ |
* | | |
* | +--------+----------+
* | |
* | | Shutdown:
* | | All Reasons
* | |
* | |
* | Shutdown: +--------+----------+
* | All Reasons | Shutdown |
* +-------------------------------------------------------+ Complete |
* | |
* +-------------------+
* </pre>
*/
class ConsumerStates {
/**
* Enumerates processing states when working on a shard.
*/
enum ShardConsumerState {
// @formatter:off
WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()),
INITIALIZING(new InitializingState()),
PROCESSING(new ProcessingState()),
SHUTDOWN_REQUESTED(new ShutdownNotificationState()),
SHUTTING_DOWN(new ShuttingDownState()),
SHUTDOWN_COMPLETE(new ShutdownCompleteState());
//@formatter:on
private final ConsumerState consumerState;
ShardConsumerState(ConsumerState consumerState) {
this.consumerState = consumerState;
}
public ConsumerState getConsumerState() {
return consumerState;
}
}
/**
* Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to
* do when a transition occurs.
*
*/
interface ConsumerState {
/**
* Creates a new task for this state using the passed in consumer to build the task. If there is no task
* required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the
* consumer during the execution of this method.
*
* @param consumer
* the consumer to use build the task, or execute state.
* @return a valid task for this state or null if there is no task required.
*/
ITask createTask(ShardConsumer consumer);
/**
* Provides the next state of the consumer upon success of the task return by
* {@link ConsumerState#createTask(ShardConsumer)}.
*
* @return the next state that the consumer should transition to, this may be the same object as the current
* state.
*/
ConsumerState successTransition();
/**
* Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent
* on the current state, and the shutdown reason.
*
* @param shutdownReason
* the reason that a shutdown was requested
* @return the next state that the consumer should transition to, this may be the same object as the current
* state.
*/
ConsumerState shutdownTransition(ShutdownReason shutdownReason);
/**
* The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state
* even if createTask would return a null value.
*
* @return the type of task that this state represents.
*/
TaskType getTaskType();
/**
* An enumeration represent the type of this state. Different consumer states may return the same
* {@link ShardConsumerState}.
*
* @return the type of consumer state this represents.
*/
ShardConsumerState getState();
boolean isTerminal();
}
/**
* The initial state that any {@link ShardConsumer} should start in.
*/
static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState();
private static ConsumerState shutdownStateFor(ShutdownReason reason) {
switch (reason) {
case REQUESTED:
return ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();
case TERMINATE:
case ZOMBIE:
return ShardConsumerState.SHUTTING_DOWN.getConsumerState();
default:
throw new IllegalArgumentException("Unknown reason: " + reason);
}
}
/**
* This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent
* shards have been completed.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Transition to the initializing state to allow the record processor to be initialized in preparation of
* processing.</dd>
* <dt>Shutdown</dt>
* <dd>
* <dl>
* <dt>All Reasons</dt>
* <dd>Transitions to {@link ShutdownCompleteState}. Since the record processor was never initialized it can't be
* informed of the shutdown.</dd>
* </dl>
* </dd>
* </dl>
*/
static class BlockedOnParentState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(),
consumer.getParentShardPollIntervalMillis());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.INITIALIZING.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
}
@Override
public TaskType getTaskType() {
return TaskType.BLOCK_ON_PARENT_SHARDS;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.WAITING_ON_PARENT_SHARDS;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This state is responsible for initializing the record processor with the shard information.
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Transitions to the processing state which will begin to send records to the record processor</dd>
* <dt>Shutdown</dt>
* <dd>At this point the record processor has been initialized, but hasn't processed any records. This requires that
* the record processor be notified of the shutdown, even though there is almost no actions the record processor
* could take.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Transitions to the {@link ShutdownNotificationState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>
* <p>
* This reason should not occur, since terminate is triggered after reaching the end of a shard. Initialize never
* makes an requests to Kinesis for records, so it can't reach the end of a shard.
* </p>
* <p>
* Transitions to the {@link ShuttingDownState}
* </p>
* </dd>
* </dl>
* </dd>
* </dl>
*/
static class InitializingState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(), consumer.getStreamConfig());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.PROCESSING.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return shutdownReason.getShutdownState();
}
@Override
public TaskType getTaskType() {
return TaskType.INITIALIZE;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.INITIALIZING;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This state is responsible for retrieving records from Kinesis, and dispatching them to the record processor.
* While in this state the only way a transition will occur is if a shutdown has been triggered.
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Doesn't actually transition, but instead returns the same state</dd>
* <dt>Shutdown</dt>
* <dd>At this point records are being retrieved, and processed. It's now possible for the consumer to reach the end
* of the shard triggering a {@link ShutdownReason#TERMINATE}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Transitions to the {@link ShutdownNotificationState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ProcessingState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.PROCESSING.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return shutdownReason.getShutdownState();
}
@Override
public TaskType getTaskType() {
return TaskType.PROCESS;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.PROCESSING;
}
@Override
public boolean isTerminal() {
return false;
}
}
static final ConsumerState SHUTDOWN_REQUEST_COMPLETION_STATE = new ShutdownNotificationCompletionState();
/**
* This state occurs when a shutdown has been explicitly requested. This shutdown allows the record processor a
* chance to checkpoint and prepare to be shutdown via the normal method. This state can only be reached by a
* shutdown on the {@link InitializingState} or {@link ProcessingState}.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.</dd>
* <dt>Shutdown</dt>
* <dd>At this point records are being retrieved, and processed. An explicit shutdown will allow the record
* processor one last chance to checkpoint, and then the {@link ShardConsumer} will be held in an idle state.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to
* {@link ShutdownNotificationCompletionState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShutdownNotificationState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownNotification());
}
@Override
public ConsumerState successTransition() {
return SHUTDOWN_REQUEST_COMPLETION_STATE;
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
if (shutdownReason == ShutdownReason.REQUESTED) {
return SHUTDOWN_REQUEST_COMPLETION_STATE;
}
return shutdownReason.getShutdownState();
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_NOTIFICATION;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTDOWN_REQUESTED;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* Once the {@link ShutdownNotificationState} has been completed the {@link ShardConsumer} must not re-enter any of the
* processing states. This state idles the {@link ShardConsumer} until the worker triggers the final shutdown state.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
* </p>
* <p>
* Remains in the {@link ShutdownNotificationCompletionState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point the {@link ShardConsumer} has notified the record processor of the impending shutdown, and is
* waiting that notification. While waiting for the notification no further processing should occur on the
* {@link ShardConsumer}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains
* {@link ShutdownNotificationCompletionState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShutdownNotificationCompletionState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return null;
}
@Override
public ConsumerState successTransition() {
return this;
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
if (shutdownReason != ShutdownReason.REQUESTED) {
return shutdownReason.getShutdownState();
}
return this;
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_NOTIFICATION;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTDOWN_REQUESTED;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This state is entered if the {@link ShardConsumer} loses its lease, or reaches the end of the shard.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
* </p>
* <p>
* Transitions to the {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point the record processor has processed the final shutdown indication, and depending on the shutdown
* reason taken the correct course of action. From this point on there should be no more interactions with the
* record processor or {@link ShardConsumer}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>
* <p>
* This should not occur as all other {@link ShutdownReason}s take priority over it.
* </p>
* <p>
* Transitions to {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShutdownCompleteState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShutdownCompleteState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShuttingDownState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(),
consumer.getStreamConfig().getStreamProxy(),
consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTTING_DOWN;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This is the final state for the {@link ShardConsumer}. This occurs once all shutdown activities are completed.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
* </p>
* <p>
* Remains in the {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point the all shutdown activites are completed, and the {@link ShardConsumer} should not take any
* further actions.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>
* <p>
* This should not occur as all other {@link ShutdownReason}s take priority over it.
* </p>
* <p>
* Remains in {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Remains in {@link ShutdownCompleteState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Remains in {@link ShutdownCompleteState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShutdownCompleteState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
if (consumer.getShutdownNotification() != null) {
consumer.getShutdownNotification().shutdownComplete();
}
return null;
}
@Override
public ConsumerState successTransition() {
return this;
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return this;
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_COMPLETE;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTDOWN_COMPLETE;
}
@Override
public boolean isTerminal() {
return true;
}
}
}

View file

@ -14,8 +14,9 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@ -33,8 +34,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
@ -200,23 +201,29 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
* @return Current shard/lease assignments
*/
public List<ShardInfo> getCurrentAssignments() {
List<ShardInfo> assignments = new LinkedList<ShardInfo>();
Collection<KinesisClientLease> leases = getAssignments();
if ((leases != null) && (!leases.isEmpty())) {
return convertLeasesToAssignments(leases);
}
public static List<ShardInfo> convertLeasesToAssignments(Collection<KinesisClientLease> leases) {
if (leases == null || leases.isEmpty()) {
return Collections.emptyList();
}
List<ShardInfo> assignments = new ArrayList<>(leases.size());
for (KinesisClientLease lease : leases) {
Set<String> parentShardIds = lease.getParentShardIds();
ShardInfo assignment =
new ShardInfo(
lease.getLeaseKey(),
lease.getConcurrencyToken().toString(),
parentShardIds,
lease.getCheckpoint());
assignments.add(assignment);
}
assignments.add(convertLeaseToAssignment(lease));
}
return assignments;
}
public static ShardInfo convertLeaseToAssignment(KinesisClientLease lease) {
Set<String> parentShardIds = lease.getParentShardIds();
return new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), parentShardIds,
lease.getCheckpoint());
}
/**
* Initialize the lease coordinator (create the lease table if needed).
* @throws DependencyException

View file

@ -63,4 +63,12 @@ class MetricsCollectingTaskDecorator implements ITask {
return other.getTaskType();
}
@Override
public String toString() {
return this.getClass().getName() + "<" + other.getTaskType() + ">(" + other + ")";
}
ITask getOther() {
return other;
}
}

View file

@ -14,6 +14,9 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@ -24,10 +27,10 @@ import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Responsible for consuming data records of a (specified) shard.
@ -36,15 +39,12 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
*/
class ShardConsumer {
/**
* Enumerates processing states when working on a shard.
*/
enum ShardConsumerState {
WAITING_ON_PARENT_SHARDS, INITIALIZING, PROCESSING, SHUTTING_DOWN, SHUTDOWN_COMPLETE;
}
private static final Log LOG = LogFactory.getLog(ShardConsumer.class);
private static final Set<ConsumerStates.ShardConsumerState> EMPTY_DISALLOWED_SET = Collections.emptySet();
private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
@ -67,13 +67,13 @@ class ShardConsumer {
* Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do
* much coordination/synchronization to handle concurrent reads/updates.
*/
private ShardConsumerState currentState = ShardConsumerState.WAITING_ON_PARENT_SHARDS;
private ConsumerStates.ConsumerState currentState = ConsumerStates.INITIAL_STATE;
/*
* Used to track if we lost the primary responsibility. Once set to true, we will start shutting down.
* If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object.
*/
private volatile boolean beginShutdown;
private volatile ShutdownReason shutdownReason;
private volatile ShutdownNotification shutdownNotification;
/**
* @param shardInfo Shard information
@ -126,41 +126,19 @@ class ShardConsumer {
return checkAndSubmitNextTask();
}
// CHECKSTYLE:OFF CyclomaticComplexity
private boolean readyForNextTask() {
return future == null || future.isCancelled() || future.isDone();
}
private synchronized boolean checkAndSubmitNextTask() {
// Task completed successfully (without exceptions)
boolean taskCompletedSuccessfully = false;
boolean submittedNewTask = false;
if ((future == null) || future.isCancelled() || future.isDone()) {
if ((future != null) && future.isDone()) {
try {
TaskResult result = future.get();
if (result.getException() == null) {
taskCompletedSuccessfully = true;
if (result.isShardEndReached()) {
markForShutdown(ShutdownReason.TERMINATE);
if (readyForNextTask()) {
TaskOutcome taskOutcome = TaskOutcome.NOT_COMPLETE;
if (future != null && future.isDone()) {
taskOutcome = determineTaskOutcome();
}
} else {
if (LOG.isDebugEnabled()) {
Exception taskException = result.getException();
if (taskException instanceof BlockedOnParentShardException) {
// No need to log the stack trace for this exception (it is very specific).
LOG.debug("Shard " + shardInfo.getShardId()
+ " is blocked on completion of parent shard.");
} else {
LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ",
result.getException());
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// Setting future to null so we don't misinterpret task completion status in case of exceptions
future = null;
}
}
updateState(taskCompletedSuccessfully);
updateState(taskOutcome);
ITask nextTask = getNextTask();
if (nextTask != null) {
currentTask = nextTask;
@ -193,7 +171,52 @@ class ShardConsumer {
return submittedNewTask;
}
// CHECKSTYLE:ON CyclomaticComplexity
private enum TaskOutcome {
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE
}
private TaskOutcome determineTaskOutcome() {
try {
TaskResult result = future.get();
if (result.getException() == null) {
if (result.isShardEndReached()) {
return TaskOutcome.END_OF_SHARD;
}
return TaskOutcome.SUCCESSFUL;
}
logTaskException(result);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// Setting future to null so we don't misinterpret task completion status in case of exceptions
future = null;
}
return TaskOutcome.FAILURE;
}
private void logTaskException(TaskResult taskResult) {
if (LOG.isDebugEnabled()) {
Exception taskException = taskResult.getException();
if (taskException instanceof BlockedOnParentShardException) {
// No need to log the stack trace for this exception (it is very specific).
LOG.debug("Shard " + shardInfo.getShardId() + " is blocked on completion of parent shard.");
} else {
LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ",
taskResult.getException());
}
}
}
/**
* Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint
* before being shutdown.
*
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
*/
void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
this.shutdownNotification = shutdownNotification;
markForShutdown(ShutdownReason.REQUESTED);
}
/**
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
@ -202,17 +225,15 @@ class ShardConsumer {
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
synchronized boolean beginShutdown() {
if (currentState != ShardConsumerState.SHUTDOWN_COMPLETE) {
markForShutdown(ShutdownReason.ZOMBIE);
checkAndSubmitNextTask();
}
return isShutdown();
}
synchronized void markForShutdown(ShutdownReason reason) {
beginShutdown = true;
// ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard)
if ((shutdownReason == null) || (shutdownReason == ShutdownReason.TERMINATE)) {
if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) {
shutdownReason = reason;
}
}
@ -224,7 +245,7 @@ class ShardConsumer {
* @return true if shutdown is complete
*/
boolean isShutdown() {
return currentState == ShardConsumerState.SHUTDOWN_COMPLETE;
return currentState.isTerminal();
}
/**
@ -240,47 +261,7 @@ class ShardConsumer {
* @return Return next task to run
*/
private ITask getNextTask() {
ITask nextTask = null;
switch (currentState) {
case WAITING_ON_PARENT_SHARDS:
nextTask = new BlockOnParentShardTask(shardInfo, leaseManager, parentShardPollIntervalMillis);
break;
case INITIALIZING:
nextTask =
new InitializeTask(shardInfo,
recordProcessor,
checkpoint,
recordProcessorCheckpointer,
dataFetcher,
taskBackoffTimeMillis,
streamConfig);
break;
case PROCESSING:
nextTask =
new ProcessTask(shardInfo,
streamConfig,
recordProcessor,
recordProcessorCheckpointer,
dataFetcher,
taskBackoffTimeMillis);
break;
case SHUTTING_DOWN:
nextTask =
new ShutdownTask(shardInfo,
recordProcessor,
recordProcessorCheckpointer,
shutdownReason,
streamConfig.getStreamProxy(),
streamConfig.getInitialPositionInStream(),
cleanupLeasesOfCompletedShards,
leaseManager,
taskBackoffTimeMillis);
break;
case SHUTDOWN_COMPLETE:
break;
default:
break;
}
ITask nextTask = currentState.createTask(this);
if (nextTask == null) {
return null;
@ -293,71 +274,93 @@ class ShardConsumer {
* Note: This is a private/internal method with package level access solely for testing purposes.
* Update state based on information about: task success, current state, and shutdown info.
*
* @param taskCompletedSuccessfully Whether (current) task completed successfully.
* @param taskOutcome The outcome of the last task
*/
// CHECKSTYLE:OFF CyclomaticComplexity
void updateState(boolean taskCompletedSuccessfully) {
if (currentState == ShardConsumerState.SHUTDOWN_COMPLETE) {
// Shutdown was completed and there nothing we can do after that
return;
void updateState(TaskOutcome taskOutcome) {
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
markForShutdown(ShutdownReason.TERMINATE);
}
if ((currentTask == null) && beginShutdown) {
// Shard didn't start any tasks and can be shutdown fast
currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
return;
if (isShutdownRequested()) {
currentState = currentState.shutdownTransition(shutdownReason);
} else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
if (currentState.getTaskType() == currentTask.getTaskType()) {
currentState = currentState.successTransition();
} else {
LOG.error("Current State task type of '" + currentState.getTaskType()
+ "' doesn't match the current tasks type of '" + currentTask.getTaskType()
+ "'. This shouldn't happen, and indicates a programming error. "
+ "Unable to safely transition to the next state.");
}
if (beginShutdown && currentState != ShardConsumerState.SHUTTING_DOWN) {
// Shard received signal to start shutdown.
// Whatever task we were working on should be stopped and shutdown task should be executed
currentState = ShardConsumerState.SHUTTING_DOWN;
return;
}
switch (currentState) {
case WAITING_ON_PARENT_SHARDS:
if (taskCompletedSuccessfully && TaskType.BLOCK_ON_PARENT_SHARDS.equals(currentTask.getTaskType())) {
currentState = ShardConsumerState.INITIALIZING;
}
break;
case INITIALIZING:
if (taskCompletedSuccessfully && TaskType.INITIALIZE.equals(currentTask.getTaskType())) {
currentState = ShardConsumerState.PROCESSING;
}
break;
case PROCESSING:
if (taskCompletedSuccessfully && TaskType.PROCESS.equals(currentTask.getTaskType())) {
currentState = ShardConsumerState.PROCESSING;
}
break;
case SHUTTING_DOWN:
if (currentTask == null
|| (taskCompletedSuccessfully && TaskType.SHUTDOWN.equals(currentTask.getTaskType()))) {
currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
}
break;
default:
LOG.error("Unexpected state: " + currentState);
break;
}
//
// Don't change state otherwise
//
}
// CHECKSTYLE:ON CyclomaticComplexity
@VisibleForTesting
boolean isShutdownRequested() {
return shutdownReason != null;
}
/**
* Private/Internal method - has package level access solely for testing purposes.
*
* @return the currentState
*/
ShardConsumerState getCurrentState() {
return currentState;
ConsumerStates.ShardConsumerState getCurrentState() {
return currentState.getState();
}
/**
* Private/Internal method - has package level access solely for testing purposes.
*
* @return the beginShutdown
*/
boolean isBeginShutdown() {
return beginShutdown;
StreamConfig getStreamConfig() {
return streamConfig;
}
IRecordProcessor getRecordProcessor() {
return recordProcessor;
}
RecordProcessorCheckpointer getRecordProcessorCheckpointer() {
return recordProcessorCheckpointer;
}
ExecutorService getExecutorService() {
return executorService;
}
ShardInfo getShardInfo() {
return shardInfo;
}
KinesisDataFetcher getDataFetcher() {
return dataFetcher;
}
ILeaseManager<KinesisClientLease> getLeaseManager() {
return leaseManager;
}
ICheckpoint getCheckpoint() {
return checkpoint;
}
long getParentShardPollIntervalMillis() {
return parentShardPollIntervalMillis;
}
boolean isCleanupLeasesOfCompletedShards() {
return cleanupLeasesOfCompletedShards;
}
long getTaskBackoffTimeMillis() {
return taskBackoffTimeMillis;
}
Future<TaskResult> getFuture() {
return future;
}
ShutdownNotification getShutdownNotification() {
return shutdownNotification;
}
}

View file

@ -0,0 +1,67 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.CountDownLatch;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
/**
* Contains callbacks for completion of stages in a requested record processor shutdown.
*
*/
class ShardConsumerShutdownNotification implements ShutdownNotification {
private final LeaseCoordinator<KinesisClientLease> leaseCoordinator;
private final KinesisClientLease lease;
private final CountDownLatch shutdownCompleteLatch;
private final CountDownLatch notificationCompleteLatch;
private boolean notificationComplete = false;
private boolean allNotificationCompleted = false;
/**
* Creates a new shutdown request object.
*
* @param leaseCoordinator
* the lease coordinator used to drop leases from once the initial shutdown request is completed.
* @param lease
* the lease that this shutdown request will free once initial shutdown is complete
* @param notificationCompleteLatch
* used to inform the caller once the
* {@link IShutdownNotificationAware} object has been
* notified of the shutdown request.
* @param shutdownCompleteLatch
* used to inform the caller once the record processor is fully shutdown
*/
ShardConsumerShutdownNotification(LeaseCoordinator<KinesisClientLease> leaseCoordinator, KinesisClientLease lease,
CountDownLatch notificationCompleteLatch, CountDownLatch shutdownCompleteLatch) {
this.leaseCoordinator = leaseCoordinator;
this.lease = lease;
this.notificationCompleteLatch = notificationCompleteLatch;
this.shutdownCompleteLatch = shutdownCompleteLatch;
}
@Override
public void shutdownNotificationComplete() {
if (notificationComplete) {
return;
}
leaseCoordinator.dropLease(lease);
notificationCompleteLatch.countDown();
notificationComplete = true;
}
@Override
public void shutdownComplete() {
if (allNotificationCompleted) {
return;
}
if (!notificationComplete) {
notificationCompleteLatch.countDown();
}
shutdownCompleteLatch.countDown();
allNotificationCompleted = true;
}
}

View file

@ -0,0 +1,113 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Used as a response from the {@link Worker#requestShutdown()} to allow callers to wait until shutdown is complete.
*/
class ShutdownFuture implements Future<Void> {
private static final Log log = LogFactory.getLog(ShutdownFuture.class);
private final CountDownLatch shutdownCompleteLatch;
private final CountDownLatch notificationCompleteLatch;
private final Worker worker;
private boolean workerShutdownCalled = false;
ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) {
this.shutdownCompleteLatch = shutdownCompleteLatch;
this.notificationCompleteLatch = notificationCompleteLatch;
this.worker = worker;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException("Cannot cancel a shutdown process");
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return isWorkerShutdownComplete();
}
private boolean isWorkerShutdownComplete() {
return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty();
}
private long outstandingRecordProcessors(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException {
//
// Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
//
if (!notificationCompleteLatch.await(timeout, unit)) {
long awaitingNotification = notificationCompleteLatch.getCount();
log.info("Awaiting " + awaitingNotification + " record processors to complete initial shutdown");
long awaitingFinalShutdown = shutdownCompleteLatch.getCount();
if (awaitingFinalShutdown != 0) {
return awaitingFinalShutdown;
}
}
//
// Once all record processors have been notified of the shutdown it is safe to allow the worker to
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
//
if (!workerShutdownCalled) {
//
// Unfortunately Worker#shutdown() doesn't appear to be idempotent.
//
worker.shutdown();
}
//
// Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown
// processing. This should really be a no-op since as part of the notification completion the lease for
// ShardConsumer is terminated.
//
if (!shutdownCompleteLatch.await(timeout, unit)) {
long outstanding = shutdownCompleteLatch.getCount();
log.info("Awaiting " + outstanding + " record processors to complete final shutdown");
if (isWorkerShutdownComplete()) {
if (outstanding != 0) {
log.warn("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
+ " with a current value of " + shutdownCompleteLatch.getCount()
+ ". shutdownComplete: " + worker.isShutdownComplete() + " -- Consumer Map: "
+ worker.getShardInfoShardConsumerMap().size());
}
return 0;
}
return outstanding;
}
return 0;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
long outstanding;
do {
outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS);
log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown.");
} while(outstanding != 0);
return null;
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long outstanding = outstandingRecordProcessors(timeout, unit);
if (outstanding != 0) {
throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown.");
}
return null;
}
}

View file

@ -0,0 +1,22 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
/**
* A shutdown request to the ShardConsumer
*/
public interface ShutdownNotification {
/**
* Used to indicate that the record processor has been notified of a requested shutdown, and given the chance to
* checkpoint.
*
*/
void shutdownNotificationComplete();
/**
* Used to indicate that the record processor has completed the call to
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)} has
* completed.
*/
void shutdownComplete();
}

View file

@ -0,0 +1,43 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
/**
* Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint.
*/
class ShutdownNotificationTask implements ITask {
private final IRecordProcessor recordProcessor;
private final IRecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownNotification shutdownNotification;
ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification) {
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.shutdownNotification = shutdownNotification;
}
@Override
public TaskResult call() {
try {
if (recordProcessor instanceof IShutdownNotificationAware) {
IShutdownNotificationAware shutdownNotificationAware = (IShutdownNotificationAware) recordProcessor;
try {
shutdownNotificationAware.shutdownRequested(recordProcessorCheckpointer);
} catch (Exception ex) {
return new TaskResult(ex);
}
}
return new TaskResult(null);
} finally {
shutdownNotification.shutdownNotificationComplete();
}
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_NOTIFICATION;
}
}

View file

@ -12,7 +12,12 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.types;
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
/**
* Reason the RecordProcessor is being shutdown.
@ -28,7 +33,7 @@ public enum ShutdownReason {
* Applications SHOULD NOT checkpoint their progress (as another record processor may have already started
* processing data).
*/
ZOMBIE,
ZOMBIE(3, ShardConsumerState.SHUTTING_DOWN.getConsumerState()),
/**
* Terminate processing for this RecordProcessor (resharding use case).
@ -36,5 +41,38 @@ public enum ShutdownReason {
* Applications SHOULD checkpoint their progress to indicate that they have successfully processed all records
* from this shard and processing of child shards can be started.
*/
TERMINATE
TERMINATE(2, ShardConsumerState.SHUTTING_DOWN.getConsumerState()),
/**
* Indicates that the entire application is being shutdown, and if desired the record processor will be given a
* final chance to checkpoint. This state will not trigger a direct call to
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)}, but
* instead depend on a different interface for backward compatibility.
*/
REQUESTED(1, ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState());
private final int rank;
private final ConsumerState shutdownState;
ShutdownReason(int rank, ConsumerState shutdownState) {
this.rank = rank;
this.shutdownState = shutdownState;
}
/**
* Indicates whether the given reason can override the current reason.
*
* @param reason the reason to transition to
* @return true if the transition is allowed, false if it's not.
*/
public boolean canTransitionTo(ShutdownReason reason) {
if (reason == null) {
return false;
}
return reason.rank > this.rank;
}
ConsumerState getShutdownState() {
return shutdownState;
}
}

View file

@ -21,11 +21,11 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
/**
* Task for invoking the RecordProcessor shutdown() callback.
@ -155,4 +155,9 @@ class ShutdownTask implements ITask {
return taskType;
}
@VisibleForTesting
ShutdownReason getReason() {
return reason;
}
}

View file

@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* Enumerates types of tasks executed as part of processing a shard.
*/
enum TaskType {
public enum TaskType {
/**
* Polls and waits until parent shard(s) have been fully processed.
*/
@ -34,8 +34,16 @@ enum TaskType {
* Shutdown of RecordProcessor.
*/
SHUTDOWN,
/**
* Graceful shutdown has been requested, and notification of the record processor will occur.
*/
SHUTDOWN_NOTIFICATION,
/**
* Occurs once the shutdown has been completed
*/
SHUTDOWN_COMPLETE,
/**
* Sync leases/activities corresponding to Kinesis shards.
*/
SHARDSYNC;
SHARDSYNC
}

View file

@ -14,16 +14,21 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -39,14 +44,16 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Worker is the high level class that Kinesis applications use to start
@ -85,6 +92,7 @@ public class Worker implements Runnable {
private volatile boolean shutdown;
private volatile long shutdownStartTimeMillis;
private volatile boolean shutdownComplete = false;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@ -482,9 +490,81 @@ public class Worker implements Runnable {
}
/**
* Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that
* if executor services were passed to the worker by the user, worker will not attempt to shutdown
* those resources.
* Requests shutdown of the worker, notifying record processors, that implement
* {@link IShutdownNotificationAware}, of the impending shutdown.
* This gives the record processor a final chance to checkpoint.
*
* <h2>Requested Shutdown Process</h2> When a shutdown process is requested it operates slightly differently to
* allow the record processors a chance to checkpoint a final time.
* <ol>
* <li>Call to request shutdown invoked.</li>
* <li>Worker stops attempting to acquire new leases</li>
* <li>Record Processor Shutdown Begins
* <ol>
* <li>Record processor is notified of the impending shutdown, and given a final chance to checkpoint</li>
* <li>The lease for the record processor is then dropped.</li>
* <li>The record processor enters into an idle state waiting for the worker to complete final termination</li>
* <li>The worker will detect a record processor that has lost it's lease, and will terminate the record processor
* with {@link ShutdownReason#ZOMBIE}</li>
* </ol>
* </li>
* <li>The worker will shutdown all record processors.</li>
* <li>Once all record processors have been terminated, the worker will terminate all owned resources.</li>
* <li>Once the worker shutdown is complete, the returned future is completed.</li>
* </ol>
*
*
*
* @return a Future that will be set once the shutdown is complete.
*/
public Future<Void> requestShutdown() {
leaseCoordinator.stopLeaseTaker();
//
// Stop accepting new leases
//
Collection<KinesisClientLease> leases = leaseCoordinator.getAssignments();
if (leases == null || leases.isEmpty()) {
//
// If there are no leases shutdown is already completed.
//
return Futures.immediateFuture(null);
}
CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size());
for (KinesisClientLease lease : leases) {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease,
notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
shardInfoShardConsumerMap.get(shardInfo).notifyShutdownRequested(shutdownNotification);
}
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this);
}
boolean isShutdownComplete() {
return shutdownComplete;
}
ConcurrentMap<ShardInfo, ShardConsumer> getShardInfoShardConsumerMap() {
return shardInfoShardConsumerMap;
}
/**
* Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor
* services were passed to the worker by the user, worker will not attempt to shutdown those resources.
*
* <h2>Shutdown Process</h2> When called this will start shutdown of the record processor, and eventually shutdown
* the worker itself.
* <ol>
* <li>Call to start shutdown invoked</li>
* <li>Lease coordinator told to stop taking leases, and to drop existing leases.</li>
* <li>Worker discovers record processors that no longer have leases.</li>
* <li>Worker triggers shutdown with state {@link ShutdownReason#ZOMBIE}.</li>
* <li>Once all record processors are shutdown, worker terminates owned resources.</li>
* <li>Shutdown complete.</li>
* </ol>
*/
public void shutdown() {
LOG.info("Worker shutdown requested.");
@ -513,6 +593,7 @@ public class Worker implements Runnable {
if (metricsFactory instanceof WorkerCWMetricsFactory) {
((CWMetricsFactory) metricsFactory).shutdown();
}
shutdownComplete = true;
}
/**
@ -740,7 +821,8 @@ public class Worker implements Runnable {
* @return Default executor service that should be used by the worker.
*/
private static ExecutorService getExecutorService() {
return new WorkerThreadPoolExecutor();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build();
return new WorkerThreadPoolExecutor(threadFactory);
}
/**
@ -769,10 +851,10 @@ public class Worker implements Runnable {
static class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
private static final long DEFAULT_KEEP_ALIVE_TIME = 60L;
WorkerThreadPoolExecutor() {
WorkerThreadPoolExecutor(ThreadFactory threadFactory) {
// Defaults are based on Executors.newCachedThreadPool()
super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
threadFactory);
}
}

View file

@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.types;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
/**
* Container for the parameters to the IRecordProcessor's

View file

@ -15,9 +15,9 @@
package com.amazonaws.services.kinesis.leases.impl;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
@ -41,6 +41,16 @@ public class KinesisClientLease extends Lease {
this.parentShardIds.addAll(other.getParentShardIds());
}
KinesisClientLease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken,
Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, Long ownerSwitchesSinceCheckpoint,
Set<String> parentShardIds) {
super(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos);
this.checkpoint = checkpoint;
this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint;
this.parentShardIds.addAll(parentShardIds);
}
/**
* {@inheritDoc}
*/

View file

@ -63,11 +63,17 @@ public class Lease {
* @param lease lease to copy
*/
protected Lease(Lease lease) {
this.leaseKey = lease.getLeaseKey();
this.leaseOwner = lease.getLeaseOwner();
this.leaseCounter = lease.getLeaseCounter();
this.concurrencyToken = lease.getConcurrencyToken();
this.lastCounterIncrementNanos = lease.getLastCounterIncrementNanos();
this(lease.getLeaseKey(), lease.getLeaseOwner(), lease.getLeaseCounter(), lease.getConcurrencyToken(),
lease.getLastCounterIncrementNanos());
}
protected Lease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken,
Long lastCounterIncrementNanos) {
this.leaseKey = leaseKey;
this.leaseOwner = leaseOwner;
this.leaseCounter = leaseCounter;
this.concurrencyToken = concurrencyToken;
this.lastCounterIncrementNanos = lastCounterIncrementNanos;
}
/**

View file

@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -82,6 +83,7 @@ public class LeaseCoordinator<T extends Lease> {
private ScheduledExecutorService leaseCoordinatorThreadPool;
private final ExecutorService leaseRenewalThreadpool;
private volatile boolean running = false;
private ScheduledFuture<?> takerFuture;
/**
* Constructor.
@ -198,9 +200,15 @@ public class LeaseCoordinator<T extends Lease> {
leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY);
// Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation.
leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS);
takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(),
0L,
takerIntervalMillis,
TimeUnit.MILLISECONDS);
// Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degredation.
leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS);
leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(),
0L,
renewerIntervalMillis,
TimeUnit.MILLISECONDS);
running = true;
}
@ -308,6 +316,27 @@ public class LeaseCoordinator<T extends Lease> {
}
}
/**
* Requests the cancellation of the lease taker.
*/
public void stopLeaseTaker() {
takerFuture.cancel(false);
}
/**
* Requests that renewals for the given lease are stopped.
*
* @param lease the lease to stop renewing.
*/
public void dropLease(T lease) {
synchronized (shutdownLock) {
if (lease != null) {
leaseRenewer.dropLease(lease);
}
}
}
/**
* @return true if this LeaseCoordinator is running
*/

View file

@ -369,6 +369,15 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
ownedLeases.clear();
}
/**
* {@inheritDoc}
* @param lease the lease to drop.
*/
@Override
public void dropLease(T lease) {
ownedLeases.remove(lease.getLeaseKey());
}
/**
* {@inheritDoc}
*/

View file

@ -73,6 +73,13 @@ public interface ILeaseRenewer<T extends Lease> {
*/
public void clearCurrentlyHeldLeases();
/**
* Stops the lease renewer from continunig to maintain the given lease.
*
* @param lease the lease to drop.
*/
void dropLease(T lease);
/**
* Update application-specific fields in a currently held lease. Cannot be used to update internal fields such as
* leaseCounter, leaseOwner, etc. Fails if we do not hold the lease, or if the concurrency token does not match

View file

@ -26,7 +26,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;

View file

@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;

View file

@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper;

View file

@ -14,7 +14,7 @@
*/
package com.amazonaws.services.kinesis.multilang.messages;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
/**
* A message to indicate to the client's process that it should shutdown and then terminate.

View file

@ -0,0 +1,385 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.hamcrest.Condition;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
@RunWith(MockitoJUnitRunner.class)
public class ConsumerStatesTest {
@Mock
private ShardConsumer consumer;
@Mock
private StreamConfig streamConfig;
@Mock
private IRecordProcessor recordProcessor;
@Mock
private RecordProcessorCheckpointer recordProcessorCheckpointer;
@Mock
private ExecutorService executorService;
@Mock
private ShardInfo shardInfo;
@Mock
private KinesisDataFetcher dataFetcher;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private ICheckpoint checkpoint;
@Mock
private Future<TaskResult> future;
@Mock
private ShutdownNotification shutdownNotification;
@Mock
private IKinesisProxy kinesisProxy;
@Mock
private InitialPositionInStreamExtended initialPositionInStream;
private long parentShardPollIntervalMillis = 0xCAFE;
private boolean cleanupLeasesOfCompletedShards = true;
private long taskBackoffTimeMillis = 0xBEEF;
private ShutdownReason reason = ShutdownReason.TERMINATE;
@Before
public void setup() {
when(consumer.getStreamConfig()).thenReturn(streamConfig);
when(consumer.getRecordProcessor()).thenReturn(recordProcessor);
when(consumer.getRecordProcessorCheckpointer()).thenReturn(recordProcessorCheckpointer);
when(consumer.getExecutorService()).thenReturn(executorService);
when(consumer.getShardInfo()).thenReturn(shardInfo);
when(consumer.getDataFetcher()).thenReturn(dataFetcher);
when(consumer.getLeaseManager()).thenReturn(leaseManager);
when(consumer.getCheckpoint()).thenReturn(checkpoint);
when(consumer.getFuture()).thenReturn(future);
when(consumer.getShutdownNotification()).thenReturn(shutdownNotification);
when(consumer.getParentShardPollIntervalMillis()).thenReturn(parentShardPollIntervalMillis);
when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards);
when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis);
when(consumer.getShutdownReason()).thenReturn(reason);
}
private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class;
@Test
public void blockOnParentStateTest() {
ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, taskWith(BlockOnParentShardTask.class, ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task,
taskWith(BlockOnParentShardTask.class, LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager)));
assertThat(task, taskWith(BlockOnParentShardTask.class, Long.class, "parentShardPollIntervalMillis",
equalTo(parentShardPollIntervalMillis)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.getConsumerState()));
for (ShutdownReason shutdownReason : ShutdownReason.values()) {
assertThat(state.shutdownTransition(shutdownReason),
equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
}
assertThat(state.getState(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS));
assertThat(state.getTaskType(), equalTo(TaskType.BLOCK_ON_PARENT_SHARDS));
}
@Test
public void initializingStateTest() {
ConsumerState state = ShardConsumerState.INITIALIZING.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, initTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, initTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, initTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, initTask(ICheckpoint.class, "checkpoint", equalTo(checkpoint)));
assertThat(task, initTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, initTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, initTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.INITIALIZING));
assertThat(state.getTaskType(), equalTo(TaskType.INITIALIZE));
}
@Test
public void processingStateTest() {
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING));
assertThat(state.getTaskType(), equalTo(TaskType.PROCESS));
}
@Test
public void shutdownRequestState() {
ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, shutdownReqTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, shutdownReqTask(IRecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo((IRecordProcessorCheckpointer) recordProcessorCheckpointer)));
assertThat(task, shutdownReqTask(ShutdownNotification.class, "shutdownNotification", equalTo(shutdownNotification)));
assertThat(state.successTransition(), equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_REQUESTED));
assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_NOTIFICATION));
}
@Test
public void shutdownRequestCompleteStateTest() {
ConsumerState state = ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE;
assertThat(state.createTask(consumer), nullValue());
assertThat(state.successTransition(), equalTo(state));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), equalTo(state));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_REQUESTED));
assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_NOTIFICATION));
}
@Test
public void shuttingDownStateTest() {
ConsumerState state = ShardConsumerState.SHUTTING_DOWN.getConsumerState();
when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy);
when(streamConfig.getInitialPositionInStream()).thenReturn(initialPositionInStream);
ITask task = state.createTask(consumer);
assertThat(task, shutdownTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, shutdownTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, shutdownTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason)));
assertThat(task, shutdownTask(IKinesisProxy.class, "kinesisProxy", equalTo(kinesisProxy)));
assertThat(task, shutdownTask(LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager)));
assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream",
equalTo(initialPositionInStream)));
assertThat(task,
shutdownTask(Boolean.class, "cleanupLeasesOfCompletedShards", equalTo(cleanupLeasesOfCompletedShards)));
assertThat(task, shutdownTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
for (ShutdownReason reason : ShutdownReason.values()) {
assertThat(state.shutdownTransition(reason),
equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
}
assertThat(state.getState(), equalTo(ShardConsumerState.SHUTTING_DOWN));
assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN));
}
@Test
public void shutdownCompleteStateTest() {
ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
assertThat(state.createTask(consumer), nullValue());
verify(consumer, times(2)).getShutdownNotification();
verify(shutdownNotification).shutdownComplete();
assertThat(state.successTransition(), equalTo(state));
for(ShutdownReason reason : ShutdownReason.values()) {
assertThat(state.shutdownTransition(reason), equalTo(state));
}
assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE));
assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_COMPLETE));
}
@Test
public void shutdownCompleteStateNullNotificationTest() {
ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
when(consumer.getShutdownNotification()).thenReturn(null);
assertThat(state.createTask(consumer), nullValue());
verify(consumer).getShutdownNotification();
verify(shutdownNotification, never()).shutdownComplete();
}
static <ValueType> ReflectionPropertyMatcher<ShutdownTask, ValueType> shutdownTask(Class<ValueType> valueTypeClass,
String propertyName, Matcher<ValueType> matcher) {
return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher);
}
static <ValueType> ReflectionPropertyMatcher<ShutdownNotificationTask, ValueType> shutdownReqTask(
Class<ValueType> valueTypeClass, String propertyName, Matcher<ValueType> matcher) {
return taskWith(ShutdownNotificationTask.class, valueTypeClass, propertyName, matcher);
}
static <ValueType> ReflectionPropertyMatcher<ProcessTask, ValueType> procTask(Class<ValueType> valueTypeClass,
String propertyName, Matcher<ValueType> matcher) {
return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher);
}
static <ValueType> ReflectionPropertyMatcher<InitializeTask, ValueType> initTask(Class<ValueType> valueTypeClass,
String propertyName, Matcher<ValueType> matcher) {
return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher);
}
static <TaskType, ValueType> ReflectionPropertyMatcher<TaskType, ValueType> taskWith(Class<TaskType> taskTypeClass,
Class<ValueType> valueTypeClass, String propertyName, Matcher<ValueType> matcher) {
return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName);
}
private static class ReflectionPropertyMatcher<TaskType, ValueType> extends TypeSafeDiagnosingMatcher<ITask> {
private final Class<TaskType> taskTypeClass;
private final Class<ValueType> valueTypeClazz;
private final Matcher<ValueType> matcher;
private final String propertyName;
private final Field matchingField;
private ReflectionPropertyMatcher(Class<TaskType> taskTypeClass, Class<ValueType> valueTypeClass,
Matcher<ValueType> matcher, String propertyName) {
this.taskTypeClass = taskTypeClass;
this.valueTypeClazz = valueTypeClass;
this.matcher = matcher;
this.propertyName = propertyName;
Field[] fields = taskTypeClass.getDeclaredFields();
Field matching = null;
for (Field field : fields) {
if (propertyName.equals(field.getName())) {
matching = field;
}
}
this.matchingField = matching;
}
@Override
protected boolean matchesSafely(ITask item, Description mismatchDescription) {
return Condition.matched(item, mismatchDescription).and(new Condition.Step<ITask, TaskType>() {
@Override
public Condition<TaskType> apply(ITask value, Description mismatch) {
if (taskTypeClass.equals(value.getClass())) {
return Condition.matched(taskTypeClass.cast(value), mismatch);
}
mismatch.appendText("Expected task type of ").appendText(taskTypeClass.getName())
.appendText(" but was ").appendText(value.getClass().getName());
return Condition.notMatched();
}
}).and(new Condition.Step<TaskType, Object>() {
@Override
public Condition<Object> apply(TaskType value, Description mismatch) {
if (matchingField == null) {
mismatch.appendText("Field ").appendText(propertyName).appendText(" not present in ")
.appendText(taskTypeClass.getName());
return Condition.notMatched();
}
try {
return Condition.matched(getValue(value), mismatch);
} catch (RuntimeException re) {
mismatch.appendText("Failure while retrieving value for ").appendText(propertyName);
return Condition.notMatched();
}
}
}).and(new Condition.Step<Object, ValueType>() {
@Override
public Condition<ValueType> apply(Object value, Description mismatch) {
if (value != null && !valueTypeClazz.isAssignableFrom(value.getClass())) {
mismatch.appendText("Expected a value of type ").appendText(valueTypeClazz.getName())
.appendText(" but was ").appendText(value.getClass().getName());
return Condition.notMatched();
}
return Condition.matched(valueTypeClazz.cast(value), mismatch);
}
}).matching(matcher);
}
@Override
public void describeTo(Description description) {
description
.appendText(
"A " + taskTypeClass.getName() + " task with the property " + propertyName + " matching ")
.appendDescriptionOf(matcher);
}
private Object getValue(TaskType task) {
matchingField.setAccessible(true);
try {
return matchingField.get(task);
} catch (IllegalAccessException e) {
throw new RuntimeException("Failed to retrieve the value for " + matchingField.getName());
}
}
}
}

View file

@ -23,9 +23,9 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -46,17 +46,18 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.ShardConsumerState;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
@ -68,6 +69,7 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType;
/**
* Unit tests of {@link ShardConsumer}.
*/
@RunWith(MockitoJUnitRunner.class)
public class ShardConsumerTest {
private static final Log LOG = LogFactory.getLog(ShardConsumerTest.class);
@ -86,6 +88,17 @@ public class ShardConsumerTest {
// ... a non-final public class, and so can be mocked and spied.
private final ExecutorService executorService = Executors.newFixedThreadPool(1);
@Mock
private IRecordProcessor processor;
@Mock
private IKinesisProxy streamProxy;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private ICheckpoint checkpoint;
@Mock
private ShutdownNotification shutdownNotification;
/**
* Test method to verify consumer stays in INITIALIZING state when InitializationTask fails.
*/
@ -93,12 +106,9 @@ public class ShardConsumerTest {
@Test
public final void testInitializationStateUponFailure() throws Exception {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
ICheckpoint checkpoint = mock(ICheckpoint.class);
when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class);
IRecordProcessor processor = mock(IRecordProcessor.class);
IKinesisProxy streamProxy = mock(IKinesisProxy.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
when(leaseManager.getLease(anyString())).thenReturn(null);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
@ -119,19 +129,19 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
}
@ -142,13 +152,9 @@ public class ShardConsumerTest {
@Test
public final void testInitializationStateUponSubmissionFailure() throws Exception {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
ICheckpoint checkpoint = mock(ICheckpoint.class);
ExecutorService spyExecutorService = spy(executorService);
when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class);
IRecordProcessor processor = mock(IRecordProcessor.class);
IKinesisProxy streamProxy = mock(IKinesisProxy.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
when(leaseManager.getLease(anyString())).thenReturn(null);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
@ -169,31 +175,27 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
doThrow(new RejectedExecutionException()).when(spyExecutorService).submit(any(InitializeTask.class));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
}
@SuppressWarnings("unchecked")
@Test
public final void testRecordProcessorThrowable() throws Exception {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
ICheckpoint checkpoint = mock(ICheckpoint.class);
IRecordProcessor processor = mock(IRecordProcessor.class);
IKinesisProxy streamProxy = mock(IKinesisProxy.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
@ -216,10 +218,10 @@ public class ShardConsumerTest {
when(leaseManager.getLease(anyString())).thenReturn(null);
when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123"));
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // submit BlockOnParentShardTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
verify(processor, times(0)).initialize(any(InitializationInput.class));
// Throw Error when IRecordProcessor.initialize() is invoked.
@ -227,7 +229,7 @@ public class ShardConsumerTest {
consumer.consumeShard(); // submit InitializeTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(any(InitializationInput.class));
try {
@ -238,24 +240,24 @@ public class ShardConsumerTest {
assertThat(e.getCause(), instanceOf(ExecutionException.class));
}
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(any(InitializationInput.class));
doNothing().when(processor).initialize(any(InitializationInput.class));
consumer.consumeShard(); // submit InitializeTask again.
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(2)).initialize(any(InitializationInput.class));
// Checking the status of submitted InitializeTask from above should pass.
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
}
/**
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer#consumeShard()}
* Test method for {@link ShardConsumer#consumeShard()}
*/
@Test
public final void testConsumeShard() throws Exception {
@ -276,8 +278,6 @@ public class ShardConsumerTest {
final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
@SuppressWarnings("unchecked")
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
when(leaseManager.getLease(anyString())).thenReturn(null);
TestStreamlet processor = new TestStreamlet();
@ -302,20 +302,20 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
// We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) {
boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords;
}
@ -323,11 +323,26 @@ public class ShardConsumerTest {
}
assertThat(processor.getShutdownReason(), nullValue());
consumer.notifyShutdownRequested(shutdownNotification);
consumer.consumeShard();
assertThat(processor.getNotifyShutdownLatch().await(1, TimeUnit.SECONDS), is(true));
Thread.sleep(50);
assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED));
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED));
verify(shutdownNotification).shutdownNotificationComplete();
assertThat(processor.isShutdownNotificationCalled(), equalTo(true));
consumer.consumeShard();
Thread.sleep(50);
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED));
consumer.beginShutdown();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTTING_DOWN)));
assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.ZOMBIE));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
consumer.beginShutdown();
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTDOWN_COMPLETE)));
consumer.consumeShard();
verify(shutdownNotification, atLeastOnce()).shutdownComplete();
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
executorService.shutdown();
@ -340,8 +355,7 @@ public class ShardConsumerTest {
}
/**
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer#consumeShard()}
* that starts from initial position of type AT_TIMESTAMP.
* Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP.
*/
@Test
public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception {
@ -365,8 +379,6 @@ public class ShardConsumerTest {
final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken);
@SuppressWarnings("unchecked")
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
when(leaseManager.getLease(anyString())).thenReturn(null);
TestStreamlet processor = new TestStreamlet();
@ -392,11 +404,11 @@ public class ShardConsumerTest {
metricsFactory,
taskBackoffTimeMillis);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
@ -405,7 +417,7 @@ public class ShardConsumerTest {
boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.PROCESSING)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords;
}
@ -415,9 +427,9 @@ public class ShardConsumerTest {
assertThat(processor.getShutdownReason(), nullValue());
consumer.beginShutdown();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTTING_DOWN)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
consumer.beginShutdown();
assertThat(consumer.getCurrentState(), is(equalTo(ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
executorService.shutdown();

View file

@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
/**
* Helper class to verify shard lineage in unit tests that use TestStreamlet.

View file

@ -0,0 +1,195 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class ShutdownFutureTest {
@Mock
private CountDownLatch shutdownCompleteLatch;
@Mock
private CountDownLatch notificationCompleteLatch;
@Mock
private Worker worker;
@Mock
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoConsumerMap;
@Test
public void testSimpleGetAlreadyCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
mockNotificationComplete(true);
mockShutdownComplete(true);
future.get();
verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class));
verify(worker).shutdown();
verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class));
}
@Test
public void testNotificationNotCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
mockNotificationComplete(false, true);
mockShutdownComplete(true);
when(notificationCompleteLatch.getCount()).thenReturn(1L);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
expectedTimeoutException(future);
verify(worker, never()).shutdown();
awaitFuture(future);
verify(notificationCompleteLatch).getCount();
verifyLatchAwait(notificationCompleteLatch, 2);
verify(shutdownCompleteLatch).getCount();
verifyLatchAwait(shutdownCompleteLatch);
verify(worker).shutdown();
}
@Test
public void testShutdownNotCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
mockNotificationComplete(true);
mockShutdownComplete(false, true);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
when(worker.isShutdownComplete()).thenReturn(false);
mockShardInfoConsumerMap(1);
expectedTimeoutException(future);
verify(worker).shutdown();
awaitFuture(future);
verifyLatchAwait(notificationCompleteLatch, 2);
verifyLatchAwait(shutdownCompleteLatch, 2);
verify(worker).isShutdownComplete();
verify(worker).getShardInfoShardConsumerMap();
}
@Test
public void testShutdownNotCompleteButWorkerShutdown() throws Exception {
ShutdownFuture future = create();
mockNotificationComplete(true);
mockShutdownComplete(false);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
when(worker.isShutdownComplete()).thenReturn(true);
mockShardInfoConsumerMap(1);
awaitFuture(future);
verify(worker).shutdown();
verifyLatchAwait(notificationCompleteLatch);
verifyLatchAwait(shutdownCompleteLatch);
verify(worker, times(2)).isShutdownComplete();
verify(worker).getShardInfoShardConsumerMap();
verify(shardInfoConsumerMap).size();
}
@Test
public void testShutdownNotCompleteButShardConsumerEmpty() throws Exception {
ShutdownFuture future = create();
mockNotificationComplete(true);
mockShutdownComplete(false);
mockOutstanding(shutdownCompleteLatch, 1L);
when(worker.isShutdownComplete()).thenReturn(false);
mockShardInfoConsumerMap(0);
awaitFuture(future);
verify(worker).shutdown();
verifyLatchAwait(notificationCompleteLatch);
verifyLatchAwait(shutdownCompleteLatch);
verify(worker, times(2)).isShutdownComplete();
verify(worker, times(2)).getShardInfoShardConsumerMap();
verify(shardInfoConsumerMap).isEmpty();
verify(shardInfoConsumerMap).size();
}
private ShutdownFuture create() {
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
}
private void mockShardInfoConsumerMap(Integer initialItemCount, Integer ... additionalItemCounts) {
when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap);
Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length];
for(int i = 0; i < additionalItemCounts.length; ++i) {
additionalEmptyStates[i] = additionalItemCounts[i] == 0;
}
when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts);
when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates);
}
private void verifyLatchAwait(CountDownLatch latch) throws Exception {
verifyLatchAwait(latch, 1);
}
private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception {
verify(latch, times(times)).await(anyLong(), any(TimeUnit.class));
}
private void expectedTimeoutException(ShutdownFuture future) throws Exception {
boolean gotTimeout = false;
try {
awaitFuture(future);
} catch (TimeoutException te) {
gotTimeout = true;
}
assertThat("Expected a timeout exception to occur", gotTimeout);
}
private void awaitFuture(ShutdownFuture future) throws Exception {
future.get(1, TimeUnit.MILLISECONDS);
}
private void mockNotificationComplete(Boolean initial, Boolean... states) throws Exception {
mockLatch(notificationCompleteLatch, initial, states);
}
private void mockShutdownComplete(Boolean initial, Boolean... states) throws Exception {
mockLatch(shutdownCompleteLatch, initial, states);
}
private void mockLatch(CountDownLatch latch, Boolean initial, Boolean... states) throws Exception {
when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, states);
}
private void mockOutstanding(CountDownLatch latch, Long remaining, Long ... additionalRemaining) throws Exception {
when(latch.getCount()).thenReturn(remaining, additionalRemaining);
}
}

View file

@ -31,7 +31,6 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisC
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
@ -82,7 +81,7 @@ public class ShutdownTaskTest {
}
/**
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#call()}.
* Test method for {@link ShutdownTask#call()}.
*/
@Test
public final void testCallWhenApplicationDoesNotCheckpoint() {
@ -106,7 +105,7 @@ public class ShutdownTaskTest {
}
/**
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#call()}.
* Test method for {@link ShutdownTask#call()}.
*/
@Test
public final void testCallWhenSyncingShardsThrows() {
@ -131,7 +130,7 @@ public class ShutdownTaskTest {
}
/**
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#getTaskType()}.
* Test method for {@link ShutdownTask#getTaskType()}.
*/
@Test
public final void testGetTaskType() {

View file

@ -18,8 +18,10 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,12 +36,11 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
/**
* Streamlet that tracks records it's seen - useful for testing.
*/
class TestStreamlet implements IRecordProcessor {
class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware {
private static final Log LOG = LogFactory.getLog(TestStreamlet.class);
@ -55,6 +56,11 @@ class TestStreamlet implements IRecordProcessor {
private ShutdownReason shutdownReason;
private ShardSequenceVerifier shardSequenceVerifier;
private long numProcessRecordsCallsWithEmptyRecordList;
private boolean shutdownNotificationCalled;
private final CountDownLatch initializeLatch = new CountDownLatch(1);
private final CountDownLatch notifyShutdownLatch = new CountDownLatch(1);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
public TestStreamlet() {
@ -76,6 +82,7 @@ class TestStreamlet implements IRecordProcessor {
if (shardSequenceVerifier != null) {
shardSequenceVerifier.registerInitialization(shardId);
}
initializeLatch.countDown();
}
@Override
@ -125,6 +132,8 @@ class TestStreamlet implements IRecordProcessor {
throw new RuntimeException(e);
}
}
shutdownLatch.countDown();
}
/**
@ -148,4 +157,25 @@ class TestStreamlet implements IRecordProcessor {
return numProcessRecordsCallsWithEmptyRecordList;
}
boolean isShutdownNotificationCalled() {
return shutdownNotificationCalled;
}
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
shutdownNotificationCalled = true;
notifyShutdownLatch.countDown();
}
public CountDownLatch getInitializeLatch() {
return initializeLatch;
}
public CountDownLatch getNotifyShutdownLatch() {
return notifyShutdownLatch;
}
public CountDownLatch getShutdownLatch() {
return shutdownLatch;
}
}

View file

@ -14,21 +14,16 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.CoreMatchers.both;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import java.io.File;
import java.lang.Thread.State;
@ -42,21 +37,29 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hamcrest.Condition;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
@ -78,8 +81,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
@ -90,6 +93,8 @@ import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Unit tests of Worker.
@ -99,8 +104,8 @@ public class WorkerTest {
private static final Log LOG = LogFactory.getLog(WorkerTest.class);
@Rule
public Timeout timeout = new Timeout((int)TimeUnit.SECONDS.toMillis(30));
// @Rule
// public Timeout timeout = new Timeout((int)TimeUnit.SECONDS.toMillis(30));
private final NullMetricsFactory nullMetricsFactory = new NullMetricsFactory();
private final long taskBackoffTimeMillis = 1L;
@ -140,6 +145,10 @@ public class WorkerTest {
private IRecordProcessor v2RecordProcessor;
@Mock
private ShardConsumer shardConsumer;
@Mock
private Future<TaskResult> taskFuture;
@Mock
private TaskResult taskResult;
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
@ -345,10 +354,10 @@ public class WorkerTest {
worker.cleanupShardConsumers(assignedShards);
// verify shard consumer not present in assignedShards is shut down
Assert.assertTrue(consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken.isBeginShutdown());
Assert.assertTrue(consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken.isShutdownRequested());
// verify shard consumers present in assignedShards aren't shut down
Assert.assertFalse(consumerOfShardInfo1.isBeginShutdown());
Assert.assertFalse(consumerOfShardInfo2.isBeginShutdown());
Assert.assertFalse(consumerOfShardInfo1.isShutdownRequested());
Assert.assertFalse(consumerOfShardInfo2.isShutdownRequested());
}
@Test
@ -687,6 +696,339 @@ public class WorkerTest {
assertThat(recordProcessorInterrupted.get(), equalTo(true));
}
@Test
public void testRequestShutdown() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
lease.getParentShardIds(), lease.getCheckpoint()));
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
worker.requestShutdown();
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION))));
worker.runProcessLoop();
verify(leaseCoordinator, atLeastOnce()).dropLease(eq(lease));
leases.clear();
currentAssignments.clear();
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
}
@Test
public void testLeaseCancelledAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
lease.getParentShardIds(), lease.getCheckpoint()));
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
worker.requestShutdown();
leases.clear();
currentAssignments.clear();
worker.runProcessLoop();
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION))));
worker.runProcessLoop();
verify(executorService, atLeastOnce())
.submit(argThat(ShutdownReasonMatcher.hasReason(equalTo(ShutdownReason.ZOMBIE))));
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
}
@Test
public void testEndOfShardAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
lease.getParentShardIds(), lease.getCheckpoint()));
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
when(taskResult.isShardEndReached()).thenReturn(true);
worker.requestShutdown();
worker.runProcessLoop();
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION))));
verify(executorService).submit(argThat(ShutdownReasonMatcher.hasReason(equalTo(ShutdownReason.TERMINATE))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
}
private static class ShutdownReasonMatcher extends TypeSafeDiagnosingMatcher<MetricsCollectingTaskDecorator> {
private final Matcher<ShutdownReason> matcher;
public ShutdownReasonMatcher(Matcher<ShutdownReason> matcher) {
this.matcher = matcher;
}
@Override
public void describeTo(Description description) {
description.appendText("hasShutdownReason(").appendDescriptionOf(matcher).appendText(")");
}
@Override
protected boolean matchesSafely(MetricsCollectingTaskDecorator item, Description mismatchDescription) {
return Condition.matched(item, mismatchDescription)
.and(new Condition.Step<MetricsCollectingTaskDecorator, ShutdownTask>() {
@Override
public Condition<ShutdownTask> apply(MetricsCollectingTaskDecorator value,
Description mismatch) {
if (!(value.getOther() instanceof ShutdownTask)) {
mismatch.appendText("Wrapped task isn't a shutdown task");
return Condition.notMatched();
}
return Condition.matched((ShutdownTask) value.getOther(), mismatch);
}
}).and(new Condition.Step<ShutdownTask, ShutdownReason>() {
@Override
public Condition<ShutdownReason> apply(ShutdownTask value, Description mismatch) {
return Condition.matched(value.getReason(), mismatch);
}
}).matching(matcher);
}
public static ShutdownReasonMatcher hasReason(Matcher<ShutdownReason> matcher) {
return new ShutdownReasonMatcher(matcher);
}
}
private static class ShutdownHandlingAnswer implements Answer<Future<TaskResult>> {
final Future<TaskResult> defaultFuture;
public ShutdownHandlingAnswer(Future<TaskResult> defaultFuture) {
this.defaultFuture = defaultFuture;
}
@Override
public Future<TaskResult> answer(InvocationOnMock invocation) throws Throwable {
ITask rootTask = (ITask) invocation.getArguments()[0];
if (rootTask instanceof MetricsCollectingTaskDecorator
&& ((MetricsCollectingTaskDecorator) rootTask).getOther() instanceof ShutdownNotificationTask) {
ShutdownNotificationTask task = (ShutdownNotificationTask) ((MetricsCollectingTaskDecorator) rootTask).getOther();
return Futures.immediateFuture(task.call());
}
return defaultFuture;
}
}
private static class TaskTypeMatcher extends TypeSafeMatcher<MetricsCollectingTaskDecorator> {
final Matcher<TaskType> expectedTaskType;
TaskTypeMatcher(TaskType expectedTaskType) {
this(equalTo(expectedTaskType));
}
TaskTypeMatcher(Matcher<TaskType> expectedTaskType) {
this.expectedTaskType = expectedTaskType;
}
@Override
protected boolean matchesSafely(MetricsCollectingTaskDecorator item) {
return expectedTaskType.matches(item.getTaskType());
}
@Override
public void describeTo(Description description) {
description.appendText("taskType matches");
expectedTaskType.describeTo(description);
}
static TaskTypeMatcher isOfType(TaskType taskType) {
return new TaskTypeMatcher(taskType);
}
static TaskTypeMatcher matchesType(Matcher<TaskType> matcher) {
return new TaskTypeMatcher(matcher);
}
}
private static class InnerTaskMatcher<T extends ITask> extends TypeSafeMatcher<MetricsCollectingTaskDecorator> {
final Matcher<T> matcher;
InnerTaskMatcher(Matcher<T> matcher) {
this.matcher = matcher;
}
@Override
protected boolean matchesSafely(MetricsCollectingTaskDecorator item) {
return matcher.matches(item.getOther());
}
@Override
public void describeTo(Description description) {
matcher.describeTo(description);
}
static <U extends ITask> InnerTaskMatcher<U> taskWith(Class<U> clazz, Matcher<U> matcher) {
return new InnerTaskMatcher<>(matcher);
}
}
/**
* Returns executor service that will be owned by the worker. This is useful to test the scenario
* where worker shuts down the executor service also during shutdown flow.
@ -694,7 +1036,8 @@ public class WorkerTest {
* @return Executor service that will be owned by the worker.
*/
private WorkerThreadPoolExecutor getWorkerThreadPoolExecutor() {
return new WorkerThreadPoolExecutor();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build();
return new WorkerThreadPoolExecutor(threadFactory);
}
private List<Shard> createShardListWithOneShard() {

View file

@ -1,18 +1,32 @@
package com.amazonaws.services.kinesis.clientlibrary.types;
import org.junit.Assert;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import org.junit.Test;
/**
* Unit tests of ShutdownReason enum class.
*/
public class ShutdownReasonTest {
@Test
public void testToString() {
Assert.assertEquals("ZOMBIE", String.valueOf(ShutdownReason.ZOMBIE));
Assert.assertEquals("TERMINATE", String.valueOf(ShutdownReason.TERMINATE));
Assert.assertEquals("ZOMBIE", ShutdownReason.ZOMBIE.toString());
Assert.assertEquals("TERMINATE", ShutdownReason.TERMINATE.toString());
public void testTransitionZombie() {
assertThat(ShutdownReason.ZOMBIE.canTransitionTo(ShutdownReason.TERMINATE), equalTo(false));
assertThat(ShutdownReason.ZOMBIE.canTransitionTo(ShutdownReason.REQUESTED), equalTo(false));
}
@Test
public void testTransitionTerminate() {
assertThat(ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.ZOMBIE), equalTo(true));
assertThat(ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.REQUESTED), equalTo(false));
}
@Test
public void testTransitionRequested() {
assertThat(ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.ZOMBIE), equalTo(true));
assertThat(ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.TERMINATE), equalTo(true));
}
}

View file

@ -0,0 +1,63 @@
package com.amazonaws.services.kinesis.leases.impl;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
public class KinesisClientLeaseBuilder {
private String leaseKey;
private String leaseOwner;
private Long leaseCounter = 0L;
private UUID concurrencyToken;
private Long lastCounterIncrementNanos;
private ExtendedSequenceNumber checkpoint;
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>();
public KinesisClientLeaseBuilder withLeaseKey(String leaseKey) {
this.leaseKey = leaseKey;
return this;
}
public KinesisClientLeaseBuilder withLeaseOwner(String leaseOwner) {
this.leaseOwner = leaseOwner;
return this;
}
public KinesisClientLeaseBuilder withLeaseCounter(Long leaseCounter) {
this.leaseCounter = leaseCounter;
return this;
}
public KinesisClientLeaseBuilder withConcurrencyToken(UUID concurrencyToken) {
this.concurrencyToken = concurrencyToken;
return this;
}
public KinesisClientLeaseBuilder withLastCounterIncrementNanos(Long lastCounterIncrementNanos) {
this.lastCounterIncrementNanos = lastCounterIncrementNanos;
return this;
}
public KinesisClientLeaseBuilder withCheckpoint(ExtendedSequenceNumber checkpoint) {
this.checkpoint = checkpoint;
return this;
}
public KinesisClientLeaseBuilder withOwnerSwitchesSinceCheckpoint(Long ownerSwitchesSinceCheckpoint) {
this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint;
return this;
}
public KinesisClientLeaseBuilder withParentShardIds(Set<String> parentShardIds) {
this.parentShardIds = parentShardIds;
return this;
}
public KinesisClientLease build() {
return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos,
checkpoint, ownerSwitchesSinceCheckpoint, parentShardIds);
}
}

View file

@ -28,7 +28,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.fasterxml.jackson.core.JsonProcessingException;

View file

@ -33,7 +33,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;

View file

@ -36,7 +36,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;

View file

@ -14,14 +14,13 @@
*/
package com.amazonaws.services.kinesis.multilang.messages;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.junit.Assert;
import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;