diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java
index 481a5dbf..89cf092a 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessor.java
@@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.interfaces;
import java.util.List;
import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
/**
* The Amazon Kinesis Client Library will instantiate record processors to process data records fetched from Amazon
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java
new file mode 100644
index 00000000..82a18a0e
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java
@@ -0,0 +1,19 @@
+package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2;
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+
+/**
+ * Allows a record processor to indicate it's aware of requested shutdowns, and handle the request.
+ */
+public interface IShutdownNotificationAware {
+
+ /**
+ * Called when the worker has been requested to shutdown, and gives the record processor a chance to checkpoint.
+ *
+ * The record processor will still have shutdown called.
+ *
+ * @param checkpointer the checkpointer that can be used to save progress.
+ */
+ void shutdownRequested(IRecordProcessorCheckpointer checkpointer);
+
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java
new file mode 100644
index 00000000..c8678974
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java
@@ -0,0 +1,602 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+/**
+ * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks,
+ * and state transitions is contained within the {@link ConsumerState} objects.
+ *
+ *
+ */
+class ConsumerStates {
+
+ /**
+ * Enumerates processing states when working on a shard.
+ */
+ enum ShardConsumerState {
+ // @formatter:off
+ WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()),
+ INITIALIZING(new InitializingState()),
+ PROCESSING(new ProcessingState()),
+ SHUTDOWN_REQUESTED(new ShutdownNotificationState()),
+ SHUTTING_DOWN(new ShuttingDownState()),
+ SHUTDOWN_COMPLETE(new ShutdownCompleteState());
+ //@formatter:on
+
+ private final ConsumerState consumerState;
+
+ ShardConsumerState(ConsumerState consumerState) {
+ this.consumerState = consumerState;
+ }
+
+ public ConsumerState getConsumerState() {
+ return consumerState;
+ }
+ }
+
+
+ /**
+ * Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to
+ * do when a transition occurs.
+ *
+ */
+ interface ConsumerState {
+ /**
+ * Creates a new task for this state using the passed in consumer to build the task. If there is no task
+ * required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the
+ * consumer during the execution of this method.
+ *
+ * @param consumer
+ * the consumer to use build the task, or execute state.
+ * @return a valid task for this state or null if there is no task required.
+ */
+ ITask createTask(ShardConsumer consumer);
+
+ /**
+ * Provides the next state of the consumer upon success of the task return by
+ * {@link ConsumerState#createTask(ShardConsumer)}.
+ *
+ * @return the next state that the consumer should transition to, this may be the same object as the current
+ * state.
+ */
+ ConsumerState successTransition();
+
+ /**
+ * Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent
+ * on the current state, and the shutdown reason.
+ *
+ * @param shutdownReason
+ * the reason that a shutdown was requested
+ * @return the next state that the consumer should transition to, this may be the same object as the current
+ * state.
+ */
+ ConsumerState shutdownTransition(ShutdownReason shutdownReason);
+
+ /**
+ * The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state
+ * even if createTask would return a null value.
+ *
+ * @return the type of task that this state represents.
+ */
+ TaskType getTaskType();
+
+ /**
+ * An enumeration represent the type of this state. Different consumer states may return the same
+ * {@link ShardConsumerState}.
+ *
+ * @return the type of consumer state this represents.
+ */
+ ShardConsumerState getState();
+
+ boolean isTerminal();
+
+ }
+
+ /**
+ * The initial state that any {@link ShardConsumer} should start in.
+ */
+ static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState();
+
+ private static ConsumerState shutdownStateFor(ShutdownReason reason) {
+ switch (reason) {
+ case REQUESTED:
+ return ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();
+ case TERMINATE:
+ case ZOMBIE:
+ return ShardConsumerState.SHUTTING_DOWN.getConsumerState();
+ default:
+ throw new IllegalArgumentException("Unknown reason: " + reason);
+ }
+ }
+
+ /**
+ * This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent
+ * shards have been completed.
+ *
+ *
Valid Transitions
+ *
+ *
Success
+ *
Transition to the initializing state to allow the record processor to be initialized in preparation of
+ * processing.
+ *
Shutdown
+ *
+ *
+ *
All Reasons
+ *
Transitions to {@link ShutdownCompleteState}. Since the record processor was never initialized it can't be
+ * informed of the shutdown.
+ *
+ *
+ *
+ */
+ static class BlockedOnParentState implements ConsumerState {
+
+ @Override
+ public ITask createTask(ShardConsumer consumer) {
+ return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(),
+ consumer.getParentShardPollIntervalMillis());
+ }
+
+ @Override
+ public ConsumerState successTransition() {
+ return ShardConsumerState.INITIALIZING.getConsumerState();
+ }
+
+ @Override
+ public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
+ return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ return TaskType.BLOCK_ON_PARENT_SHARDS;
+ }
+
+ @Override
+ public ShardConsumerState getState() {
+ return ShardConsumerState.WAITING_ON_PARENT_SHARDS;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+ }
+
+ /**
+ * This state is responsible for initializing the record processor with the shard information.
+ *
Valid Transitions
+ *
+ *
Success
+ *
Transitions to the processing state which will begin to send records to the record processor
+ *
Shutdown
+ *
At this point the record processor has been initialized, but hasn't processed any records. This requires that
+ * the record processor be notified of the shutdown, even though there is almost no actions the record processor
+ * could take.
+ *
+ *
{@link ShutdownReason#REQUESTED}
+ *
Transitions to the {@link ShutdownNotificationState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
+ *
+ * This reason should not occur, since terminate is triggered after reaching the end of a shard. Initialize never
+ * makes an requests to Kinesis for records, so it can't reach the end of a shard.
+ *
+ *
+ * Transitions to the {@link ShuttingDownState}
+ *
+ *
+ *
+ *
+ *
+ */
+ static class InitializingState implements ConsumerState {
+
+ @Override
+ public ITask createTask(ShardConsumer consumer) {
+ return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(),
+ consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
+ consumer.getTaskBackoffTimeMillis(), consumer.getStreamConfig());
+ }
+
+ @Override
+ public ConsumerState successTransition() {
+ return ShardConsumerState.PROCESSING.getConsumerState();
+ }
+
+ @Override
+ public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
+ return shutdownReason.getShutdownState();
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ return TaskType.INITIALIZE;
+ }
+
+ @Override
+ public ShardConsumerState getState() {
+ return ShardConsumerState.INITIALIZING;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+ }
+
+ /**
+ * This state is responsible for retrieving records from Kinesis, and dispatching them to the record processor.
+ * While in this state the only way a transition will occur is if a shutdown has been triggered.
+ *
Valid Transitions
+ *
+ *
Success
+ *
Doesn't actually transition, but instead returns the same state
+ *
Shutdown
+ *
At this point records are being retrieved, and processed. It's now possible for the consumer to reach the end
+ * of the shard triggering a {@link ShutdownReason#TERMINATE}.
+ *
+ *
{@link ShutdownReason#REQUESTED}
+ *
Transitions to the {@link ShutdownNotificationState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
+ *
+ *
+ */
+ static class ProcessingState implements ConsumerState {
+
+ @Override
+ public ITask createTask(ShardConsumer consumer) {
+ return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
+ consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
+ consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist());
+ }
+
+ @Override
+ public ConsumerState successTransition() {
+ return ShardConsumerState.PROCESSING.getConsumerState();
+ }
+
+ @Override
+ public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
+ return shutdownReason.getShutdownState();
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ return TaskType.PROCESS;
+ }
+
+ @Override
+ public ShardConsumerState getState() {
+ return ShardConsumerState.PROCESSING;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+ }
+
+ static final ConsumerState SHUTDOWN_REQUEST_COMPLETION_STATE = new ShutdownNotificationCompletionState();
+
+ /**
+ * This state occurs when a shutdown has been explicitly requested. This shutdown allows the record processor a
+ * chance to checkpoint and prepare to be shutdown via the normal method. This state can only be reached by a
+ * shutdown on the {@link InitializingState} or {@link ProcessingState}.
+ *
+ *
Valid Transitions
+ *
+ *
Success
+ *
Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
+ *
Shutdown
+ *
At this point records are being retrieved, and processed. An explicit shutdown will allow the record
+ * processor one last chance to checkpoint, and then the {@link ShardConsumer} will be held in an idle state.
+ *
+ *
{@link ShutdownReason#REQUESTED}
+ *
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to
+ * {@link ShutdownNotificationCompletionState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
+ *
+ *
+ */
+ static class ShutdownNotificationState implements ConsumerState {
+
+ @Override
+ public ITask createTask(ShardConsumer consumer) {
+ return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(),
+ consumer.getShutdownNotification());
+ }
+
+ @Override
+ public ConsumerState successTransition() {
+ return SHUTDOWN_REQUEST_COMPLETION_STATE;
+ }
+
+ @Override
+ public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
+ if (shutdownReason == ShutdownReason.REQUESTED) {
+ return SHUTDOWN_REQUEST_COMPLETION_STATE;
+ }
+ return shutdownReason.getShutdownState();
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ return TaskType.SHUTDOWN_NOTIFICATION;
+ }
+
+ @Override
+ public ShardConsumerState getState() {
+ return ShardConsumerState.SHUTDOWN_REQUESTED;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+ }
+
+ /**
+ * Once the {@link ShutdownNotificationState} has been completed the {@link ShardConsumer} must not re-enter any of the
+ * processing states. This state idles the {@link ShardConsumer} until the worker triggers the final shutdown state.
+ *
+ *
Valid Transitions
+ *
+ *
Success
+ *
+ *
+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
+ *
+ *
+ * Remains in the {@link ShutdownNotificationCompletionState}
+ *
+ *
+ *
Shutdown
+ *
At this point the {@link ShardConsumer} has notified the record processor of the impending shutdown, and is
+ * waiting that notification. While waiting for the notification no further processing should occur on the
+ * {@link ShardConsumer}.
+ *
+ *
{@link ShutdownReason#REQUESTED}
+ *
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains
+ * {@link ShutdownNotificationCompletionState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
+ *
+ *
+ */
+ static class ShutdownNotificationCompletionState implements ConsumerState {
+
+ @Override
+ public ITask createTask(ShardConsumer consumer) {
+ return null;
+ }
+
+ @Override
+ public ConsumerState successTransition() {
+ return this;
+ }
+
+ @Override
+ public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
+ if (shutdownReason != ShutdownReason.REQUESTED) {
+ return shutdownReason.getShutdownState();
+ }
+ return this;
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ return TaskType.SHUTDOWN_NOTIFICATION;
+ }
+
+ @Override
+ public ShardConsumerState getState() {
+ return ShardConsumerState.SHUTDOWN_REQUESTED;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+ }
+
+ /**
+ * This state is entered if the {@link ShardConsumer} loses its lease, or reaches the end of the shard.
+ *
+ *
Valid Transitions
+ *
+ *
Success
+ *
+ *
+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
+ *
+ *
+ * Transitions to the {@link ShutdownCompleteState}
+ *
+ *
+ *
Shutdown
+ *
At this point the record processor has processed the final shutdown indication, and depending on the shutdown
+ * reason taken the correct course of action. From this point on there should be no more interactions with the
+ * record processor or {@link ShardConsumer}.
+ *
+ *
{@link ShutdownReason#REQUESTED}
+ *
+ *
+ * This should not occur as all other {@link ShutdownReason}s take priority over it.
+ *
+ *
+ * Transitions to {@link ShutdownCompleteState}
+ *
+ *
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShutdownCompleteState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShutdownCompleteState}
+ *
+ *
+ *
+ */
+ static class ShuttingDownState implements ConsumerState {
+
+ @Override
+ public ITask createTask(ShardConsumer consumer) {
+ return new ShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(),
+ consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(),
+ consumer.getStreamConfig().getStreamProxy(),
+ consumer.getStreamConfig().getInitialPositionInStream(),
+ consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(),
+ consumer.getTaskBackoffTimeMillis());
+ }
+
+ @Override
+ public ConsumerState successTransition() {
+ return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
+ }
+
+ @Override
+ public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
+ return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ return TaskType.SHUTDOWN;
+ }
+
+ @Override
+ public ShardConsumerState getState() {
+ return ShardConsumerState.SHUTTING_DOWN;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+ }
+
+ /**
+ * This is the final state for the {@link ShardConsumer}. This occurs once all shutdown activities are completed.
+ *
+ *
Valid Transitions
+ *
+ *
Success
+ *
+ *
+ * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
+ *
+ *
+ * Remains in the {@link ShutdownCompleteState}
+ *
+ *
+ *
Shutdown
+ *
At this point the all shutdown activites are completed, and the {@link ShardConsumer} should not take any
+ * further actions.
+ *
+ *
{@link ShutdownReason#REQUESTED}
+ *
+ *
+ * This should not occur as all other {@link ShutdownReason}s take priority over it.
+ *
+ *
+ * Remains in {@link ShutdownCompleteState}
+ *
+ *
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Remains in {@link ShutdownCompleteState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Remains in {@link ShutdownCompleteState}
+ *
+ *
+ *
+ */
+ static class ShutdownCompleteState implements ConsumerState {
+
+ @Override
+ public ITask createTask(ShardConsumer consumer) {
+ if (consumer.getShutdownNotification() != null) {
+ consumer.getShutdownNotification().shutdownComplete();
+ }
+ return null;
+ }
+
+ @Override
+ public ConsumerState successTransition() {
+ return this;
+ }
+
+ @Override
+ public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
+ return this;
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ return TaskType.SHUTDOWN_COMPLETE;
+ }
+
+ @Override
+ public ShardConsumerState getState() {
+ return ShardConsumerState.SHUTDOWN_COMPLETE;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return true;
+ }
+ }
+
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java
index 0119dcc3..59de31be 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java
@@ -14,8 +14,9 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -33,8 +34,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
-import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
+import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
@@ -200,23 +201,29 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator getCurrentAssignments() {
- List assignments = new LinkedList();
Collection leases = getAssignments();
- if ((leases != null) && (!leases.isEmpty())) {
- for (KinesisClientLease lease : leases) {
- Set parentShardIds = lease.getParentShardIds();
- ShardInfo assignment =
- new ShardInfo(
- lease.getLeaseKey(),
- lease.getConcurrencyToken().toString(),
- parentShardIds,
- lease.getCheckpoint());
- assignments.add(assignment);
- }
+ return convertLeasesToAssignments(leases);
+
+ }
+
+ public static List convertLeasesToAssignments(Collection leases) {
+ if (leases == null || leases.isEmpty()) {
+ return Collections.emptyList();
}
+ List assignments = new ArrayList<>(leases.size());
+ for (KinesisClientLease lease : leases) {
+ assignments.add(convertLeaseToAssignment(lease));
+ }
+
return assignments;
}
+ public static ShardInfo convertLeaseToAssignment(KinesisClientLease lease) {
+ Set parentShardIds = lease.getParentShardIds();
+ return new ShardInfo(lease.getLeaseKey(), lease.getConcurrencyToken().toString(), parentShardIds,
+ lease.getCheckpoint());
+ }
+
/**
* Initialize the lease coordinator (create the lease table if needed).
* @throws DependencyException
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java
index 615467a0..e61da491 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java
@@ -63,4 +63,12 @@ class MetricsCollectingTaskDecorator implements ITask {
return other.getTaskType();
}
+ @Override
+ public String toString() {
+ return this.getClass().getName() + "<" + other.getTaskType() + ">(" + other + ")";
+ }
+
+ ITask getOther() {
+ return other;
+ }
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java
index 16daafc2..69922670 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java
@@ -69,7 +69,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
*/
@Override
public synchronized void checkpoint()
- throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
+ throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
if (LOG.isDebugEnabled()) {
LOG.debug("Checkpointing " + shardInfo.getShardId() + ", " + " token " + shardInfo.getConcurrencyToken()
+ " at largest permitted value " + this.largestPermittedCheckpointValue);
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
index b9e8d2df..63cce40d 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
@@ -14,6 +14,7 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@@ -24,10 +25,10 @@ import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
+import com.google.common.annotations.VisibleForTesting;
/**
* Responsible for consuming data records of a (specified) shard.
@@ -36,13 +37,6 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
*/
class ShardConsumer {
- /**
- * Enumerates processing states when working on a shard.
- */
- enum ShardConsumerState {
- WAITING_ON_PARENT_SHARDS, INITIALIZING, PROCESSING, SHUTTING_DOWN, SHUTDOWN_COMPLETE;
- }
-
private static final Log LOG = LogFactory.getLog(ShardConsumer.class);
private final StreamConfig streamConfig;
@@ -68,13 +62,13 @@ class ShardConsumer {
* Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do
* much coordination/synchronization to handle concurrent reads/updates.
*/
- private ShardConsumerState currentState = ShardConsumerState.WAITING_ON_PARENT_SHARDS;
+ private ConsumerStates.ConsumerState currentState = ConsumerStates.INITIAL_STATE;
/*
* Used to track if we lost the primary responsibility. Once set to true, we will start shutting down.
* If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object.
*/
- private volatile boolean beginShutdown;
private volatile ShutdownReason shutdownReason;
+ private volatile ShutdownNotification shutdownNotification;
/**
* @param shardInfo Shard information
@@ -129,41 +123,19 @@ class ShardConsumer {
return checkAndSubmitNextTask();
}
- // CHECKSTYLE:OFF CyclomaticComplexity
+ private boolean readyForNextTask() {
+ return future == null || future.isCancelled() || future.isDone();
+ }
+
private synchronized boolean checkAndSubmitNextTask() {
- // Task completed successfully (without exceptions)
- boolean taskCompletedSuccessfully = false;
boolean submittedNewTask = false;
- if ((future == null) || future.isCancelled() || future.isDone()) {
- if ((future != null) && future.isDone()) {
- try {
- TaskResult result = future.get();
- if (result.getException() == null) {
- taskCompletedSuccessfully = true;
- if (result.isShardEndReached()) {
- markForShutdown(ShutdownReason.TERMINATE);
- }
- } else {
- if (LOG.isDebugEnabled()) {
- Exception taskException = result.getException();
- if (taskException instanceof BlockedOnParentShardException) {
- // No need to log the stack trace for this exception (it is very specific).
- LOG.debug("Shard " + shardInfo.getShardId()
- + " is blocked on completion of parent shard.");
- } else {
- LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ",
- result.getException());
- }
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- // Setting future to null so we don't misinterpret task completion status in case of exceptions
- future = null;
- }
+ if (readyForNextTask()) {
+ TaskOutcome taskOutcome = TaskOutcome.NOT_COMPLETE;
+ if (future != null && future.isDone()) {
+ taskOutcome = determineTaskOutcome();
}
- updateState(taskCompletedSuccessfully);
+
+ updateState(taskOutcome);
ITask nextTask = getNextTask();
if (nextTask != null) {
currentTask = nextTask;
@@ -196,7 +168,56 @@ class ShardConsumer {
return submittedNewTask;
}
- // CHECKSTYLE:ON CyclomaticComplexity
+ public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() {
+ return skipShardSyncAtWorkerInitializationIfLeasesExist;
+ }
+
+ private enum TaskOutcome {
+ SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE
+ }
+
+ private TaskOutcome determineTaskOutcome() {
+ try {
+ TaskResult result = future.get();
+ if (result.getException() == null) {
+ if (result.isShardEndReached()) {
+ return TaskOutcome.END_OF_SHARD;
+ }
+ return TaskOutcome.SUCCESSFUL;
+ }
+ logTaskException(result);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ // Setting future to null so we don't misinterpret task completion status in case of exceptions
+ future = null;
+ }
+ return TaskOutcome.FAILURE;
+ }
+
+ private void logTaskException(TaskResult taskResult) {
+ if (LOG.isDebugEnabled()) {
+ Exception taskException = taskResult.getException();
+ if (taskException instanceof BlockedOnParentShardException) {
+ // No need to log the stack trace for this exception (it is very specific).
+ LOG.debug("Shard " + shardInfo.getShardId() + " is blocked on completion of parent shard.");
+ } else {
+ LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ",
+ taskResult.getException());
+ }
+ }
+ }
+
+ /**
+ * Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint
+ * before being shutdown.
+ *
+ * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
+ */
+ void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
+ this.shutdownNotification = shutdownNotification;
+ markForShutdown(ShutdownReason.REQUESTED);
+ }
/**
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
@@ -205,17 +226,15 @@ class ShardConsumer {
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
synchronized boolean beginShutdown() {
- if (currentState != ShardConsumerState.SHUTDOWN_COMPLETE) {
- markForShutdown(ShutdownReason.ZOMBIE);
- checkAndSubmitNextTask();
- }
+ markForShutdown(ShutdownReason.ZOMBIE);
+ checkAndSubmitNextTask();
+
return isShutdown();
}
synchronized void markForShutdown(ShutdownReason reason) {
- beginShutdown = true;
// ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard)
- if ((shutdownReason == null) || (shutdownReason == ShutdownReason.TERMINATE)) {
+ if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) {
shutdownReason = reason;
}
}
@@ -227,7 +246,7 @@ class ShardConsumer {
* @return true if shutdown is complete
*/
boolean isShutdown() {
- return currentState == ShardConsumerState.SHUTDOWN_COMPLETE;
+ return currentState.isTerminal();
}
/**
@@ -243,48 +262,7 @@ class ShardConsumer {
* @return Return next task to run
*/
private ITask getNextTask() {
- ITask nextTask = null;
- switch (currentState) {
- case WAITING_ON_PARENT_SHARDS:
- nextTask = new BlockOnParentShardTask(shardInfo, leaseManager, parentShardPollIntervalMillis);
- break;
- case INITIALIZING:
- nextTask =
- new InitializeTask(shardInfo,
- recordProcessor,
- checkpoint,
- recordProcessorCheckpointer,
- dataFetcher,
- taskBackoffTimeMillis,
- streamConfig);
- break;
- case PROCESSING:
- nextTask =
- new ProcessTask(shardInfo,
- streamConfig,
- recordProcessor,
- recordProcessorCheckpointer,
- dataFetcher,
- taskBackoffTimeMillis,
- skipShardSyncAtWorkerInitializationIfLeasesExist);
- break;
- case SHUTTING_DOWN:
- nextTask =
- new ShutdownTask(shardInfo,
- recordProcessor,
- recordProcessorCheckpointer,
- shutdownReason,
- streamConfig.getStreamProxy(),
- streamConfig.getInitialPositionInStream(),
- cleanupLeasesOfCompletedShards,
- leaseManager,
- taskBackoffTimeMillis);
- break;
- case SHUTDOWN_COMPLETE:
- break;
- default:
- break;
- }
+ ITask nextTask = currentState.createTask(this);
if (nextTask == null) {
return null;
@@ -297,71 +275,93 @@ class ShardConsumer {
* Note: This is a private/internal method with package level access solely for testing purposes.
* Update state based on information about: task success, current state, and shutdown info.
*
- * @param taskCompletedSuccessfully Whether (current) task completed successfully.
+ * @param taskOutcome The outcome of the last task
*/
- // CHECKSTYLE:OFF CyclomaticComplexity
- void updateState(boolean taskCompletedSuccessfully) {
- if (currentState == ShardConsumerState.SHUTDOWN_COMPLETE) {
- // Shutdown was completed and there nothing we can do after that
- return;
+ void updateState(TaskOutcome taskOutcome) {
+ if (taskOutcome == TaskOutcome.END_OF_SHARD) {
+ markForShutdown(ShutdownReason.TERMINATE);
}
- if ((currentTask == null) && beginShutdown) {
- // Shard didn't start any tasks and can be shutdown fast
- currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
- return;
- }
- if (beginShutdown && currentState != ShardConsumerState.SHUTTING_DOWN) {
- // Shard received signal to start shutdown.
- // Whatever task we were working on should be stopped and shutdown task should be executed
- currentState = ShardConsumerState.SHUTTING_DOWN;
- return;
- }
- switch (currentState) {
- case WAITING_ON_PARENT_SHARDS:
- if (taskCompletedSuccessfully && TaskType.BLOCK_ON_PARENT_SHARDS.equals(currentTask.getTaskType())) {
- currentState = ShardConsumerState.INITIALIZING;
- }
- break;
- case INITIALIZING:
- if (taskCompletedSuccessfully && TaskType.INITIALIZE.equals(currentTask.getTaskType())) {
- currentState = ShardConsumerState.PROCESSING;
- }
- break;
- case PROCESSING:
- if (taskCompletedSuccessfully && TaskType.PROCESS.equals(currentTask.getTaskType())) {
- currentState = ShardConsumerState.PROCESSING;
- }
- break;
- case SHUTTING_DOWN:
- if (currentTask == null
- || (taskCompletedSuccessfully && TaskType.SHUTDOWN.equals(currentTask.getTaskType()))) {
- currentState = ShardConsumerState.SHUTDOWN_COMPLETE;
- }
- break;
- default:
- LOG.error("Unexpected state: " + currentState);
- break;
+ if (isShutdownRequested()) {
+ currentState = currentState.shutdownTransition(shutdownReason);
+ } else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
+ if (currentState.getTaskType() == currentTask.getTaskType()) {
+ currentState = currentState.successTransition();
+ } else {
+ LOG.error("Current State task type of '" + currentState.getTaskType()
+ + "' doesn't match the current tasks type of '" + currentTask.getTaskType()
+ + "'. This shouldn't happen, and indicates a programming error. "
+ + "Unable to safely transition to the next state.");
+ }
}
+ //
+ // Don't change state otherwise
+ //
+
}
- // CHECKSTYLE:ON CyclomaticComplexity
+ @VisibleForTesting
+ boolean isShutdownRequested() {
+ return shutdownReason != null;
+ }
/**
* Private/Internal method - has package level access solely for testing purposes.
*
* @return the currentState
*/
- ShardConsumerState getCurrentState() {
- return currentState;
+ ConsumerStates.ShardConsumerState getCurrentState() {
+ return currentState.getState();
}
- /**
- * Private/Internal method - has package level access solely for testing purposes.
- *
- * @return the beginShutdown
- */
- boolean isBeginShutdown() {
- return beginShutdown;
+ StreamConfig getStreamConfig() {
+ return streamConfig;
}
+ IRecordProcessor getRecordProcessor() {
+ return recordProcessor;
+ }
+
+ RecordProcessorCheckpointer getRecordProcessorCheckpointer() {
+ return recordProcessorCheckpointer;
+ }
+
+ ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ ShardInfo getShardInfo() {
+ return shardInfo;
+ }
+
+ KinesisDataFetcher getDataFetcher() {
+ return dataFetcher;
+ }
+
+ ILeaseManager getLeaseManager() {
+ return leaseManager;
+ }
+
+ ICheckpoint getCheckpoint() {
+ return checkpoint;
+ }
+
+ long getParentShardPollIntervalMillis() {
+ return parentShardPollIntervalMillis;
+ }
+
+ boolean isCleanupLeasesOfCompletedShards() {
+ return cleanupLeasesOfCompletedShards;
+ }
+
+ long getTaskBackoffTimeMillis() {
+ return taskBackoffTimeMillis;
+ }
+
+ Future getFuture() {
+ return future;
+ }
+
+ ShutdownNotification getShutdownNotification() {
+ return shutdownNotification;
+ }
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java
new file mode 100644
index 00000000..b3792131
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java
@@ -0,0 +1,71 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import java.util.concurrent.CountDownLatch;
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
+import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
+import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
+
+/**
+ * Contains callbacks for completion of stages in a requested record processor shutdown.
+ *
+ */
+class ShardConsumerShutdownNotification implements ShutdownNotification {
+
+ private final LeaseCoordinator leaseCoordinator;
+ private final KinesisClientLease lease;
+ private final CountDownLatch shutdownCompleteLatch;
+ private final CountDownLatch notificationCompleteLatch;
+
+ private boolean notificationComplete = false;
+ private boolean allNotificationCompleted = false;
+
+ /**
+ * Creates a new shutdown request object.
+ *
+ * @param leaseCoordinator
+ * the lease coordinator used to drop leases from once the initial shutdown request is completed.
+ * @param lease
+ * the lease that this shutdown request will free once initial shutdown is complete
+ * @param notificationCompleteLatch
+ * used to inform the caller once the
+ * {@link IShutdownNotificationAware} object has been
+ * notified of the shutdown request.
+ * @param shutdownCompleteLatch
+ * used to inform the caller once the record processor is fully shutdown
+ */
+ ShardConsumerShutdownNotification(LeaseCoordinator leaseCoordinator, KinesisClientLease lease,
+ CountDownLatch notificationCompleteLatch, CountDownLatch shutdownCompleteLatch) {
+ this.leaseCoordinator = leaseCoordinator;
+ this.lease = lease;
+ this.notificationCompleteLatch = notificationCompleteLatch;
+ this.shutdownCompleteLatch = shutdownCompleteLatch;
+ }
+
+ @Override
+ public void shutdownNotificationComplete() {
+ if (notificationComplete) {
+ return;
+ }
+ //
+ // Once the notification has been completed, the lease needs to dropped to allow the worker to complete
+ // shutdown of the record processor.
+ //
+ leaseCoordinator.dropLease(lease);
+ notificationCompleteLatch.countDown();
+ notificationComplete = true;
+ }
+
+ @Override
+ public void shutdownComplete() {
+ if (allNotificationCompleted) {
+ return;
+ }
+ if (!notificationComplete) {
+ notificationCompleteLatch.countDown();
+ }
+ shutdownCompleteLatch.countDown();
+ allNotificationCompleted = true;
+ }
+
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java
new file mode 100644
index 00000000..8ee96537
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java
@@ -0,0 +1,155 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Used as a response from the {@link Worker#requestShutdown()} to allow callers to wait until shutdown is complete.
+ */
+class ShutdownFuture implements Future {
+
+ private static final Log log = LogFactory.getLog(ShutdownFuture.class);
+
+ private final CountDownLatch shutdownCompleteLatch;
+ private final CountDownLatch notificationCompleteLatch;
+ private final Worker worker;
+
+ ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) {
+ this.shutdownCompleteLatch = shutdownCompleteLatch;
+ this.notificationCompleteLatch = notificationCompleteLatch;
+ this.worker = worker;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ throw new UnsupportedOperationException("Cannot cancel a shutdown process");
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return isWorkerShutdownComplete();
+ }
+
+ private boolean isWorkerShutdownComplete() {
+ return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty();
+ }
+
+ private long outstandingRecordProcessors(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+
+ final long startNanos = System.nanoTime();
+
+ //
+ // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
+ // There is the possibility of a race condition where a lease is terminated after the shutdown request
+ // notification is started, but before the ShardConsumer is sent the notification. In this case the
+ // ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
+ //
+ if (!notificationCompleteLatch.await(timeout, unit)) {
+ long awaitingNotification = notificationCompleteLatch.getCount();
+ long awaitingFinalShutdown = shutdownCompleteLatch.getCount();
+ log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification, and "
+ + awaitingFinalShutdown + " awaiting final shutdown");
+ if (awaitingFinalShutdown != 0) {
+ //
+ // The number of record processor awaiting final shutdown should be a superset of the those awaiting
+ // notification
+ //
+ return checkWorkerShutdownMiss(awaitingFinalShutdown);
+ }
+ }
+
+ long remaining = remainingTimeout(timeout, unit, startNanos);
+ throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time.");
+
+ //
+ // Once all record processors have been notified of the shutdown it is safe to allow the worker to
+ // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
+ //
+ worker.shutdown();
+ remaining = remainingTimeout(timeout, unit, startNanos);
+ throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time.");
+
+ //
+ // Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown
+ // processing. This should really be a no-op since as part of the notification completion the lease for
+ // ShardConsumer is terminated.
+ //
+ if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) {
+ long outstanding = shutdownCompleteLatch.getCount();
+ log.info("Awaiting " + outstanding + " record processors to complete final shutdown");
+
+ return checkWorkerShutdownMiss(outstanding);
+ }
+ return 0;
+ }
+
+ private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) {
+ long checkNanos = System.nanoTime() - startNanos;
+ return unit.toNanos(timeout) - checkNanos;
+ }
+
+ private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException {
+ if (remainingNanos <= 0) {
+ throw new TimeoutException(message);
+ }
+ }
+
+ /**
+ * This checks to see if the worker has already hit it's shutdown target, while there is outstanding record
+ * processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the
+ * latch should be decremented before the shutdown completion.
+ *
+ * @param outstanding
+ * the number of record processor still awaiting shutdown.
+ * @return the number of record processors awaiting shutdown, or 0 if the worker believes it's shutdown already.
+ */
+ private long checkWorkerShutdownMiss(long outstanding) {
+ if (isWorkerShutdownComplete()) {
+ if (outstanding != 0) {
+ log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
+ + " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: "
+ + worker.isShutdownComplete() + " -- Consumer Map: "
+ + worker.getShardInfoShardConsumerMap().size());
+ }
+ return 0;
+ }
+ return outstanding;
+ }
+
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ boolean complete = false;
+ do {
+ try {
+ long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS);
+ complete = outstanding == 0;
+ log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown.");
+ } catch (TimeoutException te) {
+ log.info("Timeout while waiting for completion: " + te.getMessage());
+ }
+
+ } while(!complete);
+ return null;
+ }
+
+ @Override
+ public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ long outstanding = outstandingRecordProcessors(timeout, unit);
+ if (outstanding != 0) {
+ throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown.");
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java
new file mode 100644
index 00000000..928e6900
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotification.java
@@ -0,0 +1,22 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+
+/**
+ * A shutdown request to the ShardConsumer
+ */
+public interface ShutdownNotification {
+ /**
+ * Used to indicate that the record processor has been notified of a requested shutdown, and given the chance to
+ * checkpoint.
+ *
+ */
+ void shutdownNotificationComplete();
+
+ /**
+ * Used to indicate that the record processor has completed the call to
+ * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)} has
+ * completed.
+ */
+ void shutdownComplete();
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java
new file mode 100644
index 00000000..13711f23
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java
@@ -0,0 +1,43 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
+
+/**
+ * Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint.
+ */
+class ShutdownNotificationTask implements ITask {
+
+ private final IRecordProcessor recordProcessor;
+ private final IRecordProcessorCheckpointer recordProcessorCheckpointer;
+ private final ShutdownNotification shutdownNotification;
+
+ ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification) {
+ this.recordProcessor = recordProcessor;
+ this.recordProcessorCheckpointer = recordProcessorCheckpointer;
+ this.shutdownNotification = shutdownNotification;
+ }
+
+ @Override
+ public TaskResult call() {
+ try {
+ if (recordProcessor instanceof IShutdownNotificationAware) {
+ IShutdownNotificationAware shutdownNotificationAware = (IShutdownNotificationAware) recordProcessor;
+ try {
+ shutdownNotificationAware.shutdownRequested(recordProcessorCheckpointer);
+ } catch (Exception ex) {
+ return new TaskResult(ex);
+ }
+ }
+ return new TaskResult(null);
+ } finally {
+ shutdownNotification.shutdownNotificationComplete();
+ }
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ return TaskType.SHUTDOWN_NOTIFICATION;
+ }
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReason.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java
similarity index 52%
rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReason.java
rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java
index 473b488c..8d0dfc80 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownReason.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java
@@ -12,7 +12,12 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-package com.amazonaws.services.kinesis.clientlibrary.types;
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
+
/**
* Reason the RecordProcessor is being shutdown.
@@ -28,7 +33,7 @@ public enum ShutdownReason {
* Applications SHOULD NOT checkpoint their progress (as another record processor may have already started
* processing data).
*/
- ZOMBIE,
+ ZOMBIE(3, ShardConsumerState.SHUTTING_DOWN.getConsumerState()),
/**
* Terminate processing for this RecordProcessor (resharding use case).
@@ -36,5 +41,38 @@ public enum ShutdownReason {
* Applications SHOULD checkpoint their progress to indicate that they have successfully processed all records
* from this shard and processing of child shards can be started.
*/
- TERMINATE
+ TERMINATE(2, ShardConsumerState.SHUTTING_DOWN.getConsumerState()),
+
+ /**
+ * Indicates that the entire application is being shutdown, and if desired the record processor will be given a
+ * final chance to checkpoint. This state will not trigger a direct call to
+ * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)}, but
+ * instead depend on a different interface for backward compatibility.
+ */
+ REQUESTED(1, ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState());
+
+ private final int rank;
+ private final ConsumerState shutdownState;
+
+ ShutdownReason(int rank, ConsumerState shutdownState) {
+ this.rank = rank;
+ this.shutdownState = shutdownState;
+ }
+
+ /**
+ * Indicates whether the given reason can override the current reason.
+ *
+ * @param reason the reason to transition to
+ * @return true if the transition is allowed, false if it's not.
+ */
+ public boolean canTransitionTo(ShutdownReason reason) {
+ if (reason == null) {
+ return false;
+ }
+ return reason.rank > this.rank;
+ }
+
+ ConsumerState getShutdownState() {
+ return shutdownState;
+ }
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
index 3ce05203..d40fbb0e 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java
@@ -21,11 +21,11 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
+import com.google.common.annotations.VisibleForTesting;
/**
* Task for invoking the RecordProcessor shutdown() callback.
@@ -155,4 +155,9 @@ class ShutdownTask implements ITask {
return taskType;
}
+ @VisibleForTesting
+ ShutdownReason getReason() {
+ return reason;
+ }
+
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java
index 426162d4..32fd1cd2 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskType.java
@@ -17,7 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* Enumerates types of tasks executed as part of processing a shard.
*/
-enum TaskType {
+public enum TaskType {
/**
* Polls and waits until parent shard(s) have been fully processed.
*/
@@ -34,8 +34,16 @@ enum TaskType {
* Shutdown of RecordProcessor.
*/
SHUTDOWN,
+ /**
+ * Graceful shutdown has been requested, and notification of the record processor will occur.
+ */
+ SHUTDOWN_NOTIFICATION,
+ /**
+ * Occurs once the shutdown has been completed
+ */
+ SHUTDOWN_COMPLETE,
/**
* Sync leases/activities corresponding to Kinesis shards.
*/
- SHARDSYNC;
+ SHARDSYNC
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
index b644a790..2a1e5484 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
@@ -14,13 +14,17 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -38,15 +42,18 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
+import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Worker is the high level class that Kinesis applications use to start
@@ -85,6 +92,7 @@ public class Worker implements Runnable {
private volatile boolean shutdown;
private volatile long shutdownStartTimeMillis;
+ private volatile boolean shutdownComplete = false;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@@ -499,11 +507,89 @@ public class Worker implements Runnable {
}
/**
- * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that
- * if executor services were passed to the worker by the user, worker will not attempt to shutdown
- * those resources.
+ * Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware},
+ * of the impending shutdown. This gives the record processor a final chance to checkpoint.
+ *
+ * It's possible that a record processor won't be notify before being shutdown. This can occur if the lease is
+ * lost after requesting shutdown, but before the notification is dispatched.
+ *
+ *
Requested Shutdown Process
When a shutdown process is requested it operates slightly differently to
+ * allow the record processors a chance to checkpoint a final time.
+ *
+ *
Call to request shutdown invoked.
+ *
Worker stops attempting to acquire new leases
+ *
Record Processor Shutdown Begins
+ *
+ *
Record processor is notified of the impending shutdown, and given a final chance to checkpoint
+ *
The lease for the record processor is then dropped.
+ *
The record processor enters into an idle state waiting for the worker to complete final termination
+ *
The worker will detect a record processor that has lost it's lease, and will terminate the record processor
+ * with {@link ShutdownReason#ZOMBIE}
+ *
+ *
+ *
The worker will shutdown all record processors.
+ *
Once all record processors have been terminated, the worker will terminate all owned resources.
+ *
Once the worker shutdown is complete, the returned future is completed.
+ *
+ *
+ *
+ *
+ * @return a Future that will be set once the shutdown is complete.
+ */
+ public Future requestShutdown() {
+
+ leaseCoordinator.stopLeaseTaker();
+ //
+ // Stop accepting new leases
+ //
+ Collection leases = leaseCoordinator.getAssignments();
+ if (leases == null || leases.isEmpty()) {
+ //
+ // If there are no leases shutdown is already completed.
+ //
+ return Futures.immediateFuture(null);
+ }
+ CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
+ CountDownLatch notificationCompleteLatch = new CountDownLatch(leases.size());
+ for (KinesisClientLease lease : leases) {
+ ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease,
+ notificationCompleteLatch, shutdownCompleteLatch);
+ ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
+ shardInfoShardConsumerMap.get(shardInfo).notifyShutdownRequested(shutdownNotification);
+ }
+
+ return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this);
+
+ }
+
+ boolean isShutdownComplete() {
+ return shutdownComplete;
+ }
+
+ ConcurrentMap getShardInfoShardConsumerMap() {
+ return shardInfoShardConsumerMap;
+ }
+
+ /**
+ * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor
+ * services were passed to the worker by the user, worker will not attempt to shutdown those resources.
+ *
+ *
Shutdown Process
When called this will start shutdown of the record processor, and eventually shutdown
+ * the worker itself.
+ *
+ *
Call to start shutdown invoked
+ *
Lease coordinator told to stop taking leases, and to drop existing leases.
+ *
Worker discovers record processors that no longer have leases.
+ *
Worker triggers shutdown with state {@link ShutdownReason#ZOMBIE}.
+ *
Once all record processors are shutdown, worker terminates owned resources.
+ *
Shutdown complete.
+ *
*/
public void shutdown() {
+ if (shutdown) {
+ LOG.warn("Shutdown requested a second time.");
+ return;
+ }
LOG.info("Worker shutdown requested.");
// Set shutdown flag, so Worker.run can start shutdown process.
@@ -530,6 +616,7 @@ public class Worker implements Runnable {
if (metricsFactory instanceof WorkerCWMetricsFactory) {
((CWMetricsFactory) metricsFactory).shutdown();
}
+ shutdownComplete = true;
}
/**
@@ -757,7 +844,8 @@ public class Worker implements Runnable {
* @return Default executor service that should be used by the worker.
*/
private static ExecutorService getExecutorService() {
- return new WorkerThreadPoolExecutor();
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RecordProcessor-%04d").build();
+ return new WorkerThreadPoolExecutor(threadFactory);
}
/**
@@ -786,10 +874,10 @@ public class Worker implements Runnable {
static class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
private static final long DEFAULT_KEEP_ALIVE_TIME = 60L;
- WorkerThreadPoolExecutor() {
+ WorkerThreadPoolExecutor(ThreadFactory threadFactory) {
// Defaults are based on Executors.newCachedThreadPool()
- super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS,
- new SynchronousQueue());
+ super(0, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(),
+ threadFactory);
}
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java
index 9ea2c654..c533a4da 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ShutdownInput.java
@@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.types;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
/**
* Container for the parameters to the IRecordProcessor's
diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java
index b6c26d3d..b3a0ce6c 100644
--- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java
+++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/KinesisClientLease.java
@@ -15,9 +15,9 @@
package com.amazonaws.services.kinesis.leases.impl;
import java.util.Collection;
-
import java.util.HashSet;
import java.util.Set;
+import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
@@ -41,6 +41,16 @@ public class KinesisClientLease extends Lease {
this.parentShardIds.addAll(other.getParentShardIds());
}
+ KinesisClientLease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken,
+ Long lastCounterIncrementNanos, ExtendedSequenceNumber checkpoint, Long ownerSwitchesSinceCheckpoint,
+ Set parentShardIds) {
+ super(leaseKey, leaseOwner, leaseCounter, concurrencyToken, lastCounterIncrementNanos);
+
+ this.checkpoint = checkpoint;
+ this.ownerSwitchesSinceCheckpoint = ownerSwitchesSinceCheckpoint;
+ this.parentShardIds.addAll(parentShardIds);
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java
index 0638e4db..32234e35 100644
--- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java
+++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java
@@ -63,11 +63,17 @@ public class Lease {
* @param lease lease to copy
*/
protected Lease(Lease lease) {
- this.leaseKey = lease.getLeaseKey();
- this.leaseOwner = lease.getLeaseOwner();
- this.leaseCounter = lease.getLeaseCounter();
- this.concurrencyToken = lease.getConcurrencyToken();
- this.lastCounterIncrementNanos = lease.getLastCounterIncrementNanos();
+ this(lease.getLeaseKey(), lease.getLeaseOwner(), lease.getLeaseCounter(), lease.getConcurrencyToken(),
+ lease.getLastCounterIncrementNanos());
+ }
+
+ protected Lease(String leaseKey, String leaseOwner, Long leaseCounter, UUID concurrencyToken,
+ Long lastCounterIncrementNanos) {
+ this.leaseKey = leaseKey;
+ this.leaseOwner = leaseOwner;
+ this.leaseCounter = leaseCounter;
+ this.concurrencyToken = concurrencyToken;
+ this.lastCounterIncrementNanos = lastCounterIncrementNanos;
}
/**
diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java
index 55c26674..921cfbc4 100644
--- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java
+++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -83,6 +84,7 @@ public class LeaseCoordinator {
private ScheduledExecutorService leaseCoordinatorThreadPool;
private final ExecutorService leaseRenewalThreadpool;
private volatile boolean running = false;
+ private ScheduledFuture> takerFuture;
/**
* Constructor.
@@ -199,9 +201,15 @@ public class LeaseCoordinator {
leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY);
// Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation.
- leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS);
+ takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(new TakerRunnable(),
+ 0L,
+ takerIntervalMillis,
+ TimeUnit.MILLISECONDS);
// Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degredation.
- leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS);
+ leaseCoordinatorThreadPool.scheduleAtFixedRate(new RenewerRunnable(),
+ 0L,
+ renewerIntervalMillis,
+ TimeUnit.MILLISECONDS);
running = true;
}
@@ -309,6 +317,27 @@ public class LeaseCoordinator {
}
}
+ /**
+ * Requests the cancellation of the lease taker.
+ */
+ public void stopLeaseTaker() {
+ takerFuture.cancel(false);
+
+ }
+
+ /**
+ * Requests that renewals for the given lease are stopped.
+ *
+ * @param lease the lease to stop renewing.
+ */
+ public void dropLease(T lease) {
+ synchronized (shutdownLock) {
+ if (lease != null) {
+ leaseRenewer.dropLease(lease);
+ }
+ }
+ }
+
/**
* @return true if this LeaseCoordinator is running
*/
diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java
index ae8040a5..b10ee1a3 100644
--- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseRenewer.java
@@ -369,6 +369,15 @@ public class LeaseRenewer implements ILeaseRenewer {
ownedLeases.clear();
}
+ /**
+ * {@inheritDoc}
+ * @param lease the lease to drop.
+ */
+ @Override
+ public void dropLease(T lease) {
+ ownedLeases.remove(lease.getLeaseKey());
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java
index 6c2a8527..87e9182a 100644
--- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseRenewer.java
@@ -73,6 +73,13 @@ public interface ILeaseRenewer {
*/
public void clearCurrentlyHeldLeases();
+ /**
+ * Stops the lease renewer from continunig to maintain the given lease.
+ *
+ * @param lease the lease to drop.
+ */
+ void dropLease(T lease);
+
/**
* Update application-specific fields in a currently held lease. Cannot be used to update internal fields such as
* leaseCounter, leaseOwner, etc. Fails if we do not hold the lease, or if the concurrency token does not match
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java
index d4ef9b20..2cda0632 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java
index 4b15a532..bce1793c 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangProtocol.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java
index a33e8ebf..babf6ac2 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java
@@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java
index 524b481e..82ed5458 100644
--- a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java
@@ -14,7 +14,7 @@
*/
package com.amazonaws.services.kinesis.multilang.messages;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
/**
* A message to indicate to the client's process that it should shutdown and then terminate.
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java
new file mode 100644
index 00000000..c0a778e9
--- /dev/null
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java
@@ -0,0 +1,385 @@
+package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
+
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.hamcrest.Condition;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
+import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
+import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerStatesTest {
+
+ @Mock
+ private ShardConsumer consumer;
+ @Mock
+ private StreamConfig streamConfig;
+ @Mock
+ private IRecordProcessor recordProcessor;
+ @Mock
+ private RecordProcessorCheckpointer recordProcessorCheckpointer;
+ @Mock
+ private ExecutorService executorService;
+ @Mock
+ private ShardInfo shardInfo;
+ @Mock
+ private KinesisDataFetcher dataFetcher;
+ @Mock
+ private ILeaseManager leaseManager;
+ @Mock
+ private ICheckpoint checkpoint;
+ @Mock
+ private Future future;
+ @Mock
+ private ShutdownNotification shutdownNotification;
+ @Mock
+ private IKinesisProxy kinesisProxy;
+ @Mock
+ private InitialPositionInStreamExtended initialPositionInStream;
+
+ private long parentShardPollIntervalMillis = 0xCAFE;
+ private boolean cleanupLeasesOfCompletedShards = true;
+ private long taskBackoffTimeMillis = 0xBEEF;
+ private ShutdownReason reason = ShutdownReason.TERMINATE;
+
+ @Before
+ public void setup() {
+ when(consumer.getStreamConfig()).thenReturn(streamConfig);
+ when(consumer.getRecordProcessor()).thenReturn(recordProcessor);
+ when(consumer.getRecordProcessorCheckpointer()).thenReturn(recordProcessorCheckpointer);
+ when(consumer.getExecutorService()).thenReturn(executorService);
+ when(consumer.getShardInfo()).thenReturn(shardInfo);
+ when(consumer.getDataFetcher()).thenReturn(dataFetcher);
+ when(consumer.getLeaseManager()).thenReturn(leaseManager);
+ when(consumer.getCheckpoint()).thenReturn(checkpoint);
+ when(consumer.getFuture()).thenReturn(future);
+ when(consumer.getShutdownNotification()).thenReturn(shutdownNotification);
+ when(consumer.getParentShardPollIntervalMillis()).thenReturn(parentShardPollIntervalMillis);
+ when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards);
+ when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis);
+ when(consumer.getShutdownReason()).thenReturn(reason);
+
+ }
+
+ private static final Class> LEASE_MANAGER_CLASS = (Class>) (Class>) ILeaseManager.class;
+
+ @Test
+ public void blockOnParentStateTest() {
+ ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState();
+
+ ITask task = state.createTask(consumer);
+
+ assertThat(task, taskWith(BlockOnParentShardTask.class, ShardInfo.class, "shardInfo", equalTo(shardInfo)));
+ assertThat(task,
+ taskWith(BlockOnParentShardTask.class, LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager)));
+ assertThat(task, taskWith(BlockOnParentShardTask.class, Long.class, "parentShardPollIntervalMillis",
+ equalTo(parentShardPollIntervalMillis)));
+
+ assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.getConsumerState()));
+ for (ShutdownReason shutdownReason : ShutdownReason.values()) {
+ assertThat(state.shutdownTransition(shutdownReason),
+ equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
+ }
+
+ assertThat(state.getState(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS));
+ assertThat(state.getTaskType(), equalTo(TaskType.BLOCK_ON_PARENT_SHARDS));
+
+ }
+
+ @Test
+ public void initializingStateTest() {
+ ConsumerState state = ShardConsumerState.INITIALIZING.getConsumerState();
+ ITask task = state.createTask(consumer);
+
+ assertThat(task, initTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
+ assertThat(task, initTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
+ assertThat(task, initTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
+ assertThat(task, initTask(ICheckpoint.class, "checkpoint", equalTo(checkpoint)));
+ assertThat(task, initTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
+ equalTo(recordProcessorCheckpointer)));
+ assertThat(task, initTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
+ assertThat(task, initTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
+
+ assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
+
+ assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
+ equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
+ assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
+ equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
+ assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
+ equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
+
+ assertThat(state.getState(), equalTo(ShardConsumerState.INITIALIZING));
+ assertThat(state.getTaskType(), equalTo(TaskType.INITIALIZE));
+ }
+
+ @Test
+ public void processingStateTest() {
+ ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
+ ITask task = state.createTask(consumer);
+
+ assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
+ assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
+ assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
+ equalTo(recordProcessorCheckpointer)));
+ assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
+ assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
+ assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
+
+ assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
+
+ assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
+ equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
+ assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
+ equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
+ assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
+ equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
+
+ assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING));
+ assertThat(state.getTaskType(), equalTo(TaskType.PROCESS));
+
+ }
+
+ @Test
+ public void shutdownRequestState() {
+ ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();
+
+ ITask task = state.createTask(consumer);
+
+ assertThat(task, shutdownReqTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
+ assertThat(task, shutdownReqTask(IRecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
+ equalTo((IRecordProcessorCheckpointer) recordProcessorCheckpointer)));
+ assertThat(task, shutdownReqTask(ShutdownNotification.class, "shutdownNotification", equalTo(shutdownNotification)));
+
+ assertThat(state.successTransition(), equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE));
+ assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
+ equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE));
+ assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
+ equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
+ assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
+ equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
+
+ assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_REQUESTED));
+ assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_NOTIFICATION));
+
+ }
+
+ @Test
+ public void shutdownRequestCompleteStateTest() {
+ ConsumerState state = ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE;
+
+ assertThat(state.createTask(consumer), nullValue());
+
+ assertThat(state.successTransition(), equalTo(state));
+
+ assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), equalTo(state));
+ assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
+ equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
+ assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
+ equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
+
+ assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_REQUESTED));
+ assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_NOTIFICATION));
+
+ }
+
+ @Test
+ public void shuttingDownStateTest() {
+ ConsumerState state = ShardConsumerState.SHUTTING_DOWN.getConsumerState();
+
+ when(streamConfig.getStreamProxy()).thenReturn(kinesisProxy);
+ when(streamConfig.getInitialPositionInStream()).thenReturn(initialPositionInStream);
+
+ ITask task = state.createTask(consumer);
+
+ assertThat(task, shutdownTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
+ assertThat(task, shutdownTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
+ assertThat(task, shutdownTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
+ equalTo(recordProcessorCheckpointer)));
+ assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason)));
+ assertThat(task, shutdownTask(IKinesisProxy.class, "kinesisProxy", equalTo(kinesisProxy)));
+ assertThat(task, shutdownTask(LEASE_MANAGER_CLASS, "leaseManager", equalTo(leaseManager)));
+ assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream",
+ equalTo(initialPositionInStream)));
+ assertThat(task,
+ shutdownTask(Boolean.class, "cleanupLeasesOfCompletedShards", equalTo(cleanupLeasesOfCompletedShards)));
+ assertThat(task, shutdownTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
+
+ assertThat(state.successTransition(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
+
+ for (ShutdownReason reason : ShutdownReason.values()) {
+ assertThat(state.shutdownTransition(reason),
+ equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
+ }
+
+ assertThat(state.getState(), equalTo(ShardConsumerState.SHUTTING_DOWN));
+ assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN));
+
+ }
+
+ @Test
+ public void shutdownCompleteStateTest() {
+ ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
+
+ assertThat(state.createTask(consumer), nullValue());
+ verify(consumer, times(2)).getShutdownNotification();
+ verify(shutdownNotification).shutdownComplete();
+
+ assertThat(state.successTransition(), equalTo(state));
+ for(ShutdownReason reason : ShutdownReason.values()) {
+ assertThat(state.shutdownTransition(reason), equalTo(state));
+ }
+
+ assertThat(state.getState(), equalTo(ShardConsumerState.SHUTDOWN_COMPLETE));
+ assertThat(state.getTaskType(), equalTo(TaskType.SHUTDOWN_COMPLETE));
+ }
+
+ @Test
+ public void shutdownCompleteStateNullNotificationTest() {
+ ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
+
+ when(consumer.getShutdownNotification()).thenReturn(null);
+ assertThat(state.createTask(consumer), nullValue());
+
+ verify(consumer).getShutdownNotification();
+ verify(shutdownNotification, never()).shutdownComplete();
+ }
+
+ static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass,
+ String propertyName, Matcher matcher) {
+ return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher);
+ }
+
+ static ReflectionPropertyMatcher shutdownReqTask(
+ Class valueTypeClass, String propertyName, Matcher matcher) {
+ return taskWith(ShutdownNotificationTask.class, valueTypeClass, propertyName, matcher);
+ }
+
+ static ReflectionPropertyMatcher procTask(Class valueTypeClass,
+ String propertyName, Matcher matcher) {
+ return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher);
+ }
+
+ static ReflectionPropertyMatcher initTask(Class valueTypeClass,
+ String propertyName, Matcher matcher) {
+ return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher);
+ }
+
+ static ReflectionPropertyMatcher taskWith(Class taskTypeClass,
+ Class valueTypeClass, String propertyName, Matcher matcher) {
+ return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName);
+ }
+
+ private static class ReflectionPropertyMatcher extends TypeSafeDiagnosingMatcher {
+
+ private final Class taskTypeClass;
+ private final Class valueTypeClazz;
+ private final Matcher matcher;
+ private final String propertyName;
+ private final Field matchingField;
+
+ private ReflectionPropertyMatcher(Class taskTypeClass, Class valueTypeClass,
+ Matcher matcher, String propertyName) {
+ this.taskTypeClass = taskTypeClass;
+ this.valueTypeClazz = valueTypeClass;
+ this.matcher = matcher;
+ this.propertyName = propertyName;
+
+ Field[] fields = taskTypeClass.getDeclaredFields();
+ Field matching = null;
+ for (Field field : fields) {
+ if (propertyName.equals(field.getName())) {
+ matching = field;
+ }
+ }
+ this.matchingField = matching;
+
+ }
+
+ @Override
+ protected boolean matchesSafely(ITask item, Description mismatchDescription) {
+
+ return Condition.matched(item, mismatchDescription).and(new Condition.Step() {
+ @Override
+ public Condition apply(ITask value, Description mismatch) {
+ if (taskTypeClass.equals(value.getClass())) {
+ return Condition.matched(taskTypeClass.cast(value), mismatch);
+ }
+ mismatch.appendText("Expected task type of ").appendText(taskTypeClass.getName())
+ .appendText(" but was ").appendText(value.getClass().getName());
+ return Condition.notMatched();
+ }
+ }).and(new Condition.Step() {
+ @Override
+ public Condition