Allow for a Graceful Shutdown of the Worker (#109)

Add a new method to the Worker, requestShutdown, that allows the worker to
gracefully shut down all record processors. The graceful shutdown gives
the record processors a last chance to checkpoint before they're
terminated.

To use these new features the record processor must implement
IShutdownNotificationAware.
Justin Pfifer 2016-10-26 12:57:50 -07:00 committed by GitHub
parent db314b970b
commit 26e67a33b1
38 changed files with 2479 additions and 271 deletions
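
Before the diff, a minimal sketch of how an application is expected to use the new method, assuming a Worker that is already running on its own thread (the example class and method names are illustrative):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

final class GracefulShutdownExample {
    // `worker` is assumed to be a Worker that is already running on its own thread.
    static void shutDownGracefully(Worker worker) throws InterruptedException, ExecutionException {
        Future<Void> gracefulShutdown = worker.requestShutdown();
        // Blocks until the record processors have been notified and given their final chance to
        // checkpoint, and the worker has finished its own shutdown.
        gracefulShutdown.get();
    }
}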


@@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.interfaces;
 import java.util.List;
 import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
 /**
  * The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon


@@ -0,0 +1,19 @@
package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
/**
* Allows a record processor to indicate it's aware of requested shutdowns, and handle the request.
*/
public interface IShutdownNotificationAware {
/**
* Called when the worker has been requested to shutdown, and gives the record processor a chance to checkpoint.
*
* The record processor will still have shutdown called.
*
* @param checkpointer the checkpointer that can be used to save progress.
*/
void shutdownRequested(IRecordProcessorCheckpointer checkpointer);
}
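
A record processor opts in by implementing this interface alongside the v2 IRecordProcessor; a minimal sketch (the class name is hypothetical, and error handling is application-specific):

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class CheckpointingRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        // Normal initialization.
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // Process records and periodically checkpoint as usual.
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            // Final opportunity to persist progress before the worker shuts this processor down.
            checkpointer.checkpoint();
        } catch (Exception e) {
            // Checkpointing here is best effort; the normal shutdown callback still follows.
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // The normal shutdown callback is still invoked after the notification.
    }
}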


@@ -0,0 +1,602 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks,
* and state transitions is contained within the {@link ConsumerState} objects.
*
* <h2>State Diagram</h2>
*
* <pre>
* +-------------------+
* | Waiting on Parent | +------------------+
* +----+ Shard | | Shutdown |
* | | | +--------------------+ Notification |
* | +----------+--------+ | Shutdown: | Requested |
* | | Success | Requested +-+-------+--------+
* | | | | |
* | +------+-------------+ | | | Shutdown:
* | | Initializing +-----+ | | Requested
* | | | | | |
* | | +-----+-------+ | |
* | +---------+----------+ | | Shutdown: | +-----+-------------+
* | | Success | | Terminated | | Shutdown |
* | | | | Zombie | | Notification +-------------+
* | +------+-------------+ | | | | Complete | |
* | | Processing +--+ | | ++-----------+------+ |
* | +---+ | | | | | |
* | | | +----------+ | | | Shutdown: |
* | | +------+-------------+ | \ / | Requested |
* | | | | \/ +--------------------+
* | | | | ||
* | | Success | | || Shutdown:
* | +----------+ | || Terminated
* | | || Zombie
* | | ||
* | | ||
* | | +---++--------------+
* | | | Shutting Down |
* | +-----------+ |
* | | |
* | +--------+----------+
* | |
* | | Shutdown:
* | | All Reasons
* | |
* | |
* | Shutdown: +--------+----------+
* | All Reasons | Shutdown |
* +-------------------------------------------------------+ Complete |
* | |
* +-------------------+
* </pre>
*/
class ConsumerStates {
/**
* Enumerates processing states when working on a shard.
*/
enum ShardConsumerState {
// @formatter:off
WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()),
INITIALIZING(new InitializingState()),
PROCESSING(new ProcessingState()),
SHUTDOWN_REQUESTED(new ShutdownNotificationState()),
SHUTTING_DOWN(new ShuttingDownState()),
SHUTDOWN_COMPLETE(new ShutdownCompleteState());
//@formatter:on
private final ConsumerState consumerState;
ShardConsumerState(ConsumerState consumerState) {
this.consumerState = consumerState;
}
public ConsumerState getConsumerState() {
return consumerState;
}
}
/**
* Represents the current state of the consumer. This handles the creation of tasks for the consumer, and what to
* do when a transition occurs.
*
*/
interface ConsumerState {
/**
* Creates a new task for this state using the passed in consumer to build the task. If there is no task
* required for this state it may return a null value. {@link ConsumerState}s are allowed to modify the
* consumer during the execution of this method.
*
* @param consumer
* the consumer to use to build the task, or execute the state.
* @return a valid task for this state or null if there is no task required.
*/
ITask createTask(ShardConsumer consumer);
/**
* Provides the next state of the consumer upon success of the task returned by
* {@link ConsumerState#createTask(ShardConsumer)}.
*
* @return the next state that the consumer should transition to, this may be the same object as the current
* state.
*/
ConsumerState successTransition();
/**
* Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent
* on the current state, and the shutdown reason.
*
* @param shutdownReason
* the reason that a shutdown was requested
* @return the next state that the consumer should transition to, this may be the same object as the current
* state.
*/
ConsumerState shutdownTransition(ShutdownReason shutdownReason);
/**
* The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid task
* type even if createTask would return a null value.
*
* @return the type of task that this state represents.
*/
TaskType getTaskType();
/**
* An enumeration representing the type of this state. Different consumer states may return the same
* {@link ShardConsumerState}.
*
* @return the type of consumer state this represents.
*/
ShardConsumerState getState();
boolean isTerminal();
}
/**
* The initial state that any {@link ShardConsumer} should start in.
*/
static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState();
private static ConsumerState shutdownStateFor(ShutdownReason reason) {
switch (reason) {
case REQUESTED:
return ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();
case TERMINATE:
case ZOMBIE:
return ShardConsumerState.SHUTTING_DOWN.getConsumerState();
default:
throw new IllegalArgumentException("Unknown reason: " + reason);
}
}
/**
* This is the initial state of a shard consumer. This causes the consumer to remain blocked until all parent
* shards have been completed.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Transition to the initializing state to allow the record processor to be initialized in preparation of
* processing.</dd>
* <dt>Shutdown</dt>
* <dd>
* <dl>
* <dt>All Reasons</dt>
* <dd>Transitions to {@link ShutdownCompleteState}. Since the record processor was never initialized it can't be
* informed of the shutdown.</dd>
* </dl>
* </dd>
* </dl>
*/
static class BlockedOnParentState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(),
consumer.getParentShardPollIntervalMillis());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.INITIALIZING.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
}
@Override
public TaskType getTaskType() {
return TaskType.BLOCK_ON_PARENT_SHARDS;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.WAITING_ON_PARENT_SHARDS;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This state is responsible for initializing the record processor with the shard information.
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Transitions to the processing state which will begin to send records to the record processor</dd>
* <dt>Shutdown</dt>
* <dd>At this point the record processor has been initialized, but hasn't processed any records. This requires that
* the record processor be notified of the shutdown, even though there are almost no actions the record processor
* could take.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Transitions to the {@link ShutdownNotificationState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>
* <p>
* This reason should not occur, since terminate is triggered after reaching the end of a shard. Initialize never
* makes any requests to Kinesis for records, so it can't reach the end of a shard.
* </p>
* <p>
* Transitions to the {@link ShuttingDownState}
* </p>
* </dd>
* </dl>
* </dd>
* </dl>
*/
static class InitializingState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(), consumer.getStreamConfig());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.PROCESSING.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return shutdownReason.getShutdownState();
}
@Override
public TaskType getTaskType() {
return TaskType.INITIALIZE;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.INITIALIZING;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This state is responsible for retrieving records from Kinesis, and dispatching them to the record processor.
* While in this state the only way a transition will occur is if a shutdown has been triggered.
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Doesn't actually transition, but instead returns the same state</dd>
* <dt>Shutdown</dt>
* <dd>At this point records are being retrieved, and processed. It's now possible for the consumer to reach the end
* of the shard triggering a {@link ShutdownReason#TERMINATE}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Transitions to the {@link ShutdownNotificationState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ProcessingState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.PROCESSING.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return shutdownReason.getShutdownState();
}
@Override
public TaskType getTaskType() {
return TaskType.PROCESS;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.PROCESSING;
}
@Override
public boolean isTerminal() {
return false;
}
}
static final ConsumerState SHUTDOWN_REQUEST_COMPLETION_STATE = new ShutdownNotificationCompletionState();
/**
* This state occurs when a shutdown has been explicitly requested. This shutdown allows the record processor a
* chance to checkpoint and prepare to be shutdown via the normal method. This state can only be reached by a
* shutdown on the {@link InitializingState} or {@link ProcessingState}.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.</dd>
* <dt>Shutdown</dt>
* <dd>At this point records are being retrieved, and processed. An explicit shutdown will allow the record
* processor one last chance to checkpoint, and then the {@link ShardConsumer} will be held in an idle state.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to
* {@link ShutdownNotificationCompletionState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShutdownNotificationState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownNotification());
}
@Override
public ConsumerState successTransition() {
return SHUTDOWN_REQUEST_COMPLETION_STATE;
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
if (shutdownReason == ShutdownReason.REQUESTED) {
return SHUTDOWN_REQUEST_COMPLETION_STATE;
}
return shutdownReason.getShutdownState();
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_NOTIFICATION;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTDOWN_REQUESTED;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* Once the {@link ShutdownNotificationState} has been completed the {@link ShardConsumer} must not re-enter any of the
* processing states. This state idles the {@link ShardConsumer} until the worker triggers the final shutdown state.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
* </p>
* <p>
* Remains in the {@link ShutdownNotificationCompletionState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point the {@link ShardConsumer} has notified the record processor of the impending shutdown, and is
* waiting on that notification. While waiting for the notification no further processing should occur on the
* {@link ShardConsumer}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains
* {@link ShutdownNotificationCompletionState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShutdownNotificationCompletionState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return null;
}
@Override
public ConsumerState successTransition() {
return this;
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
if (shutdownReason != ShutdownReason.REQUESTED) {
return shutdownReason.getShutdownState();
}
return this;
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_NOTIFICATION;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTDOWN_REQUESTED;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This state is entered if the {@link ShardConsumer} loses its lease, or reaches the end of the shard.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
* </p>
* <p>
* Transitions to the {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point the record processor has processed the final shutdown indication, and depending on the shutdown
* reason taken the correct course of action. From this point on there should be no more interactions with the
* record processor or {@link ShardConsumer}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>
* <p>
* This should not occur as all other {@link ShutdownReason}s take priority over it.
* </p>
* <p>
* Transitions to {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShutdownCompleteState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShutdownCompleteState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShuttingDownState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(),
consumer.getStreamConfig().getStreamProxy(),
consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTTING_DOWN;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This is the final state for the {@link ShardConsumer}. This occurs once all shutdown activities are completed.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
* </p>
* <p>
* Remains in the {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point all shutdown activities are completed, and the {@link ShardConsumer} should not take any
* further actions.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>
* <p>
* This should not occur as all other {@link ShutdownReason}s take priority over it.
* </p>
* <p>
* Remains in {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Remains in {@link ShutdownCompleteState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Remains in {@link ShutdownCompleteState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShutdownCompleteState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
if (consumer.getShutdownNotification() != null) {
consumer.getShutdownNotification().shutdownComplete();
}
return null;
}
@Override
public ConsumerState successTransition() {
return this;
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return this;
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_COMPLETE;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTDOWN_COMPLETE;
}
@Override
public boolean isTerminal() {
return true;
}
}
}
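
To make the state diagram concrete, an illustrative walk through the transitions defined above; since these types are package-private this is exposition rather than public API usage:

package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

// Illustrative only: these types are package-private, so this walkthrough lives in the same package.
final class ConsumerStatesWalkthrough {
    static void walkHappyPathIntoRequestedShutdown() {
        ConsumerStates.ConsumerState state = ConsumerStates.INITIAL_STATE;   // WAITING_ON_PARENT_SHARDS
        state = state.successTransition();                                   // INITIALIZING
        state = state.successTransition();                                   // PROCESSING
        state = state.shutdownTransition(ShutdownReason.REQUESTED);          // SHUTDOWN_REQUESTED (notification task runs)
        state = state.successTransition();                                   // SHUTDOWN_REQUESTED (notification complete, idle)
        state = state.shutdownTransition(ShutdownReason.ZOMBIE);             // SHUTTING_DOWN (lease released or lost)
        state = state.successTransition();                                   // SHUTDOWN_COMPLETE
        assert state.isTerminal();
    }
}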


@@ -14,8 +14,9 @@
  */
 package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -33,8 +34,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
 import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
 import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
 import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
-import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
 import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
+import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
 import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
 import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
@@ -200,23 +201,29 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
      * @return Current shard/lease assignments
      */
     public List<ShardInfo> getCurrentAssignments() {
-        List<ShardInfo> assignments = new LinkedList<ShardInfo>();
         Collection<KinesisClientLease> leases = getAssignments();
-        if ((leases != null) && (!leases.isEmpty())) {
-            for (KinesisClientLease lease : leases) {
-                Set<String> parentShardIds = lease.getParentShardIds();
-                ShardInfo assignment =
-                        new ShardInfo(
-                                lease.getLeaseKey(),
-                                lease.getConcurrencyToken().toString(),
-                                parentShardIds,
-                                lease.getCheckpoint());
-                assignments.add(assignment);
-            }
-        }
+        return convertLeasesToAssignments(leases);
+    }
+    public static List<ShardInfo> convertLeasesToAssignments(Collection<KinesisClientLease> leases) {
+        if (leases == null || leases.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<ShardInfo> assignments = new ArrayList<>(leases.size());
+        for (KinesisClientLease lease : leases) {
+            assignments.add(convertLeaseToAssignment(lease));
+        }
         return assignments;
     }
+    public static ShardInfo convertLeaseToAssignment(KinesisClientLease lease) {
+        Set<String> parentShardIds = lease.getParentShardIds();
+        return new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), parentShardIds,
+                lease.getCheckpoint());
+    }
     /**
      * Initialize the lease coordinator (create the lease table if needed).
      * @throws DependencyException


@@ -63,4 +63,12 @@ class MetricsCollectingTaskDecorator implements ITask {
         return other.getTaskType();
     }
+    @Override
+    public String toString() {
+        return this.getClass().getName() + "<" + other.getTaskType() + ">(" + other + ")";
+    }
+    ITask getOther() {
+        return other;
+    }
 }


@@ -14,6 +14,7 @@
  */
 package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -24,10 +25,10 @@ import org.apache.commons.logging.LogFactory;
 import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
 import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
 import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
 import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
+import com.google.common.annotations.VisibleForTesting;
 /**
  * Responsible for consuming data records of a (specified) shard.
@@ -36,13 +37,6 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
  */
 class ShardConsumer {
-    /**
-     * Enumerates processing states when working on a shard.
-     */
-    enum ShardConsumerState {
-        WAITING_ON_PARENT_SHARDS, INITIALIZING, PROCESSING, SHUTTING_DOWN, SHUTDOWN_COMPLETE;
-    }
     private static final Log LOG = LogFactory.getLog(ShardConsumer.class);
     private final StreamConfig streamConfig;
@@ -68,13 +62,13 @@ class ShardConsumer {
      * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do
      * much coordination/synchronization to handle concurrent reads/updates.
      */
-    private ShardConsumerState currentState = ShardConsumerState.WAITING_ON_PARENT_SHARDS;
+    private ConsumerStates.ConsumerState currentState = ConsumerStates.INITIAL_STATE;
     /*
     * Used to track if we lost the primary responsibility. Once set to true, we will start shutting down.
     * If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object.
     */
-    private volatile boolean beginShutdown;
     private volatile ShutdownReason shutdownReason;
+    private volatile ShutdownNotification shutdownNotification;
     /**
      * @param shardInfo Shard information
@@ -129,41 +123,19 @@ class ShardConsumer {
         return checkAndSubmitNextTask();
     }
-    // CHECKSTYLE:OFF CyclomaticComplexity
+    private boolean readyForNextTask() {
+        return future == null || future.isCancelled() || future.isDone();
+    }
     private synchronized boolean checkAndSubmitNextTask() {
-        // Task completed successfully (without exceptions)
-        boolean taskCompletedSuccessfully = false;
         boolean submittedNewTask = false;
-        if ((future == null) || future.isCancelled() || future.isDone()) {
-            if ((future != null) && future.isDone()) {
-                try {
-                    TaskResult result = future.get();
-                    if (result.getException() == null) {
-                        taskCompletedSuccessfully = true;
-                        if (result.isShardEndReached()) {
-                            markForShutdown(ShutdownReason.TERMINATE);
-                        }
-                    } else {
-                        if (LOG.isDebugEnabled()) {
-                            Exception taskException = result.getException();
-                            if (taskException instanceof BlockedOnParentShardException) {
-                                // No need to log the stack trace for this exception (it is very specific).
-                                LOG.debug("Shard " + shardInfo.getShardId()
-                                        + " is blocked on completion of parent shard.");
-                            } else {
-                                LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ",
-                                        result.getException());
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                } finally {
-                    // Setting future to null so we don't misinterpret task completion status in case of exceptions
-                    future = null;
-                }
-            }
-            updateState(taskCompletedSuccessfully);
+        if (readyForNextTask()) {
+            TaskOutcome taskOutcome = TaskOutcome.NOT_COMPLETE;
+            if (future != null && future.isDone()) {
+                taskOutcome = determineTaskOutcome();
+            }
+            updateState(taskOutcome);
             ITask nextTask = getNextTask();
             if (nextTask != null) {
                 currentTask = nextTask;
@@ -196,7 +168,56 @@ class ShardConsumer {
         return submittedNewTask;
     }
-    // CHECKSTYLE:ON CyclomaticComplexity
+    public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() {
+        return skipShardSyncAtWorkerInitializationIfLeasesExist;
+    }
+    private enum TaskOutcome {
+        SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE
+    }
+    private TaskOutcome determineTaskOutcome() {
+        try {
+            TaskResult result = future.get();
+            if (result.getException() == null) {
+                if (result.isShardEndReached()) {
+                    return TaskOutcome.END_OF_SHARD;
+                }
+                return TaskOutcome.SUCCESSFUL;
+            }
+            logTaskException(result);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            // Setting future to null so we don't misinterpret task completion status in case of exceptions
+            future = null;
+        }
+        return TaskOutcome.FAILURE;
+    }
+    private void logTaskException(TaskResult taskResult) {
+        if (LOG.isDebugEnabled()) {
+            Exception taskException = taskResult.getException();
+            if (taskException instanceof BlockedOnParentShardException) {
+                // No need to log the stack trace for this exception (it is very specific).
+                LOG.debug("Shard " + shardInfo.getShardId() + " is blocked on completion of parent shard.");
+            } else {
+                LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ",
+                        taskResult.getException());
+            }
+        }
+    }
+    /**
+     * Requests the shutdown of this ShardConsumer. This should give the record processor a chance to checkpoint
+     * before being shutdown.
+     *
+     * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
+     */
+    void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
+        this.shutdownNotification = shutdownNotification;
+        markForShutdown(ShutdownReason.REQUESTED);
+    }
     /**
      * Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
@@ -205,17 +226,15 @@
      * @return true if shutdown is complete (false if shutdown is still in progress)
      */
     synchronized boolean beginShutdown() {
-        if (currentState != ShardConsumerState.SHUTDOWN_COMPLETE) {
-            markForShutdown(ShutdownReason.ZOMBIE);
-            checkAndSubmitNextTask();
-        }
+        markForShutdown(ShutdownReason.ZOMBIE);
+        checkAndSubmitNextTask();
         return isShutdown();
     }
     synchronized void markForShutdown(ShutdownReason reason) {
-        beginShutdown = true;
         // ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard)
-        if ((shutdownReason == null) || (shutdownReason == ShutdownReason.TERMINATE)) {
+        if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) {
            shutdownReason = reason;
        }
    }
@@ -227,7 +246,7 @@
      * @return true if shutdown is complete
      */
     boolean isShutdown() {
-        return currentState == ShardConsumerState.SHUTDOWN_COMPLETE;
+        return currentState.isTerminal();
     }
     /**
@@ -243,48 +262,7 @@
      * @return Return next task to run
      */
     private ITask getNextTask() {
-        ITask nextTask = null;
-        switch (currentState) {
-            case WAITING_ON_PARENT_SHARDS:
-                nextTask = new BlockOnParentShardTask(shardInfo, leaseManager, parentShardPollIntervalMillis);
-                break;
-            case INITIALIZING:
-                nextTask =
-                        new InitializeTask(shardInfo,
-                                recordProcessor,
-                                checkpoint,
-                                recordProcessorCheckpointer,
-                                dataFetcher,
-                                taskBackoffTimeMillis,
-                                streamConfig);
-                break;
-            case PROCESSING:
-                nextTask =
-                        new ProcessTask(shardInfo,
-                                streamConfig,
-                                recordProcessor,
-                                recordProcessorCheckpointer,
-                                dataFetcher,
-                                taskBackoffTimeMillis,
-                                skipShardSyncAtWorkerInitializationIfLeasesExist);
-                break;
-            case SHUTTING_DOWN:
-                nextTask =
-                        new ShutdownTask(shardInfo,
-                                recordProcessor,
-                                recordProcessorCheckpointer,
-                                shutdownReason,
-                                streamConfig.getStreamProxy(),
-                                streamConfig.getInitialPositionInStream(),
-                                cleanupLeasesOfCompletedShards,
-                                leaseManager,
-                                taskBackoffTimeMillis);
-                break;
-            case SHUTDOWN_COMPLETE:
-                break;
-            default:
-                break;
-        }
+        ITask nextTask = currentState.createTask(this);
         if (nextTask == null) {
             return null;
@@ -297,71 +275,93 @@
      * Note: This is a private/internal method with package level access solely for testing purposes.
      * Update state based on information about: task success, current state, and shutdown info.
      *
-     * @param taskCompletedSuccessfully Whether (current) task completed successfully.
+     * @param taskOutcome The outcome of the last task
      */
-    // CHECKSTYLE:OFF CyclomaticComplexity
-    void updateState(boolean taskCompletedSuccessfully) {
-        if (currentState == ShardConsumerState.SHUTDOWN_COMPLETE) {
-            // Shutdown was completed and there nothing we can do after that
-            return;
-        }
-        if ((currentTask == null) && beginShutdown) {
-            // Shard didn't start any tasks and can be shutdown fast
-            currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
-            return;
-        }
-        if (beginShutdown && currentState != ShardConsumerState.SHUTTING_DOWN) {
-            // Shard received signal to start shutdown.
-            // Whatever task we were working on should be stopped and shutdown task should be executed
-            currentState = ShardConsumerState.SHUTTING_DOWN;
-            return;
-        }
-        switch (currentState) {
-            case WAITING_ON_PARENT_SHARDS:
-                if (taskCompletedSuccessfully && TaskType.BLOCK_ON_PARENT_SHARDS.equals(currentTask.getTaskType())) {
-                    currentState = ShardConsumerState.INITIALIZING;
-                }
-                break;
-            case INITIALIZING:
-                if (taskCompletedSuccessfully && TaskType.INITIALIZE.equals(currentTask.getTaskType())) {
-                    currentState = ShardConsumerState.PROCESSING;
-                }
-                break;
-            case PROCESSING:
-                if (taskCompletedSuccessfully && TaskType.PROCESS.equals(currentTask.getTaskType())) {
-                    currentState = ShardConsumerState.PROCESSING;
-                }
-                break;
-            case SHUTTING_DOWN:
-                if (currentTask == null
-                        || (taskCompletedSuccessfully && TaskType.SHUTDOWN.equals(currentTask.getTaskType()))) {
-                    currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
-                }
-                break;
-            default:
-                LOG.error("Unexpected state: " + currentState);
-                break;
-        }
-        //
-        // Don't change state otherwise
-        //
-    }
-    // CHECKSTYLE:ON CyclomaticComplexity
+    void updateState(TaskOutcome taskOutcome) {
+        if (taskOutcome == TaskOutcome.END_OF_SHARD) {
+            markForShutdown(ShutdownReason.TERMINATE);
+        }
+        if (isShutdownRequested()) {
+            currentState = currentState.shutdownTransition(shutdownReason);
+        } else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
+            if (currentState.getTaskType() == currentTask.getTaskType()) {
+                currentState = currentState.successTransition();
+            } else {
+                LOG.error("Current State task type of '" + currentState.getTaskType()
+                        + "' doesn't match the current tasks type of '" + currentTask.getTaskType()
+                        + "'. This shouldn't happen, and indicates a programming error. "
+                        + "Unable to safely transition to the next state.");
+            }
+        }
+    }
+    @VisibleForTesting
+    boolean isShutdownRequested() {
+        return shutdownReason != null;
+    }
     /**
      * Private/Internal method - has package level access solely for testing purposes.
      *
      * @return the currentState
      */
-    ShardConsumerState getCurrentState() {
-        return currentState;
-    }
-    /**
-     * Private/Internal method - has package level access solely for testing purposes.
-     *
-     * @return the beginShutdown
-     */
-    boolean isBeginShutdown() {
-        return beginShutdown;
-    }
+    ConsumerStates.ShardConsumerState getCurrentState() {
+        return currentState.getState();
+    }
+    StreamConfig getStreamConfig() {
+        return streamConfig;
+    }
+    IRecordProcessor getRecordProcessor() {
+        return recordProcessor;
+    }
+    RecordProcessorCheckpointer getRecordProcessorCheckpointer() {
+        return recordProcessorCheckpointer;
+    }
+    ExecutorService getExecutorService() {
+        return executorService;
+    }
+    ShardInfo getShardInfo() {
+        return shardInfo;
+    }
+    KinesisDataFetcher getDataFetcher() {
+        return dataFetcher;
+    }
+    ILeaseManager<KinesisClientLease> getLeaseManager() {
+        return leaseManager;
+    }
+    ICheckpoint getCheckpoint() {
+        return checkpoint;
+    }
+    long getParentShardPollIntervalMillis() {
+        return parentShardPollIntervalMillis;
+    }
+    boolean isCleanupLeasesOfCompletedShards() {
+        return cleanupLeasesOfCompletedShards;
+    }
+    long getTaskBackoffTimeMillis() {
+        return taskBackoffTimeMillis;
+    }
+    Future<TaskResult> getFuture() {
+        return future;
+    }
+    ShutdownNotification getShutdownNotification() {
+        return shutdownNotification;
+    }
 }


@@ -0,0 +1,71 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.CountDownLatch;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
/**
* Contains callbacks for completion of stages in a requested record processor shutdown.
*
*/
class ShardConsumerShutdownNotification implements ShutdownNotification {
private final LeaseCoordinator<KinesisClientLease> leaseCoordinator;
private final KinesisClientLease lease;
private final CountDownLatch shutdownCompleteLatch;
private final CountDownLatch notificationCompleteLatch;
private boolean notificationComplete = false;
private boolean allNotificationCompleted = false;
/**
* Creates a new shutdown request object.
*
* @param leaseCoordinator
* the lease coordinator used to drop leases from once the initial shutdown request is completed.
* @param lease
* the lease that this shutdown request will free once initial shutdown is complete
* @param notificationCompleteLatch
* used to inform the caller once the
* {@link IShutdownNotificationAware} object has been
* notified of the shutdown request.
* @param shutdownCompleteLatch
* used to inform the caller once the record processor is fully shutdown
*/
ShardConsumerShutdownNotification(LeaseCoordinator<KinesisClientLease> leaseCoordinator, KinesisClientLease lease,
CountDownLatch notificationCompleteLatch, CountDownLatch shutdownCompleteLatch) {
this.leaseCoordinator = leaseCoordinator;
this.lease = lease;
this.notificationCompleteLatch = notificationCompleteLatch;
this.shutdownCompleteLatch = shutdownCompleteLatch;
}
@Override
public void shutdownNotificationComplete() {
if (notificationComplete) {
return;
}
//
// Once the notification has been completed, the lease needs to be dropped to allow the worker to complete
// shutdown of the record processor.
//
leaseCoordinator.dropLease(lease);
notificationCompleteLatch.countDown();
notificationComplete = true;
}
@Override
public void shutdownComplete() {
if (allNotificationCompleted) {
return;
}
if (!notificationComplete) {
notificationCompleteLatch.countDown();
}
shutdownCompleteLatch.countDown();
allNotificationCompleted = true;
}
}


@@ -0,0 +1,155 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Used as a response from the {@link Worker#requestShutdown()} to allow callers to wait until shutdown is complete.
*/
class ShutdownFuture implements Future<Void> {
private static final Log log = LogFactory.getLog(ShutdownFuture.class);
private final CountDownLatch shutdownCompleteLatch;
private final CountDownLatch notificationCompleteLatch;
private final Worker worker;
ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) {
this.shutdownCompleteLatch = shutdownCompleteLatch;
this.notificationCompleteLatch = notificationCompleteLatch;
this.worker = worker;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException("Cannot cancel a shutdown process");
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return isWorkerShutdownComplete();
}
private boolean isWorkerShutdownComplete() {
return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty();
}
private long outstandingRecordProcessors(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
final long startNanos = System.nanoTime();
//
// Wait for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
// There is the possibility of a race condition where a lease is terminated after the shutdown request
// notification is started, but before the ShardConsumer is sent the notification. In this case the
// ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
//
if (!notificationCompleteLatch.await(timeout, unit)) {
long awaitingNotification = notificationCompleteLatch.getCount();
long awaitingFinalShutdown = shutdownCompleteLatch.getCount();
log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification, and "
+ awaitingFinalShutdown + " awaiting final shutdown");
if (awaitingFinalShutdown != 0) {
//
// The number of record processors awaiting final shutdown should be a superset of those awaiting
// notification
//
return checkWorkerShutdownMiss(awaitingFinalShutdown);
}
}
long remaining = remainingTimeout(timeout, unit, startNanos);
throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time.");
//
// Once all record processors have been notified of the shutdown it is safe to allow the worker to
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
//
worker.shutdown();
remaining = remainingTimeout(timeout, unit, startNanos);
throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time.");
//
// Want to wait for all the remaining ShardConsumers/RecordProcessors to complete their final shutdown
// processing. This should really be a no-op since as part of the notification completion the lease for
// ShardConsumer is terminated.
//
if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) {
long outstanding = shutdownCompleteLatch.getCount();
log.info("Awaiting " + outstanding + " record processors to complete final shutdown");
return checkWorkerShutdownMiss(outstanding);
}
return 0;
}
private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) {
long checkNanos = System.nanoTime() - startNanos;
return unit.toNanos(timeout) - checkNanos;
}
private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException {
if (remainingNanos <= 0) {
throw new TimeoutException(message);
}
}
/**
* This checks to see if the worker has already hit its shutdown target while there are outstanding record
* processors. This may be a little racy due to when the value of outstanding is retrieved. In general though the
* latch should be decremented before the shutdown completion.
*
* @param outstanding
* the number of record processors still awaiting shutdown.
* @return the number of record processors awaiting shutdown, or 0 if the worker believes it's shutdown already.
*/
private long checkWorkerShutdownMiss(long outstanding) {
if (isWorkerShutdownComplete()) {
if (outstanding != 0) {
log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
+ " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: "
+ worker.isShutdownComplete() + " -- Consumer Map: "
+ worker.getShardInfoShardConsumerMap().size());
}
return 0;
}
return outstanding;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
boolean complete = false;
do {
try {
long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS);
complete = outstanding == 0;
log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown.");
} catch (TimeoutException te) {
log.info("Timeout while waiting for completion: " + te.getMessage());
}
} while(!complete);
return null;
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long outstanding = outstandingRecordProcessors(timeout, unit);
if (outstanding != 0) {
throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown.");
}
return null;
}
}
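
For callers that want a bound on the wait, the timed get enforces the deadline across both phases (notification, then final shutdown); a sketch with an arbitrary timeout, assuming the future came from Worker.requestShutdown():

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

final class BoundedShutdownExample {
    // Returns true if the graceful shutdown finished within the budget; the shutdown keeps
    // running in the background either way, since cancel() is unsupported on this future.
    static boolean shutDownWithin(Worker worker, long timeoutSeconds)
            throws InterruptedException, ExecutionException {
        Future<Void> shutdownFuture = worker.requestShutdown();
        try {
            shutdownFuture.get(timeoutSeconds, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException e) {
            // Some record processors had not completed notification or final shutdown in time.
            return false;
        }
    }
}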


@@ -0,0 +1,22 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
/**
* A shutdown request to the ShardConsumer
*/
public interface ShutdownNotification {
/**
* Used to indicate that the record processor has been notified of a requested shutdown, and given the chance to
* checkpoint.
*
*/
void shutdownNotificationComplete();
/**
* Used to indicate that the record processor's call to
* {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)} has
* completed.
*/
void shutdownComplete();
}


@@ -0,0 +1,43 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
/**
* Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint.
*/
class ShutdownNotificationTask implements ITask {
private final IRecordProcessor recordProcessor;
private final IRecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownNotification shutdownNotification;
ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification) {
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.shutdownNotification = shutdownNotification;
}
@Override
public TaskResult call() {
try {
if (recordProcessor instanceof IShutdownNotificationAware) {
IShutdownNotificationAware shutdownNotificationAware = (IShutdownNotificationAware) recordProcessor;
try {
shutdownNotificationAware.shutdownRequested(recordProcessorCheckpointer);
} catch (Exception ex) {
return new TaskResult(ex);
}
}
return new TaskResult(null);
} finally {
shutdownNotification.shutdownNotificationComplete();
}
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_NOTIFICATION;
}
}


@@ -12,7 +12,12 @@
  * express or implied. See the License for the specific language governing
  * permissions and limitations under the License.
  */
-package com.amazonaws.services.kinesis.clientlibrary.types;
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
 /**
  * Reason the RecordProcessor is being shutdown.
@@ -28,7 +33,7 @@ public enum ShutdownReason {
      * Applications SHOULD NOT checkpoint their progress (as another record processor may have already started
      * processing data).
      */
-    ZOMBIE,
+    ZOMBIE(3, ShardConsumerState.SHUTTING_DOWN.getConsumerState()),
     /**
      * Terminate processing for this RecordProcessor (resharding use case).
@@ -36,5 +41,38 @@
      * Applications SHOULD checkpoint their progress to indicate that they have successfully processed all records
      * from this shard and processing of child shards can be started.
      */
-    TERMINATE
+    TERMINATE(2, ShardConsumerState.SHUTTING_DOWN.getConsumerState()),
+    /**
+     * Indicates that the entire application is being shutdown, and if desired the record processor will be given a
+     * final chance to checkpoint. This state will not trigger a direct call to
+     * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)}, but
+     * instead depend on a different interface for backward compatibility.
+     */
+    REQUESTED(1, ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState());
+    private final int rank;
+    private final ConsumerState shutdownState;
+    ShutdownReason(int rank, ConsumerState shutdownState) {
+        this.rank = rank;
+        this.shutdownState = shutdownState;
+    }
+    /**
+     * Indicates whether the given reason can override the current reason.
+     *
+     * @param reason the reason to transition to
+     * @return true if the transition is allowed, false if it's not.
+     */
+    public boolean canTransitionTo(ShutdownReason reason) {
+        if (reason == null) {
+            return false;
+        }
+        return reason.rank > this.rank;
+    }
+    ConsumerState getShutdownState() {
+        return shutdownState;
+    }
 }
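
Given the ranks above (ZOMBIE = 3, TERMINATE = 2, REQUESTED = 1), canTransitionTo only allows escalation to a higher-ranked reason; a quick illustration:

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;

final class ShutdownReasonPrecedenceExample {
    static void illustrate() {
        assert ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.TERMINATE);  // graceful request may escalate to end-of-shard
        assert ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.ZOMBIE);     // ...or to lease loss
        assert ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.ZOMBIE);     // end-of-shard can still be preempted by lease loss
        assert !ShutdownReason.ZOMBIE.canTransitionTo(ShutdownReason.TERMINATE);    // lease loss is never downgraded
        assert !ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.REQUESTED); // a graceful request never overrides end-of-shard
    }
}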


@@ -21,11 +21,11 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess
 import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
 import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
 import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
 import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
 import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
+import com.google.common.annotations.VisibleForTesting;
 /**
  * Task for invoking the RecordProcessor shutdown() callback.
@@ -155,4 +155,9 @@ class ShutdownTask implements ITask {
         return taskType;
     }
+    @VisibleForTesting
+    ShutdownReason getReason() {
+        return reason;
+    }
 }


@@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
 /**
  * Enumerates types of tasks executed as part of processing a shard.
  */
-enum TaskType {
+public enum TaskType {
     /**
      * Polls and waits until parent shard(s) have been fully processed.
      */
@@ -34,8 +34,16 @@
      * Shutdown of RecordProcessor.
      */
     SHUTDOWN,
+    /**
+     * Graceful shutdown has been requested, and notification of the record processor will occur.
+     */
+    SHUTDOWN_NOTIFICATION,
+    /**
+     * Occurs once the shutdown has been completed
+     */
+    SHUTDOWN_COMPLETE,
     /**
      * Sync leases/activities corresponding to Kinesis shards.
      */
-    SHARDSYNC;
+    SHARDSYNC
 }


@@ -14,13 +14,17 @@
  */
 package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -38,15 +42,18 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
 import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
 import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
+import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
 import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
 import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
 import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
 import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
 import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * Worker is the high level class that Kinesis applications use to start
@@ -85,6 +92,7 @@ public class Worker implements Runnable {
     private volatile boolean shutdown;
     private volatile long shutdownStartTimeMillis;
+    private volatile boolean shutdownComplete = false;
     // Holds consumers for shards the worker is currently tracking. Key is shard
     // info, value is ShardConsumer.
@ -499,11 +507,89 @@ public class Worker implements Runnable {
} }
/** /**
* Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that * Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware},
* if executor services were passed to the worker by the user, worker will not attempt to shutdown * of the impending shutdown. This gives the record processor a final chance to checkpoint.
* those resources. *
* <b>It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is
* lost after requesting shutdown, but before the notification is dispatched.</b>
*
* <h2>Requested Shutdown Process</h2> When a shutdown process is requested it operates slightly differently to
* allow the record processors a chance to checkpoint a final time.
* <ol>
* <li>Call to request shutdown invoked.</li>
* <li>Worker stops attempting to acquire new leases</li>
* <li>Record Processor Shutdown Begins
* <ol>
* <li>Record processor is notified of the impending shutdown, and given a final chance to checkpoint</li>
* <li>The lease for the record processor is then dropped.</li>
* <li>The record processor enters into an idle state waiting for the worker to complete final termination</li>
* <li>The worker will detect a record processor that has lost its lease, and will terminate the record processor
* with {@link ShutdownReason#ZOMBIE}</li>
* </ol>
* </li>
* <li>The worker will shut down all record processors.</li>
* <li>Once all record processors have been terminated, the worker will terminate all owned resources.</li>
* <li>Once the worker shutdown is complete, the returned future is completed.</li>
* </ol>
*
* @return a Future that will be set once the shutdown is complete.
*/
public Future<Void> requestShutdown() {
leaseCoordinator.stopLeaseTaker();
//
// Stop accepting new leases
//
Collection<KinesisClientLease> leases = leaseCoordinator.getAssignments();
if (leases == null || leases.isEmpty()) {
//
// If there are no leases shutdown is already completed.
//
return Futures.immediateFuture(null);
}
CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size());
for (KinesisClientLease lease : leases) {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease,
notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
shardInfoShardConsumerMap.get(shardInfo).notifyShutdownRequested(shutdownNotification);
}
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this);
}
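For illustration, a minimal sketch of how an application might plug into this graceful path. The class name CheckpointingProcessor and the surrounding wiring are hypothetical; only IShutdownNotificationAware, shutdownRequested, and requestShutdown come from this change, and the v2 record processor methods are assumed to keep their usual KCL signatures.

// Sketch only: a hypothetical record processor that opts in to shutdown notifications.
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

class CheckpointingProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        // normal initialization
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // normal record handling and periodic checkpointing
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            // Final chance to record progress before the lease is dropped.
            checkpointer.checkpoint();
        } catch (Exception e) {
            // A real processor should log and decide how to handle a failed final checkpoint.
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // Still called afterwards, typically with ZOMBIE once the lease has been dropped.
    }
}

On the application side, requestShutdown() returns a Future<Void> that completes only after the notifications, lease drops, and worker teardown have finished; a sketch of waiting on it from a shutdown hook follows the shutdown() method below.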
boolean isShutdownComplete() {
return shutdownComplete;
}
ConcurrentMap<ShardInfo, ShardConsumer> getShardInfoShardConsumerMap() {
return shardInfoShardConsumerMap;
}
/**
* Signals the worker to shut down. The worker will try to initiate shutdown of all record processors. Note that if
* executor services were passed to the worker by the user, the worker will not attempt to shut down those resources.
*
* <h2>Shutdown Process</h2> When called, this starts shutdown of the record processors and eventually shuts down
* the worker itself (a sketch combining this with requestShutdown() follows the method below).
* <ol>
* <li>Call to start shutdown invoked</li>
* <li>Lease coordinator told to stop taking leases, and to drop existing leases.</li>
* <li>Worker discovers record processors that no longer have leases.</li>
* <li>Worker triggers shutdown with state {@link ShutdownReason#ZOMBIE}.</li>
* <li>Once all record processors are shutdown, worker terminates owned resources.</li>
* <li>Shutdown complete.</li>
* </ol>
*/ */
public void shutdown() { public void shutdown() {
if (shutdown) {
LOG.warn("Shutdown requested a second time.");
return;
}
LOG.info("Worker shutdown requested."); LOG.info("Worker shutdown requested.");
// Set shutdown flag, so Worker.run can start shutdown process. // Set shutdown flag, so Worker.run can start shutdown process.
@ -530,6 +616,7 @@ public class Worker implements Runnable {
if (metricsFactory instanceof WorkerCWMetricsFactory) { if (metricsFactory instanceof WorkerCWMetricsFactory) {
((CWMetricsFactory) metricsFactory).shutdown(); ((CWMetricsFactory) metricsFactory).shutdown();
} }
shutdownComplete = true;
} }
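A hedged sketch of combining the two paths: a JVM shutdown hook that first attempts the graceful requestShutdown() and falls back to the immediate shutdown() above if it does not finish in time. The 30-second timeout and the final worker reference are choices of this sketch, not library defaults.

// Sketch only: 'worker' is assumed to be a final reference to an already-started Worker.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            // Give record processors a bounded window to be notified and checkpoint.
            worker.requestShutdown().get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            // Timed out, interrupted, or failed: fall back to the immediate shutdown described above.
            worker.shutdown();
        }
    }
}));

Calling shutdown() after a requestShutdown() that already completed is harmless: the shutdown flag is already set, so the worker only logs that shutdown was requested a second time.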
/** /**
@ -757,7 +844,8 @@ public class Worker implements Runnable {
* @return Default executor service that should be used by the worker. * @return Default executor service that should be used by the worker.
*/ */
private static ExecutorService getExecutorService() { private static ExecutorService getExecutorService() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build();
return new WorkerThreadPoolExecutor(threadFactory);
} }
/** /**
@ -786,10 +874,10 @@ public class Worker implements Runnable {
static class WorkerThreadPoolExecutor extends ThreadPoolExecutor { static class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
private static final long DEFAULT_KEEP_ALIVE_TIME = 60L; private static final long DEFAULT_KEEP_ALIVE_TIME = 60L;
WorkerThreadPoolExecutor(ThreadFactory threadFactory) {
// Defaults are based on Executors.newCachedThreadPool() // Defaults are based on Executors.newCachedThreadPool()
super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
threadFactory);
} }
} }

View file

@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.types; package com.amazonaws.services.kinesis.clientlibrary.types;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
/** /**
* Container for the parameters to the IRecordProcessor's * Container for the parameters to the IRecordProcessor's

View file

@ -15,9 +15,9 @@
package com.amazonaws.services.kinesis.leases.impl; package com.amazonaws.services.kinesis.leases.impl;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
@ -41,6 +41,16 @@ public class KinesisClientLease extends Lease {
this.parentShardIds.addAll(other.getParentShardIds()); this.parentShardIds.addAll(other.getParentShardIds());
} }
KinesisClientLease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken,
Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, Long ownerSwitchesSinceCheckpoint,
Set<String> parentShardIds) {
super(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos);
this.checkpoint = checkpoint;
this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint;
this.parentShardIds.addAll(parentShardIds);
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View file

@ -63,11 +63,17 @@ public class Lease {
* @param lease lease to copy * @param lease lease to copy
*/ */
protected Lease(Lease lease) { protected Lease(Lease lease) {
this(lease.getLeaseKey(), lease.getLeaseOwner(), lease.getLeaseCounter(), lease.getConcurrencyToken(),
lease.getLastCounterIncrementNanos());
}
protected Lease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken,
Long lastCounterIncrementNanos) {
this.leaseKey = leaseKey;
this.leaseOwner = leaseOwner;
this.leaseCounter = leaseCounter;
this.concurrencyToken = concurrencyToken;
this.lastCounterIncrementNanos = lastCounterIncrementNanos;
} }
/** /**

View file

@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -83,6 +84,7 @@ public class LeaseCoordinator<T extends Lease> {
private ScheduledExecutorService leaseCoordinatorThreadPool; private ScheduledExecutorService leaseCoordinatorThreadPool;
private final ExecutorService leaseRenewalThreadpool; private final ExecutorService leaseRenewalThreadpool;
private volatile boolean running = false; private volatile boolean running = false;
private ScheduledFuture<?> takerFuture;
/** /**
* Constructor. * Constructor.
@ -199,9 +201,15 @@ public class LeaseCoordinator<T extends Lease> {
leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY); leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY);
// Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation. // Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation.
takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(),
0L,
takerIntervalMillis,
TimeUnit.MILLISECONDS);
// Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degredation. // Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degredation.
leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(),
0L,
renewerIntervalMillis,
TimeUnit.MILLISECONDS);
running = true; running = true;
} }
@ -309,6 +317,27 @@ public class LeaseCoordinator<T extends Lease> {
} }
} }
/**
* Requests the cancellation of the lease taker.
*/
public void stopLeaseTaker() {
takerFuture.cancel(false);
}
/**
* Requests that renewals for the given lease are stopped.
*
* @param lease the lease to stop renewing.
*/
public void dropLease(T lease) {
synchronized (shutdownLock) {
if (lease != null) {
leaseRenewer.dropLease(lease);
}
}
}
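For orientation, a rough conceptual sketch of how these two hooks are used by a requested shutdown, mirroring Worker.requestShutdown() above. The coordinator variable and the notifyAndAwaitFinalCheckpoint helper are placeholders for illustration, not library members.

// Conceptual sketch only; the real sequencing lives in Worker.requestShutdown() and ShardConsumerShutdownNotification.
coordinator.stopLeaseTaker();                     // stop acquiring new leases
for (KinesisClientLease lease : coordinator.getAssignments()) {
    notifyAndAwaitFinalCheckpoint(lease);         // placeholder: let the record processor checkpoint one last time
    coordinator.dropLease(lease);                 // stop renewing, so the consumer is later shut down as ZOMBIE
}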
/** /**
* @return true if this LeaseCoordinator is running * @return true if this LeaseCoordinator is running
*/ */

View file

@ -369,6 +369,15 @@ public class LeaseRenewer<T extends Lease> implements ILeaseRenewer<T> {
ownedLeases.clear(); ownedLeases.clear();
} }
/**
* {@inheritDoc}
* @param lease the lease to drop.
*/
@Override
public void dropLease(T lease) {
ownedLeases.remove(lease.getLeaseKey());
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View file

@ -73,6 +73,13 @@ public interface ILeaseRenewer<T extends Lease> {
*/ */
public void clearCurrentlyHeldLeases(); public void clearCurrentlyHeldLeases();
/**
* Stops the lease renewer from continuing to maintain the given lease.
*
* @param lease the lease to drop.
*/
void dropLease(T lease);
/** /**
* Update application-specific fields in a currently held lease. Cannot be used to update internal fields such as * Update application-specific fields in a currently held lease. Cannot be used to update internal fields such as
* leaseCounter, leaseOwner, etc. Fails if we do not hold the lease, or if the concurrency token does not match * leaseCounter, leaseOwner, etc. Fails if we do not hold the lease, or if the concurrency token does not match

View file

@ -26,7 +26,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;

View file

@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;

View file

@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;

View file

@ -14,7 +14,7 @@
*/ */
package com.amazonaws.services.kinesis.multilang.messages; package com.amazonaws.services.kinesis.multilang.messages;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
/** /**
* A message to indicate to the client's process that it should shutdown and then terminate. * A message to indicate to the client's process that it should shutdown and then terminate.

View file

@ -0,0 +1,385 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.hamcrest.Condition;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
@RunWith(MockitoJUnitRunner.class)
public class ConsumerStatesTest {
@Mock
private ShardConsumer consumer;
@Mock
private StreamConfig streamConfig;
@Mock
private IRecordProcessor recordProcessor;
@Mock
private RecordProcessorCheckpointer recordProcessorCheckpointer;
@Mock
private ExecutorService executorService;
@Mock
private ShardInfo shardInfo;
@Mock
private KinesisDataFetcher dataFetcher;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private ICheckpoint checkpoint;
@Mock
private Future<TaskResult> future;
@Mock
private ShutdownNotification shutdownNotification;
@Mock
private IKinesisProxy kinesisProxy;
@Mock
private InitialPositionInStreamExtended initialPositionInStream;
private long parentShardPollIntervalMillis = 0xCAFE;
private boolean cleanupLeasesOfCompletedShards = true;
private long taskBackoffTimeMillis = 0xBEEF;
private ShutdownReason reason = ShutdownReason.TERMINATE;
@Before
public void setup() {
when(consumer.getStreamConfig()).thenReturn(streamConfig);
when(consumer.getRecordProcessor()).thenReturn(recordProcessor);
when(consumer.getRecordProcessorCheckpointer()).thenReturn(recordProcessorCheckpointer);
when(consumer.getExecutorService()).thenReturn(executorService);
when(consumer.getShardInfo()).thenReturn(shardInfo);
when(consumer.getDataFetcher()).thenReturn(dataFetcher);
when(consumer.getLeaseManager()).thenReturn(leaseManager);
when(consumer.getCheckpoint()).thenReturn(checkpoint);
when(consumer.getFuture()).thenReturn(future);
when(consumer.getShutdownNotification()).thenReturn(shutdownNotification);
when(consumer.getParentShardPollIntervalMillis()).thenReturn(parentShardPollIntervalMillis);
when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards);
when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis);
when(consumer.getShutdownReason()).thenReturn(reason);
}
private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class;
@Test
public void blockOnParentStateTest() {
ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, taskWith(BlockOnParentShardTask.class, ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task,
taskWith(BlockOnParentShardTask.class, LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager)));
assertThat(task, taskWith(BlockOnParentShardTask.class, Long.class, "parentShardPollIntervalMillis",
equalTo(parentShardPollIntervalMillis)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.getConsumerState()));
for (ShutdownReason shutdownReason : ShutdownReason.values()) {
assertThat(state.shutdownTransition(shutdownReason),
equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
}
assertThat(state.getState(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS));
assertThat(state.getTaskType(), equalTo(TaskType.BLOCK_ON_PARENT_SHARDS));
}
@Test
public void initializingStateTest() {
ConsumerState state = ShardConsumerState.INITIALIZING.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, initTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, initTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, initTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, initTask(ICheckpoint.class, "checkpoint", equalTo(checkpoint)));
assertThat(task, initTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, initTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, initTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.INITIALIZING));
assertThat(state.getTaskType(), equalTo(TaskType.INITIALIZE));
}
@Test
public void processingStateTest() {
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING));
assertThat(state.getTaskType(), equalTo(TaskType.PROCESS));
}
@Test
public void shutdownRequestState() {
ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, shutdownReqTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, shutdownReqTask(IRecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo((IRecordProcessorCheckpointer) recordProcessorCheckpointer)));
assertThat(task, shutdownReqTask(ShutdownNotification.class, "shutdownNotification", equalTo(shutdownNotification)));
assertThat(state.successTransition(), equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_REQUESTED));
assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_NOTIFICATION));
}
@Test
public void shutdownRequestCompleteStateTest() {
ConsumerState state = ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE;
assertThat(state.createTask(consumer), nullValue());
assertThat(state.successTransition(), equalTo(state));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), equalTo(state));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_REQUESTED));
assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_NOTIFICATION));
}
@Test
public void shuttingDownStateTest() {
ConsumerState state = ShardConsumerState.SHUTTING_DOWN.getConsumerState();
when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy);
when(streamConfig.getInitialPositionInStream()).thenReturn(initialPositionInStream);
ITask task = state.createTask(consumer);
assertThat(task, shutdownTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, shutdownTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, shutdownTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason)));
assertThat(task, shutdownTask(IKinesisProxy.class, "kinesisProxy", equalTo(kinesisProxy)));
assertThat(task, shutdownTask(LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager)));
assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream",
equalTo(initialPositionInStream)));
assertThat(task,
shutdownTask(Boolean.class, "cleanupLeasesOfCompletedShards", equalTo(cleanupLeasesOfCompletedShards)));
assertThat(task, shutdownTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
for (ShutdownReason reason : ShutdownReason.values()) {
assertThat(state.shutdownTransition(reason),
equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
}
assertThat(state.getState(), equalTo(ShardConsumerState.SHUTTING_DOWN));
assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN));
}
@Test
public void shutdownCompleteStateTest() {
ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
assertThat(state.createTask(consumer), nullValue());
verify(consumer, times(2)).getShutdownNotification();
verify(shutdownNotification).shutdownComplete();
assertThat(state.successTransition(), equalTo(state));
for(ShutdownReason reason : ShutdownReason.values()) {
assertThat(state.shutdownTransition(reason), equalTo(state));
}
assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE));
assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_COMPLETE));
}
@Test
public void shutdownCompleteStateNullNotificationTest() {
ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
when(consumer.getShutdownNotification()).thenReturn(null);
assertThat(state.createTask(consumer), nullValue());
verify(consumer).getShutdownNotification();
verify(shutdownNotification, never()).shutdownComplete();
}
static <ValueType> ReflectionPropertyMatcher<ShutdownTask, ValueType> shutdownTask(Class<ValueType> valueTypeClass,
String propertyName, Matcher<ValueType> matcher) {
return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher);
}
static <ValueType> ReflectionPropertyMatcher<ShutdownNotificationTask, ValueType> shutdownReqTask(
Class<ValueType> valueTypeClass, String propertyName, Matcher<ValueType> matcher) {
return taskWith(ShutdownNotificationTask.class, valueTypeClass, propertyName, matcher);
}
static <ValueType> ReflectionPropertyMatcher<ProcessTask, ValueType> procTask(Class<ValueType> valueTypeClass,
String propertyName, Matcher<ValueType> matcher) {
return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher);
}
static <ValueType> ReflectionPropertyMatcher<InitializeTask, ValueType> initTask(Class<ValueType> valueTypeClass,
String propertyName, Matcher<ValueType> matcher) {
return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher);
}
static <TaskType, ValueType> ReflectionPropertyMatcher<TaskType, ValueType> taskWith(Class<TaskType> taskTypeClass,
Class<ValueType> valueTypeClass, String propertyName, Matcher<ValueType> matcher) {
return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName);
}
private static class ReflectionPropertyMatcher<TaskType, ValueType> extends TypeSafeDiagnosingMatcher<ITask> {
private final Class<TaskType> taskTypeClass;
private final Class<ValueType> valueTypeClazz;
private final Matcher<ValueType> matcher;
private final String propertyName;
private final Field matchingField;
private ReflectionPropertyMatcher(Class<TaskType> taskTypeClass, Class<ValueType> valueTypeClass,
Matcher<ValueType> matcher, String propertyName) {
this.taskTypeClass = taskTypeClass;
this.valueTypeClazz = valueTypeClass;
this.matcher = matcher;
this.propertyName = propertyName;
Field[] fields = taskTypeClass.getDeclaredFields();
Field matching = null;
for (Field field : fields) {
if (propertyName.equals(field.getName())) {
matching = field;
}
}
this.matchingField = matching;
}
@Override
protected boolean matchesSafely(ITask item, Description mismatchDescription) {
return Condition.matched(item, mismatchDescription).and(new Condition.Step<ITask, TaskType>() {
@Override
public Condition<TaskType> apply(ITask value, Description mismatch) {
if (taskTypeClass.equals(value.getClass())) {
return Condition.matched(taskTypeClass.cast(value), mismatch);
}
mismatch.appendText("Expected task type of ").appendText(taskTypeClass.getName())
.appendText(" but was ").appendText(value.getClass().getName());
return Condition.notMatched();
}
}).and(new Condition.Step<TaskType, Object>() {
@Override
public Condition<Object> apply(TaskType value, Description mismatch) {
if (matchingField == null) {
mismatch.appendText("Field ").appendText(propertyName).appendText(" not present in ")
.appendText(taskTypeClass.getName());
return Condition.notMatched();
}
try {
return Condition.matched(getValue(value), mismatch);
} catch (RuntimeException re) {
mismatch.appendText("Failure while retrieving value for ").appendText(propertyName);
return Condition.notMatched();
}
}
}).and(new Condition.Step<Object, ValueType>() {
@Override
public Condition<ValueType> apply(Object value, Description mismatch) {
if (value != null && !valueTypeClazz.isAssignableFrom(value.getClass())) {
mismatch.appendText("Expected a value of type ").appendText(valueTypeClazz.getName())
.appendText(" but was ").appendText(value.getClass().getName());
return Condition.notMatched();
}
return Condition.matched(valueTypeClazz.cast(value), mismatch);
}
}).matching(matcher);
}
@Override
public void describeTo(Description description) {
description
.appendText(
"A " + taskTypeClass.getName() + " task with the property " + propertyName + " matching ")
.appendDescriptionOf(matcher);
}
private Object getValue(TaskType task) {
matchingField.setAccessible(true);
try {
return matchingField.get(task);
} catch (IllegalAccessException e) {
throw new RuntimeException("Failed to retrieve the value for " + matchingField.getName());
}
}
}
}

View file

@ -23,9 +23,9 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -46,17 +46,18 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.InMemoryCheckpointImpl;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.ShardConsumerState;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
@ -68,6 +69,7 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType;
/** /**
* Unit tests of {@link ShardConsumer}. * Unit tests of {@link ShardConsumer}.
*/ */
@RunWith(MockitoJUnitRunner.class)
public class ShardConsumerTest { public class ShardConsumerTest {
private static final Log LOG = LogFactory.getLog(ShardConsumerTest.class); private static final Log LOG = LogFactory.getLog(ShardConsumerTest.class);
@ -86,6 +88,17 @@ public class ShardConsumerTest {
// ... a non-final public class, and so can be mocked and spied. // ... a non-final public class, and so can be mocked and spied.
private final ExecutorService executorService = Executors.newFixedThreadPool(1); private final ExecutorService executorService = Executors.newFixedThreadPool(1);
@Mock
private IRecordProcessor processor;
@Mock
private IKinesisProxy streamProxy;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private ICheckpoint checkpoint;
@Mock
private ShutdownNotification shutdownNotification;
/** /**
* Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails.
*/ */
@ -93,12 +106,9 @@ public class ShardConsumerTest {
@Test @Test
public final void testInitializationStateUponFailure() throws Exception { public final void testInitializationStateUponFailure() throws Exception {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
ICheckpoint checkpoint = mock(ICheckpoint.class);
when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class);
IRecordProcessor processor = mock(IRecordProcessor.class);
IKinesisProxy streamProxy = mock(IKinesisProxy.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
StreamConfig streamConfig = StreamConfig streamConfig =
new StreamConfig(streamProxy, new StreamConfig(streamProxy,
@ -120,19 +130,19 @@ public class ShardConsumerTest {
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
} }
@ -143,13 +153,9 @@ public class ShardConsumerTest {
@Test @Test
public final void testInitializationStateUponSubmissionFailure() throws Exception { public final void testInitializationStateUponSubmissionFailure() throws Exception {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
ICheckpoint checkpoint = mock(ICheckpoint.class);
ExecutorService spyExecutorService = spy(executorService); ExecutorService spyExecutorService = spy(executorService);
when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class); when(checkpoint.getCheckpoint(anyString())).thenThrow(NullPointerException.class);
IRecordProcessor processor = mock(IRecordProcessor.class);
IKinesisProxy streamProxy = mock(IKinesisProxy.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
StreamConfig streamConfig = StreamConfig streamConfig =
new StreamConfig(streamProxy, new StreamConfig(streamProxy,
@ -171,31 +177,27 @@ public class ShardConsumerTest {
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
doThrow(new RejectedExecutionException()).when(spyExecutorService).submit(any(InitializeTask.class));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public final void testRecordProcessorThrowable() throws Exception { public final void testRecordProcessorThrowable() throws Exception {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
ICheckpoint checkpoint = mock(ICheckpoint.class);
IRecordProcessor processor = mock(IRecordProcessor.class);
IKinesisProxy streamProxy = mock(IKinesisProxy.class);
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
StreamConfig streamConfig = StreamConfig streamConfig =
new StreamConfig(streamProxy, new StreamConfig(streamProxy,
1, 1,
@ -219,10 +221,10 @@ public class ShardConsumerTest {
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123"));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // submit BlockOnParentShardTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
verify(processor, times(0)).initialize(any(InitializationInput.class));
// Throw Error when IRecordProcessor.initialize() is invoked.
@ -230,7 +232,7 @@ public class ShardConsumerTest {
consumer.consumeShard(); // submit InitializeTask consumer.consumeShard(); // submit InitializeTask
Thread.sleep(50L); Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(any(InitializationInput.class)); verify(processor, times(1)).initialize(any(InitializationInput.class));
try { try {
@ -241,24 +243,24 @@ public class ShardConsumerTest {
assertThat(e.getCause(), instanceOf(ExecutionException.class)); assertThat(e.getCause(), instanceOf(ExecutionException.class));
} }
Thread.sleep(50L); Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(any(InitializationInput.class));
doNothing().when(processor).initialize(any(InitializationInput.class));
consumer.consumeShard(); // submit InitializeTask again.
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(2)).initialize(any(InitializationInput.class));
// Checking the status of submitted InitializeTask from above should pass.
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
} }
/** /**
* Test method for {@link ShardConsumer#consumeShard()}
*/ */
@Test @Test
public final void testConsumeShard() throws Exception { public final void testConsumeShard() throws Exception {
@ -279,8 +281,6 @@ public class ShardConsumerTest {
final int idleTimeMS = 0; // keep unit tests fast final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
@SuppressWarnings("unchecked")
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
TestStreamlet processor = new TestStreamlet(); TestStreamlet processor = new TestStreamlet();
@ -306,20 +306,20 @@ public class ShardConsumerTest {
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
// We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) {
boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords; i += maxRecords;
} }
@ -327,11 +327,26 @@ public class ShardConsumerTest {
} }
assertThat(processor.getShutdownReason(), nullValue()); assertThat(processor.getShutdownReason(), nullValue());
consumer.notifyShutdownRequested(shutdownNotification);
consumer.consumeShard();
assertThat(processor.getNotifyShutdownLatch().await(1, TimeUnit.SECONDS), is(true));
Thread.sleep(50);
assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED));
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED));
verify(shutdownNotification).shutdownNotificationComplete();
assertThat(processor.isShutdownNotificationCalled(), equalTo(true));
consumer.consumeShard();
Thread.sleep(50);
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED));
consumer.beginShutdown(); consumer.beginShutdown();
Thread.sleep(50L); Thread.sleep(50L);
assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.ZOMBIE));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
consumer.beginShutdown();
consumer.consumeShard();
verify(shutdownNotification, atLeastOnce()).shutdownComplete();
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
executorService.shutdown(); executorService.shutdown();
@ -344,8 +359,7 @@ public class ShardConsumerTest {
} }
/** /**
* Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP.
* that starts from initial position of type AT_TIMESTAMP.
*/ */
@Test @Test
public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception { public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception {
@ -369,8 +383,6 @@ public class ShardConsumerTest {
final int idleTimeMS = 0; // keep unit tests fast final int idleTimeMS = 0; // keep unit tests fast
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken);
@SuppressWarnings("unchecked")
ILeaseManager<KinesisClientLease> leaseManager = mock(ILeaseManager.class);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
TestStreamlet processor = new TestStreamlet(); TestStreamlet processor = new TestStreamlet();
@ -397,11 +409,11 @@ public class ShardConsumerTest {
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
Thread.sleep(50L); Thread.sleep(50L);
@ -410,7 +422,7 @@ public class ShardConsumerTest {
boolean newTaskSubmitted = consumer.consumeShard(); boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) { if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i); LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords; i += maxRecords;
} }
@ -420,9 +432,9 @@ public class ShardConsumerTest {
assertThat(processor.getShutdownReason(), nullValue()); assertThat(processor.getShutdownReason(), nullValue());
consumer.beginShutdown(); consumer.beginShutdown();
Thread.sleep(50L); Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
consumer.beginShutdown();
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
executorService.shutdown(); executorService.shutdown();

View file

@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
/** /**
* Helper class to verify shard lineage in unit tests that use TestStreamlet. * Helper class to verify shard lineage in unit tests that use TestStreamlet.

View file

@ -0,0 +1,236 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.OngoingStubbing;
@RunWith(MockitoJUnitRunner.class)
public class ShutdownFutureTest {
@Mock
private CountDownLatch shutdownCompleteLatch;
@Mock
private CountDownLatch notificationCompleteLatch;
@Mock
private Worker worker;
@Mock
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoConsumerMap;
@Test
public void testSimpleGetAlreadyCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
mockNotificationComplete(true);
mockShutdownComplete(true);
future.get();
verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class));
verify(worker).shutdown();
verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class));
}
@Test
public void testNotificationNotCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
mockNotificationComplete(false, true);
mockShutdownComplete(true);
when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap);
when(shardInfoConsumerMap.isEmpty()).thenReturn(false);
when(worker.isShutdownComplete()).thenReturn(false);
when(notificationCompleteLatch.getCount()).thenReturn(1L);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
expectedTimeoutException(future);
verify(worker, never()).shutdown();
awaitFuture(future);
verify(notificationCompleteLatch).getCount();
verifyLatchAwait(notificationCompleteLatch, 2);
verify(shutdownCompleteLatch).getCount();
verifyLatchAwait(shutdownCompleteLatch);
verify(worker).shutdown();
}
@Test
public void testShutdownNotCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
mockNotificationComplete(true);
mockShutdownComplete(false, true);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
when(worker.isShutdownComplete()).thenReturn(false);
mockShardInfoConsumerMap(1);
expectedTimeoutException(future);
verify(worker).shutdown();
awaitFuture(future);
verifyLatchAwait(notificationCompleteLatch, 2);
verifyLatchAwait(shutdownCompleteLatch, 2);
verify(worker).isShutdownComplete();
verify(worker).getShardInfoShardConsumerMap();
}
@Test
public void testShutdownNotCompleteButWorkerShutdown() throws Exception {
ShutdownFuture future = create();
mockNotificationComplete(true);
mockShutdownComplete(false);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
when(worker.isShutdownComplete()).thenReturn(true);
mockShardInfoConsumerMap(1);
awaitFuture(future);
verify(worker).shutdown();
verifyLatchAwait(notificationCompleteLatch);
verifyLatchAwait(shutdownCompleteLatch);
verify(worker, times(2)).isShutdownComplete();
verify(worker).getShardInfoShardConsumerMap();
verify(shardInfoConsumerMap).size();
}
@Test
public void testShutdownNotCompleteButShardConsumerEmpty() throws Exception {
ShutdownFuture future = create();
mockNotificationComplete(true);
mockShutdownComplete(false);
mockOutstanding(shutdownCompleteLatch, 1L);
when(worker.isShutdownComplete()).thenReturn(false);
mockShardInfoConsumerMap(0);
awaitFuture(future);
verify(worker).shutdown();
verifyLatchAwait(notificationCompleteLatch);
verifyLatchAwait(shutdownCompleteLatch);
verify(worker, times(2)).isShutdownComplete();
verify(worker, times(2)).getShardInfoShardConsumerMap();
verify(shardInfoConsumerMap).isEmpty();
verify(shardInfoConsumerMap).size();
}
@Test
public void testNotificationNotCompleteButShardConsumerEmpty() throws Exception {
ShutdownFuture future = create();
mockNotificationComplete(false);
mockShutdownComplete(false);
mockOutstanding(notificationCompleteLatch, 1L);
mockOutstanding(shutdownCompleteLatch, 1L);
when(worker.isShutdownComplete()).thenReturn(false);
mockShardInfoConsumerMap(0);
awaitFuture(future);
verify(worker, never()).shutdown();
verifyLatchAwait(notificationCompleteLatch);
verify(shutdownCompleteLatch, never()).await();
verify(worker, times(2)).isShutdownComplete();
verify(worker, times(2)).getShardInfoShardConsumerMap();
verify(shardInfoConsumerMap).isEmpty();
verify(shardInfoConsumerMap).size();
}
@Test(expected = TimeoutException.class)
public void testTimeExceededException() throws Exception {
ShutdownFuture future = create();
mockNotificationComplete(false);
mockOutstanding(notificationCompleteLatch, 1L);
when(worker.isShutdownComplete()).thenReturn(false);
mockShardInfoConsumerMap(1);
future.get(1, TimeUnit.NANOSECONDS);
}
private ShutdownFuture create() {
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
}
private void mockShardInfoConsumerMap(Integer initialItemCount, Integer ... additionalItemCounts) {
when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap);
Boolean additionalEmptyStates[] = new Boolean[additionalItemCounts.length];
for(int i = 0; i < additionalItemCounts.length; ++i) {
additionalEmptyStates[i] = additionalItemCounts[i] == 0;
}
when(shardInfoConsumerMap.size()).thenReturn(initialItemCount, additionalItemCounts);
when(shardInfoConsumerMap.isEmpty()).thenReturn(initialItemCount == 0, additionalEmptyStates);
}
private void verifyLatchAwait(CountDownLatch latch) throws Exception {
verifyLatchAwait(latch, 1);
}
private void verifyLatchAwait(CountDownLatch latch, int times) throws Exception {
verify(latch, times(times)).await(anyLong(), any(TimeUnit.class));
}
private void expectedTimeoutException(ShutdownFuture future) throws Exception {
boolean gotTimeout = false;
try {
awaitFuture(future);
} catch (TimeoutException te) {
gotTimeout = true;
}
assertThat("Expected a timeout exception to occur", gotTimeout);
}
private void awaitFuture(ShutdownFuture future) throws Exception {
future.get(1, TimeUnit.SECONDS);
}
private void mockNotificationComplete(Boolean initial, Boolean... states) throws Exception {
mockLatch(notificationCompleteLatch, initial, states);
}
private void mockShutdownComplete(Boolean initial, Boolean... states) throws Exception {
mockLatch(shutdownCompleteLatch, initial, states);
}
private void mockLatch(CountDownLatch latch, Boolean initial, Boolean... states) throws Exception {
when(latch.await(anyLong(), any(TimeUnit.class))).thenReturn(initial, states);
}
private void mockOutstanding(CountDownLatch latch, Long remaining, Long ... additionalRemaining) throws Exception {
when(latch.getCount()).thenReturn(remaining, additionalRemaining);
}
}
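Note: the tests above pin down a two-phase wait. As an illustration only (the helper name awaitGracefulShutdown and its placement next to Worker are assumptions, not the ShutdownFuture source), the sequence that testSimpleGetAlreadyCompleted verifies looks roughly like this:
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
final class GracefulShutdownSketch {
    // Hypothetical helper mirroring the mocks above; not the ShutdownFuture implementation.
    static void awaitGracefulShutdown(CountDownLatch notificationCompleteLatch,
                                      CountDownLatch shutdownCompleteLatch,
                                      Worker worker,
                                      long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        // Phase 1: wait for every record processor to receive shutdownRequested(...).
        if (!notificationCompleteLatch.await(timeout, unit)) {
            throw new TimeoutException("Record processors were not notified in time");
        }
        // Phase 2: only then ask the worker to shut down, and wait for processors to finish.
        worker.shutdown();
        if (!shutdownCompleteLatch.await(timeout, unit)) {
            throw new TimeoutException("Record processors did not finish shutting down in time");
        }
    }
    private GracefulShutdownSketch() {
    }
}
Judging by the mock interactions the remaining tests verify, get() also covers the recovery paths: when a latch has not counted down it re-checks the latch counts, the worker's shutdown status, and its shard-consumer map, and completes once the worker reports shutdown or has no consumers left.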

View file

@ -31,7 +31,6 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisC
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
@ -82,7 +81,7 @@ public class ShutdownTaskTest {
} }
/** /**
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#call()}. * Test method for {@link ShutdownTask#call()}.
*/ */
@Test @Test
public final void testCallWhenApplicationDoesNotCheckpoint() { public final void testCallWhenApplicationDoesNotCheckpoint() {
@ -106,7 +105,7 @@ public class ShutdownTaskTest {
} }
/** /**
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#call()}. * Test method for {@link ShutdownTask#call()}.
*/ */
@Test @Test
public final void testCallWhenSyncingShardsThrows() { public final void testCallWhenSyncingShardsThrows() {
@ -131,7 +130,7 @@ public class ShutdownTaskTest {
} }
/** /**
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask#getTaskType()}. * Test method for {@link ShutdownTask#getTaskType()}.
*/ */
@Test @Test
public final void testGetTaskType() { public final void testGetTaskType() {

View file

@ -18,8 +18,10 @@ import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -34,12 +36,11 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
/** /**
* Streamlet that tracks records it's seen - useful for testing. * Streamlet that tracks records it's seen - useful for testing.
*/ */
class TestStreamlet implements IRecordProcessor { class TestStreamlet implements IRecordProcessor, IShutdownNotificationAware {
private static final Log LOG = LogFactory.getLog(TestStreamlet.class); private static final Log LOG = LogFactory.getLog(TestStreamlet.class);
@ -55,6 +56,11 @@ class TestStreamlet implements IRecordProcessor {
private ShutdownReason shutdownReason; private ShutdownReason shutdownReason;
private ShardSequenceVerifier shardSequenceVerifier; private ShardSequenceVerifier shardSequenceVerifier;
private long numProcessRecordsCallsWithEmptyRecordList; private long numProcessRecordsCallsWithEmptyRecordList;
private boolean shutdownNotificationCalled;
private final CountDownLatch initializeLatch = new CountDownLatch(1);
private final CountDownLatch notifyShutdownLatch = new CountDownLatch(1);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
public TestStreamlet() { public TestStreamlet() {
@ -76,6 +82,7 @@ class TestStreamlet implements IRecordProcessor {
if (shardSequenceVerifier != null) { if (shardSequenceVerifier != null) {
shardSequenceVerifier.registerInitialization(shardId); shardSequenceVerifier.registerInitialization(shardId);
} }
initializeLatch.countDown();
} }
@Override @Override
@ -125,6 +132,8 @@ class TestStreamlet implements IRecordProcessor {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
shutdownLatch.countDown();
} }
/** /**
@ -148,4 +157,25 @@ class TestStreamlet implements IRecordProcessor {
return numProcessRecordsCallsWithEmptyRecordList; return numProcessRecordsCallsWithEmptyRecordList;
} }
boolean isShutdownNotificationCalled() {
return shutdownNotificationCalled;
}
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
shutdownNotificationCalled = true;
notifyShutdownLatch.countDown();
}
public CountDownLatch getInitializeLatch() {
return initializeLatch;
}
public CountDownLatch getNotifyShutdownLatch() {
return notifyShutdownLatch;
}
public CountDownLatch getShutdownLatch() {
return shutdownLatch;
}
} }
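For application code the TestStreamlet change translates directly: a record processor opts into graceful shutdown by also implementing IShutdownNotificationAware and checkpointing in shutdownRequested. A minimal sketch (the class name GracefulProcessor is hypothetical and error handling is reduced to a catch-all):
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
class GracefulProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        // Per-shard setup goes here.
    }
    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // Process processRecordsInput.getRecords() and checkpoint periodically.
    }
    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        // Last chance to checkpoint before the worker tears this processor down.
        try {
            checkpointer.checkpoint();
        } catch (Exception e) {
            // ShutdownException, ThrottlingException, etc.: log and let shutdown proceed.
        }
    }
    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // shutdown(...) is still invoked after shutdownRequested(...).
    }
}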

View file

@ -14,21 +14,16 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.CoreMatchers.both;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
import java.lang.Thread.State; import java.lang.Thread.State;
@ -42,21 +37,29 @@ import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.hamcrest.Condition;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
@ -78,8 +81,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
@ -90,6 +93,8 @@ import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.SequenceNumberRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.Shard;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Unit tests of Worker. * Unit tests of Worker.
@ -99,8 +104,8 @@ public class WorkerTest {
private static final Log LOG = LogFactory.getLog(WorkerTest.class); private static final Log LOG = LogFactory.getLog(WorkerTest.class);
@Rule // @Rule
public Timeout timeout = new Timeout((int)TimeUnit.SECONDS.toMillis(30)); // public Timeout timeout = new Timeout((int)TimeUnit.SECONDS.toMillis(30));
private final NullMetricsFactory nullMetricsFactory = new NullMetricsFactory(); private final NullMetricsFactory nullMetricsFactory = new NullMetricsFactory();
private final long taskBackoffTimeMillis = 1L; private final long taskBackoffTimeMillis = 1L;
@ -140,6 +145,10 @@ public class WorkerTest {
private IRecordProcessor v2RecordProcessor; private IRecordProcessor v2RecordProcessor;
@Mock @Mock
private ShardConsumer shardConsumer; private ShardConsumer shardConsumer;
@Mock
private Future<TaskResult> taskFuture;
@Mock
private TaskResult taskResult;
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
@ -347,10 +356,10 @@ public class WorkerTest {
worker.cleanupShardConsumers(assignedShards); worker.cleanupShardConsumers(assignedShards);
// verify shard consumer not present in assignedShards is shut down // verify shard consumer not present in assignedShards is shut down
Assert.assertTrue(consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken.isBeginShutdown()); Assert.assertTrue(consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken.isShutdownRequested());
// verify shard consumers present in assignedShards aren't shut down // verify shard consumers present in assignedShards aren't shut down
Assert.assertFalse(consumerOfShardInfo1.isBeginShutdown()); Assert.assertFalse(consumerOfShardInfo1.isShutdownRequested());
Assert.assertFalse(consumerOfShardInfo2.isBeginShutdown()); Assert.assertFalse(consumerOfShardInfo2.isShutdownRequested());
} }
@Test @Test
@ -690,6 +699,339 @@ public class WorkerTest {
assertThat(recordProcessorInterrupted.get(), equalTo(true)); assertThat(recordProcessorInterrupted.get(), equalTo(true));
} }
@Test
public void testRequestShutdown() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
lease.getParentShardIds(), lease.getCheckpoint()));
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
worker.requestShutdown();
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION))));
worker.runProcessLoop();
verify(leaseCoordinator, atLeastOnce()).dropLease(eq(lease));
leases.clear();
currentAssignments.clear();
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
}
@Test
public void testLeaseCancelledAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
lease.getParentShardIds(), lease.getCheckpoint()));
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
worker.requestShutdown();
leases.clear();
currentAssignments.clear();
worker.runProcessLoop();
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION))));
worker.runProcessLoop();
verify(executorService, atLeastOnce())
.submit(argThat(ShutdownReasonMatcher.hasReason(equalTo(ShutdownReason.ZOMBIE))));
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
}
@Test
public void testEndOfShardAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
final List<ShardInfo> currentAssignments = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
currentAssignments.add(new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(),
lease.getParentShardIds(), lease.getCheckpoint()));
when(leaseCoordinator.getAssignments()).thenAnswer(new Answer<List<KinesisClientLease>>() {
@Override
public List<KinesisClientLease> answer(InvocationOnMock invocation) throws Throwable {
return leases;
}
});
when(leaseCoordinator.getCurrentAssignments()).thenAnswer(new Answer<List<ShardInfo>>() {
@Override
public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {
return currentAssignments;
}
});
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.BLOCK_ON_PARENT_SHARDS))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.INITIALIZE))));
when(taskResult.isShardEndReached()).thenReturn(true);
worker.requestShutdown();
worker.runProcessLoop();
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN_NOTIFICATION))));
verify(executorService).submit(argThat(ShutdownReasonMatcher.hasReason(equalTo(ShutdownReason.TERMINATE))));
worker.runProcessLoop();
verify(executorService, atLeastOnce()).submit(argThat(
both(isA(MetricsCollectingTaskDecorator.class)).and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN))));
}
private static class ShutdownReasonMatcher extends TypeSafeDiagnosingMatcher<MetricsCollectingTaskDecorator> {
private final Matcher<ShutdownReason> matcher;
public ShutdownReasonMatcher(Matcher<ShutdownReason> matcher) {
this.matcher = matcher;
}
@Override
public void describeTo(Description description) {
description.appendText("hasShutdownReason(").appendDescriptionOf(matcher).appendText(")");
}
@Override
protected boolean matchesSafely(MetricsCollectingTaskDecorator item, Description mismatchDescription) {
return Condition.matched(item, mismatchDescription)
.and(new Condition.Step<MetricsCollectingTaskDecorator, ShutdownTask>() {
@Override
public Condition<ShutdownTask> apply(MetricsCollectingTaskDecorator value,
Description mismatch) {
if (!(value.getOther() instanceof ShutdownTask)) {
mismatch.appendText("Wrapped task isn't a shutdown task");
return Condition.notMatched();
}
return Condition.matched((ShutdownTask) value.getOther(), mismatch);
}
}).and(new Condition.Step<ShutdownTask, ShutdownReason>() {
@Override
public Condition<ShutdownReason> apply(ShutdownTask value, Description mismatch) {
return Condition.matched(value.getReason(), mismatch);
}
}).matching(matcher);
}
public static ShutdownReasonMatcher hasReason(Matcher<ShutdownReason> matcher) {
return new ShutdownReasonMatcher(matcher);
}
}
private static class ShutdownHandlingAnswer implements Answer<Future<TaskResult>> {
final Future<TaskResult> defaultFuture;
public ShutdownHandlingAnswer(Future<TaskResult> defaultFuture) {
this.defaultFuture = defaultFuture;
}
@Override
public Future<TaskResult> answer(InvocationOnMock invocation) throws Throwable {
ITask rootTask = (ITask) invocation.getArguments()[0];
if (rootTask instanceof MetricsCollectingTaskDecorator
&& ((MetricsCollectingTaskDecorator) rootTask).getOther() instanceof ShutdownNotificationTask) {
ShutdownNotificationTask task = (ShutdownNotificationTask) ((MetricsCollectingTaskDecorator) rootTask).getOther();
return Futures.immediateFuture(task.call());
}
return defaultFuture;
}
}
private static class TaskTypeMatcher extends TypeSafeMatcher<MetricsCollectingTaskDecorator> {
final Matcher<TaskType> expectedTaskType;
TaskTypeMatcher(TaskType expectedTaskType) {
this(equalTo(expectedTaskType));
}
TaskTypeMatcher(Matcher<TaskType> expectedTaskType) {
this.expectedTaskType = expectedTaskType;
}
@Override
protected boolean matchesSafely(MetricsCollectingTaskDecorator item) {
return expectedTaskType.matches(item.getTaskType());
}
@Override
public void describeTo(Description description) {
description.appendText("taskType matches");
expectedTaskType.describeTo(description);
}
static TaskTypeMatcher isOfType(TaskType taskType) {
return new TaskTypeMatcher(taskType);
}
static TaskTypeMatcher matchesType(Matcher<TaskType> matcher) {
return new TaskTypeMatcher(matcher);
}
}
private static class InnerTaskMatcher<T extends ITask> extends TypeSafeMatcher<MetricsCollectingTaskDecorator> {
final Matcher<T> matcher;
InnerTaskMatcher(Matcher<T> matcher) {
this.matcher = matcher;
}
@Override
protected boolean matchesSafely(MetricsCollectingTaskDecorator item) {
return matcher.matches(item.getOther());
}
@Override
public void describeTo(Description description) {
matcher.describeTo(description);
}
static <U extends ITask> InnerTaskMatcher<U> taskWith(Class<U> clazz, Matcher<U> matcher) {
return new InnerTaskMatcher<>(matcher);
}
}
/** /**
* Returns executor service that will be owned by the worker. This is useful to test the scenario * Returns executor service that will be owned by the worker. This is useful to test the scenario
* where worker shuts down the executor service also during shutdown flow. * where worker shuts down the executor service also during shutdown flow.
@ -697,7 +1039,8 @@ public class WorkerTest {
* @return Executor service that will be owned by the worker. * @return Executor service that will be owned by the worker.
*/ */
private WorkerThreadPoolExecutor getWorkerThreadPoolExecutor() { private WorkerThreadPoolExecutor getWorkerThreadPoolExecutor() {
return new WorkerThreadPoolExecutor(); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build();
return new WorkerThreadPoolExecutor(threadFactory);
} }
private List<Shard> createShardListWithOneShard() { private List<Shard> createShardListWithOneShard() {
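One side note on the executor change above: getWorkerThreadPoolExecutor() now builds the worker-owned pool with a named ThreadFactory. As a standalone illustration of that Guava pattern (the class name NamedWorkerThreads is made up for this example), threads created this way show up as RecordProcessor-0000, RecordProcessor-0001, ... which makes the shutdown sequence easier to follow in thread dumps:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class NamedWorkerThreads {
    public static void main(String[] args) throws InterruptedException {
        // Same pattern as getWorkerThreadPoolExecutor(): give record-processor threads
        // recognizable names instead of the default pool-N-thread-M.
        ThreadFactory threadFactory =
                new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build();
        ExecutorService executor = Executors.newFixedThreadPool(2, threadFactory);
        for (int i = 0; i < 2; i++) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    // Prints RecordProcessor-0000 / RecordProcessor-0001.
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}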

View file

@ -1,18 +1,32 @@
package com.amazonaws.services.kinesis.clientlibrary.types; package com.amazonaws.services.kinesis.clientlibrary.types;
import org.junit.Assert; import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import org.junit.Test; import org.junit.Test;
/** /**
* Unit tests of ShutdownReason enum class. * Unit tests of ShutdownReason enum class.
*/ */
public class ShutdownReasonTest { public class ShutdownReasonTest {
@Test @Test
public void testToString() { public void testTransitionZombie() {
Assert.assertEquals("ZOMBIE", String.valueOf(ShutdownReason.ZOMBIE)); assertThat(ShutdownReason.ZOMBIE.canTransitionTo(ShutdownReason.TERMINATE), equalTo(false));
Assert.assertEquals("TERMINATE", String.valueOf(ShutdownReason.TERMINATE)); assertThat(ShutdownReason.ZOMBIE.canTransitionTo(ShutdownReason.REQUESTED), equalTo(false));
Assert.assertEquals("ZOMBIE", ShutdownReason.ZOMBIE.toString()); }
Assert.assertEquals("TERMINATE", ShutdownReason.TERMINATE.toString());
@Test
public void testTransitionTerminate() {
assertThat(ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.ZOMBIE), equalTo(true));
assertThat(ShutdownReason.TERMINATE.canTransitionTo(ShutdownReason.REQUESTED), equalTo(false));
}
@Test
public void testTransitionRequested() {
assertThat(ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.ZOMBIE), equalTo(true));
assertThat(ShutdownReason.REQUESTED.canTransitionTo(ShutdownReason.TERMINATE), equalTo(true));
} }
} }
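The transitions asserted above amount to a one-way escalation: REQUESTED can move to TERMINATE or ZOMBIE, TERMINATE only to ZOMBIE, and ZOMBIE is terminal. A sketch that reproduces just this observable ordering (the rank field is an assumption for illustration; the real ShutdownReason in the worker package may be implemented differently):
// Sketch only -- not com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason.
enum ShutdownReasonSketch {
    REQUESTED(1),
    TERMINATE(2),
    ZOMBIE(3);
    private final int rank;
    ShutdownReasonSketch(int rank) {
        this.rank = rank;
    }
    // A shutdown reason can only escalate; it never falls back to a less severe reason.
    boolean canTransitionTo(ShutdownReasonSketch other) {
        return other.rank > this.rank;
    }
}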

View file

@ -0,0 +1,63 @@
package com.amazonaws.services.kinesis.leases.impl;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
public class KinesisClientLeaseBuilder {
private String leaseKey;
private String leaseOwner;
private Long leaseCounter = 0L;
private UUID concurrencyToken;
private Long lastCounterIncrementNanos;
private ExtendedSequenceNumber checkpoint;
private Long ownerSwitchesSinceCheckpoint = 0L;
private Set<String> parentShardIds = new HashSet<>();
public KinesisClientLeaseBuilder withLeaseKey(String leaseKey) {
this.leaseKey = leaseKey;
return this;
}
public KinesisClientLeaseBuilder withLeaseOwner(String leaseOwner) {
this.leaseOwner = leaseOwner;
return this;
}
public KinesisClientLeaseBuilder withLeaseCounter(Long leaseCounter) {
this.leaseCounter = leaseCounter;
return this;
}
public KinesisClientLeaseBuilder withConcurrencyToken(UUID concurrencyToken) {
this.concurrencyToken = concurrencyToken;
return this;
}
public KinesisClientLeaseBuilder withLastCounterIncrementNanos(Long lastCounterIncrementNanos) {
this.lastCounterIncrementNanos = lastCounterIncrementNanos;
return this;
}
public KinesisClientLeaseBuilder withCheckpoint(ExtendedSequenceNumber checkpoint) {
this.checkpoint = checkpoint;
return this;
}
public KinesisClientLeaseBuilder withOwnerSwitchesSinceCheckpoint(Long ownerSwitchesSinceCheckpoint) {
this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint;
return this;
}
public KinesisClientLeaseBuilder withParentShardIds(Set<String> parentShardIds) {
this.parentShardIds = parentShardIds;
return this;
}
public KinesisClientLease build() {
return new KinesisClientLease(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos,
checkpoint, ownerSwitchesSinceCheckpoint, parentShardIds);
}
}
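For reference, this is how the new Worker tests use the builder; the shard id, owner, and sequence number below are arbitrary test values, and the example class name is made up:
import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseBuilder;
public class LeaseBuilderExample {
    public static void main(String[] args) {
        // Mirrors the lease built in WorkerTest#testRequestShutdown.
        KinesisClientLease lease = new KinesisClientLeaseBuilder()
                .withLeaseKey("shardId-001")
                .withLeaseOwner("Self")
                .withLeaseCounter(0L)
                .withConcurrencyToken(UUID.randomUUID())
                .withLastCounterIncrementNanos(0L)
                .withCheckpoint(new ExtendedSequenceNumber("123", 0L))
                .withOwnerSwitchesSinceCheckpoint(0L)
                .build();
        System.out.println(lease.getLeaseKey());
    }
}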

View file

@ -28,7 +28,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;

View file

@ -33,7 +33,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage; import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message; import com.amazonaws.services.kinesis.multilang.messages.Message;

View file

@ -36,7 +36,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibD
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage; import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage; import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;

View file

@ -14,14 +14,13 @@
*/ */
package com.amazonaws.services.kinesis.multilang.messages; package com.amazonaws.services.kinesis.multilang.messages;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;