diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java index 7fb16073..47990226 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java @@ -45,24 +45,24 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie private static final int TIME_TO_KEEP_ALIVE = 5; private static final int CORE_THREAD_POOL_COUNT = 1; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final ExecutorService executorService; private final int retryGetRecordsInSeconds; private final String shardId; final Supplier> completionServiceSupplier; - public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher, + public AsynchronousGetRecordsRetrievalStrategy(@NonNull final IDataFetcher dataFetcher, final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) { this(dataFetcher, buildExector(maxGetRecordsThreadPool, shardId), retryGetRecordsInSeconds, shardId); } - public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher, + public AsynchronousGetRecordsRetrievalStrategy(final IDataFetcher dataFetcher, final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) { this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService), shardId); } - AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, + AsynchronousGetRecordsRetrievalStrategy(IDataFetcher dataFetcher, ExecutorService executorService, int retryGetRecordsInSeconds, Supplier> completionServiceSupplier, String shardId) { this.dataFetcher = dataFetcher; @@ -148,7 +148,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie } @Override - public KinesisDataFetcher getDataFetcher() { + public IDataFetcher getDataFetcher() { return dataFetcher; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java index 4f4c49b8..c9f9e613 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java @@ -30,7 +30,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; * If we don't find a checkpoint for the parent shard(s), we assume they have been trimmed and directly * proceed with processing data from the shard. */ -class BlockOnParentShardTask implements ITask { +public class BlockOnParentShardTask implements ITask { private static final Log LOG = LogFactory.getLog(BlockOnParentShardTask.class); private final ShardInfo shardInfo; @@ -45,9 +45,9 @@ class BlockOnParentShardTask implements ITask { * @param leaseManager Used to fetch the lease and checkpoint info for parent shards * @param parentShardPollIntervalMillis Sleep time if the parent shard has not completed processing */ - BlockOnParentShardTask(ShardInfo shardInfo, - ILeaseManager leaseManager, - long parentShardPollIntervalMillis) { + public BlockOnParentShardTask(ShardInfo shardInfo, + ILeaseManager leaseManager, + long parentShardPollIntervalMillis) { this.shardInfo = shardInfo; this.leaseManager = leaseManager; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java deleted file mode 100644 index 5cf55dbf..00000000 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ /dev/null @@ -1,635 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. - * Licensed under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -/** - * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, - * and state transitions is contained within the {@link ConsumerState} objects. - * - *

State Diagram

- * - *
- *       +-------------------+
- *       | Waiting on Parent |                               +------------------+
- *  +----+       Shard       |                               |     Shutdown     |
- *  |    |                   |          +--------------------+   Notification   |
- *  |    +----------+--------+          |  Shutdown:         |     Requested    |
- *  |               | Success           |   Requested        +-+-------+--------+
- *  |               |                   |                      |       |
- *  |        +------+-------------+     |                      |       | Shutdown:
- *  |        |    Initializing    +-----+                      |       |  Requested
- *  |        |                    |     |                      |       |
- *  |        |                    +-----+-------+              |       |
- *  |        +---------+----------+     |       | Shutdown:    | +-----+-------------+
- *  |                  | Success        |       |  Terminated  | |     Shutdown      |
- *  |                  |                |       |  Zombie      | |   Notification    +-------------+
- *  |           +------+-------------+  |       |              | |     Complete      |             |
- *  |           |     Processing     +--+       |              | ++-----------+------+             |
- *  |       +---+                    |          |              |  |           |                    |
- *  |       |   |                    +----------+              |  |           | Shutdown:          |
- *  |       |   +------+-------------+          |              \  /           |  Requested         |
- *  |       |          |                        |               \/            +--------------------+
- *  |       |          |                        |               ||
- *  |       | Success  |                        |               || Shutdown:
- *  |       +----------+                        |               ||  Terminated
- *  |                                           |               ||  Zombie
- *  |                                           |               ||
- *  |                                           |               ||
- *  |                                           |           +---++--------------+
- *  |                                           |           |   Shutting Down   |
- *  |                                           +-----------+                   |
- *  |                                                       |                   |
- *  |                                                       +--------+----------+
- *  |                                                                |
- *  |                                                                | Shutdown:
- *  |                                                                |  All Reasons
- *  |                                                                |
- *  |                                                                |
- *  |      Shutdown:                                        +--------+----------+
- *  |        All Reasons                                    |     Shutdown      |
- *  +-------------------------------------------------------+     Complete      |
- *                                                          |                   |
- *                                                          +-------------------+
- * 
- */ -class ConsumerStates { - - /** - * Enumerates processing states when working on a shard. - */ - enum ShardConsumerState { - // @formatter:off - WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()), - INITIALIZING(new InitializingState()), - PROCESSING(new ProcessingState()), - SHUTDOWN_REQUESTED(new ShutdownNotificationState()), - SHUTTING_DOWN(new ShuttingDownState()), - SHUTDOWN_COMPLETE(new ShutdownCompleteState()); - //@formatter:on - - private final ConsumerState consumerState; - - ShardConsumerState(ConsumerState consumerState) { - this.consumerState = consumerState; - } - - public ConsumerState getConsumerState() { - return consumerState; - } - } - - - /** - * Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to - * do when a transition occurs. - * - */ - interface ConsumerState { - /** - * Creates a new task for this state using the passed in consumer to build the task. If there is no task - * required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the - * consumer during the execution of this method. - * - * @param consumer - * the consumer to use build the task, or execute state. - * @return a valid task for this state or null if there is no task required. - */ - ITask createTask(ShardConsumer consumer); - - /** - * Provides the next state of the consumer upon success of the task return by - * {@link ConsumerState#createTask(ShardConsumer)}. - * - * @return the next state that the consumer should transition to, this may be the same object as the current - * state. - */ - ConsumerState successTransition(); - - /** - * Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent - * on the current state, and the shutdown reason. - * - * @param shutdownReason - * the reason that a shutdown was requested - * @return the next state that the consumer should transition to, this may be the same object as the current - * state. - */ - ConsumerState shutdownTransition(ShutdownReason shutdownReason); - - /** - * The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state - * even if createTask would return a null value. - * - * @return the type of task that this state represents. - */ - TaskType getTaskType(); - - /** - * An enumeration represent the type of this state. Different consumer states may return the same - * {@link ShardConsumerState}. - * - * @return the type of consumer state this represents. - */ - ShardConsumerState getState(); - - boolean isTerminal(); - - } - - /** - * The initial state that any {@link ShardConsumer} should start in. - */ - static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState(); - - private static ConsumerState shutdownStateFor(ShutdownReason reason) { - switch (reason) { - case REQUESTED: - return ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); - case TERMINATE: - case ZOMBIE: - return ShardConsumerState.SHUTTING_DOWN.getConsumerState(); - default: - throw new IllegalArgumentException("Unknown reason: " + reason); - } - } - - /** - * This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent - * shards have been completed. - * - *

Valid Transitions

- *
- *
Success
- *
Transition to the initializing state to allow the record processor to be initialized in preparation of - * processing.
- *
Shutdown
- *
- *
- *
All Reasons
- *
Transitions to {@link ShutdownCompleteState}. Since the record processor was never initialized it can't be - * informed of the shutdown.
- *
- *
- *
- */ - static class BlockedOnParentState implements ConsumerState { - - @Override - public ITask createTask(ShardConsumer consumer) { - return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(), - consumer.getParentShardPollIntervalMillis()); - } - - @Override - public ConsumerState successTransition() { - return ShardConsumerState.INITIALIZING.getConsumerState(); - } - - @Override - public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { - return ShardConsumerState.SHUTTING_DOWN.getConsumerState(); - } - - @Override - public TaskType getTaskType() { - return TaskType.BLOCK_ON_PARENT_SHARDS; - } - - @Override - public ShardConsumerState getState() { - return ShardConsumerState.WAITING_ON_PARENT_SHARDS; - } - - @Override - public boolean isTerminal() { - return false; - } - } - - /** - * This state is responsible for initializing the record processor with the shard information. - *

Valid Transitions

- *
- *
Success
- *
Transitions to the processing state which will begin to send records to the record processor
- *
Shutdown
- *
At this point the record processor has been initialized, but hasn't processed any records. This requires that - * the record processor be notified of the shutdown, even though there is almost no actions the record processor - * could take. - *
- *
{@link ShutdownReason#REQUESTED}
- *
Transitions to the {@link ShutdownNotificationState}
- *
{@link ShutdownReason#ZOMBIE}
- *
Transitions to the {@link ShuttingDownState}
- *
{@link ShutdownReason#TERMINATE}
- *
- *

- * This reason should not occur, since terminate is triggered after reaching the end of a shard. Initialize never - * makes an requests to Kinesis for records, so it can't reach the end of a shard. - *

- *

- * Transitions to the {@link ShuttingDownState} - *

- *
- *
- *
- *
- */ - static class InitializingState implements ConsumerState { - - @Override - public ITask createTask(ShardConsumer consumer) { - return new InitializeTask(consumer.getShardInfo(), - consumer.getRecordProcessor(), - consumer.getCheckpoint(), - consumer.getRecordProcessorCheckpointer(), - consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), - consumer.getStreamConfig(), - consumer.getGetRecordsCache()); - } - - @Override - public ConsumerState successTransition() { - return ShardConsumerState.PROCESSING.getConsumerState(); - } - - @Override - public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { - return shutdownReason.getShutdownState(); - } - - @Override - public TaskType getTaskType() { - return TaskType.INITIALIZE; - } - - @Override - public ShardConsumerState getState() { - return ShardConsumerState.INITIALIZING; - } - - @Override - public boolean isTerminal() { - return false; - } - } - - /** - * This state is responsible for retrieving records from Kinesis, and dispatching them to the record processor. - * While in this state the only way a transition will occur is if a shutdown has been triggered. - *

Valid Transitions

- *
- *
Success
- *
Doesn't actually transition, but instead returns the same state
- *
Shutdown
- *
At this point records are being retrieved, and processed. It's now possible for the consumer to reach the end - * of the shard triggering a {@link ShutdownReason#TERMINATE}. - *
- *
{@link ShutdownReason#REQUESTED}
- *
Transitions to the {@link ShutdownNotificationState}
- *
{@link ShutdownReason#ZOMBIE}
- *
Transitions to the {@link ShuttingDownState}
- *
{@link ShutdownReason#TERMINATE}
- *
Transitions to the {@link ShuttingDownState}
- *
- *
- *
- */ - static class ProcessingState implements ConsumerState { - - @Override - public ITask createTask(ShardConsumer consumer) { - return new ProcessTask(consumer.getShardInfo(), - consumer.getStreamConfig(), - consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), - consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), - consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), - consumer.getGetRecordsCache()); - } - - @Override - public ConsumerState successTransition() { - return ShardConsumerState.PROCESSING.getConsumerState(); - } - - @Override - public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { - return shutdownReason.getShutdownState(); - } - - @Override - public TaskType getTaskType() { - return TaskType.PROCESS; - } - - @Override - public ShardConsumerState getState() { - return ShardConsumerState.PROCESSING; - } - - @Override - public boolean isTerminal() { - return false; - } - } - - static final ConsumerState SHUTDOWN_REQUEST_COMPLETION_STATE = new ShutdownNotificationCompletionState(); - - /** - * This state occurs when a shutdown has been explicitly requested. This shutdown allows the record processor a - * chance to checkpoint and prepare to be shutdown via the normal method. This state can only be reached by a - * shutdown on the {@link InitializingState} or {@link ProcessingState}. - * - *

Valid Transitions

- *
- *
Success
- *
Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
- *
Shutdown
- *
At this point records are being retrieved, and processed. An explicit shutdown will allow the record - * processor one last chance to checkpoint, and then the {@link ShardConsumer} will be held in an idle state. - *
- *
{@link ShutdownReason#REQUESTED}
- *
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to - * {@link ShutdownNotificationCompletionState}
- *
{@link ShutdownReason#ZOMBIE}
- *
Transitions to the {@link ShuttingDownState}
- *
{@link ShutdownReason#TERMINATE}
- *
Transitions to the {@link ShuttingDownState}
- *
- *
- *
- */ - static class ShutdownNotificationState implements ConsumerState { - - @Override - public ITask createTask(ShardConsumer consumer) { - return new ShutdownNotificationTask(consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), - consumer.getShutdownNotification(), - consumer.getShardInfo()); - } - - @Override - public ConsumerState successTransition() { - return SHUTDOWN_REQUEST_COMPLETION_STATE; - } - - @Override - public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { - if (shutdownReason == ShutdownReason.REQUESTED) { - return SHUTDOWN_REQUEST_COMPLETION_STATE; - } - return shutdownReason.getShutdownState(); - } - - @Override - public TaskType getTaskType() { - return TaskType.SHUTDOWN_NOTIFICATION; - } - - @Override - public ShardConsumerState getState() { - return ShardConsumerState.SHUTDOWN_REQUESTED; - } - - @Override - public boolean isTerminal() { - return false; - } - } - - /** - * Once the {@link ShutdownNotificationState} has been completed the {@link ShardConsumer} must not re-enter any of the - * processing states. This state idles the {@link ShardConsumer} until the worker triggers the final shutdown state. - * - *

Valid Transitions

- *
- *
Success
- *
- *

- * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. - *

- *

- * Remains in the {@link ShutdownNotificationCompletionState} - *

- *
- *
Shutdown
- *
At this point the {@link ShardConsumer} has notified the record processor of the impending shutdown, and is - * waiting that notification. While waiting for the notification no further processing should occur on the - * {@link ShardConsumer}. - *
- *
{@link ShutdownReason#REQUESTED}
- *
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains - * {@link ShutdownNotificationCompletionState}
- *
{@link ShutdownReason#ZOMBIE}
- *
Transitions to the {@link ShuttingDownState}
- *
{@link ShutdownReason#TERMINATE}
- *
Transitions to the {@link ShuttingDownState}
- *
- *
- *
- */ - static class ShutdownNotificationCompletionState implements ConsumerState { - - @Override - public ITask createTask(ShardConsumer consumer) { - return null; - } - - @Override - public ConsumerState successTransition() { - return this; - } - - @Override - public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { - if (shutdownReason != ShutdownReason.REQUESTED) { - return shutdownReason.getShutdownState(); - } - return this; - } - - @Override - public TaskType getTaskType() { - return TaskType.SHUTDOWN_NOTIFICATION; - } - - @Override - public ShardConsumerState getState() { - return ShardConsumerState.SHUTDOWN_REQUESTED; - } - - @Override - public boolean isTerminal() { - return false; - } - } - - /** - * This state is entered if the {@link ShardConsumer} loses its lease, or reaches the end of the shard. - * - *

Valid Transitions

- *
- *
Success
- *
- *

- * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. - *

- *

- * Transitions to the {@link ShutdownCompleteState} - *

- *
- *
Shutdown
- *
At this point the record processor has processed the final shutdown indication, and depending on the shutdown - * reason taken the correct course of action. From this point on there should be no more interactions with the - * record processor or {@link ShardConsumer}. - *
- *
{@link ShutdownReason#REQUESTED}
- *
- *

- * This should not occur as all other {@link ShutdownReason}s take priority over it. - *

- *

- * Transitions to {@link ShutdownCompleteState} - *

- *
- *
{@link ShutdownReason#ZOMBIE}
- *
Transitions to the {@link ShutdownCompleteState}
- *
{@link ShutdownReason#TERMINATE}
- *
Transitions to the {@link ShutdownCompleteState}
- *
- *
- *
- */ - static class ShuttingDownState implements ConsumerState { - - @Override - public ITask createTask(ShardConsumer consumer) { - return new ShutdownTask(consumer.getShardInfo(), - consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), - consumer.getShutdownReason(), - consumer.getStreamConfig().getStreamProxy(), - consumer.getStreamConfig().getInitialPositionInStream(), - consumer.isCleanupLeasesOfCompletedShards(), - consumer.isIgnoreUnexpectedChildShards(), - consumer.getLeaseCoordinator(), - consumer.getTaskBackoffTimeMillis(), - consumer.getGetRecordsCache(), consumer.getShardSyncer(), - consumer.getShardSyncStrategy(), consumer.getChildShards(), - consumer.getLeaseCleanupManager()); - } - - @Override - public ConsumerState successTransition() { - return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); - } - - @Override - public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { - return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); - } - - @Override - public TaskType getTaskType() { - return TaskType.SHUTDOWN; - } - - @Override - public ShardConsumerState getState() { - return ShardConsumerState.SHUTTING_DOWN; - } - - @Override - public boolean isTerminal() { - return false; - } - } - - /** - * This is the final state for the {@link ShardConsumer}. This occurs once all shutdown activities are completed. - * - *

Valid Transitions

- *
- *
Success
- *
- *

- * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. - *

- *

- * Remains in the {@link ShutdownCompleteState} - *

- *
- *
Shutdown
- *
At this point the all shutdown activites are completed, and the {@link ShardConsumer} should not take any - * further actions. - *
- *
{@link ShutdownReason#REQUESTED}
- *
- *

- * This should not occur as all other {@link ShutdownReason}s take priority over it. - *

- *

- * Remains in {@link ShutdownCompleteState} - *

- *
- *
{@link ShutdownReason#ZOMBIE}
- *
Remains in {@link ShutdownCompleteState}
- *
{@link ShutdownReason#TERMINATE}
- *
Remains in {@link ShutdownCompleteState}
- *
- *
- *
- */ - static class ShutdownCompleteState implements ConsumerState { - - @Override - public ITask createTask(ShardConsumer consumer) { - if (consumer.getShutdownNotification() != null) { - consumer.getShutdownNotification().shutdownComplete(); - } - return null; - } - - @Override - public ConsumerState successTransition() { - return this; - } - - @Override - public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { - return this; - } - - @Override - public TaskType getTaskType() { - return TaskType.SHUTDOWN_COMPLETE; - } - - @Override - public ShardConsumerState getState() { - return ShardConsumerState.SHUTDOWN_COMPLETE; - } - - @Override - public boolean isTerminal() { - return true; - } - } - -} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java index 71a15340..d831a4f1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java @@ -46,9 +46,9 @@ public interface GetRecordsRetrievalStrategy { boolean isShutdown(); /** - * Returns the KinesisDataFetcher used to getRecords from Kinesis. + * Returns the IDataFetcher used to getRecords * - * @return KinesisDataFetcher + * @return IDataFetcher */ - KinesisDataFetcher getDataFetcher(); + IDataFetcher getDataFetcher(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 5343470f..7ee4ab27 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -29,7 +29,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; /** * Task for initializing shard position and invoking the RecordProcessor initialize() API. */ -class InitializeTask implements ITask { +public class InitializeTask implements ITask { private static final Log LOG = LogFactory.getLog(InitializeTask.class); @@ -37,7 +37,7 @@ class InitializeTask implements ITask { private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final TaskType taskType = TaskType.INITIALIZE; private final ICheckpoint checkpoint; private final RecordProcessorCheckpointer recordProcessorCheckpointer; @@ -49,14 +49,14 @@ class InitializeTask implements ITask { /** * Constructor. */ - InitializeTask(ShardInfo shardInfo, - IRecordProcessor recordProcessor, - ICheckpoint checkpoint, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisDataFetcher dataFetcher, - long backoffTimeMillis, - StreamConfig streamConfig, - GetRecordsCache getRecordsCache) { + public InitializeTask(ShardInfo shardInfo, + IRecordProcessor recordProcessor, + ICheckpoint checkpoint, + RecordProcessorCheckpointer recordProcessorCheckpointer, + IDataFetcher dataFetcher, + long backoffTimeMillis, + StreamConfig streamConfig, + GetRecordsCache getRecordsCache) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.checkpoint = checkpoint; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index ccde83f3..10441234 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -20,6 +20,7 @@ import java.util.Optional; import java.util.Set; import com.amazonaws.services.dynamodbv2.model.BillingMode; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import org.apache.commons.lang3.Validate; import com.amazonaws.ClientConfiguration; @@ -61,7 +62,7 @@ public class KinesisClientLibConfiguration { public static final int DEFAULT_MAX_RECORDS = 10000; /** - * The default value for how long the {@link ShardConsumer} should sleep if no records are returned from the call to + * The default value for how long the {@link KinesisShardConsumer} should sleep if no records are returned from the call to * {@link com.amazonaws.services.kinesis.AmazonKinesis#getRecords(com.amazonaws.services.kinesis.model.GetRecordsRequest)}. */ public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L; @@ -627,7 +628,7 @@ public class KinesisClientLibConfiguration { * @param billingMode The DDB Billing mode to set for lease table creation. * @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard. * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in - * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + * {@link LeaseCleanupManager} * @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases * (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up. * @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases @@ -926,7 +927,7 @@ public class KinesisClientLibConfiguration { } /** - * @return Interval in millis at which to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + * @return Interval in millis at which to run lease cleanup thread in {@link LeaseCleanupManager} */ public long leaseCleanupIntervalMillis() { return leaseCleanupIntervalMillis; @@ -1030,7 +1031,7 @@ public class KinesisClientLibConfiguration { * Keeping it protected to forbid outside callers from depending on this internal object. * @return The initialPositionInStreamExtended object. */ - protected InitialPositionInStreamExtended getInitialPositionInStreamExtended() { + public InitialPositionInStreamExtended getInitialPositionInStreamExtended() { return initialPositionInStreamExtended; } @@ -1623,7 +1624,7 @@ public class KinesisClientLibConfiguration { /** * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in - * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + * {@link LeaseCleanupManager} * @return */ public KinesisClientLibConfiguration withLeaseCleanupIntervalMillis(long leaseCleanupIntervalMillis) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 47baad04..ad55ab52 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -51,7 +51,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; /** * This class is used to coordinate/manage leases owned by this worker process and to get/set checkpoints. */ -class KinesisClientLibLeaseCoordinator extends LeaseCoordinator implements ICheckpoint { +public class KinesisClientLibLeaseCoordinator extends LeaseCoordinator implements ICheckpoint { private static final Log LOG = LogFactory.getLog(KinesisClientLibLeaseCoordinator.class); @@ -368,7 +368,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator getLeaseManager() { + public ILeaseManager getLeaseManager() { return leaseManager; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index ae4e321d..4c8d638b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -39,7 +39,7 @@ import lombok.Data; /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. */ -class KinesisDataFetcher { +public class KinesisDataFetcher implements IDataFetcher{ private static final Log LOG = LogFactory.getLog(KinesisDataFetcher.class); @@ -185,7 +185,7 @@ class KinesisDataFetcher { * @param sequenceNumber advance the iterator to the record at this sequence number. * @param initialPositionInStream The initialPositionInStream. */ - void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { + public void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { if (sequenceNumber == null) { throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId); } else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) { @@ -276,11 +276,11 @@ class KinesisDataFetcher { /** * @return the shardEndReached */ - protected boolean isShardEndReached() { + public boolean isShardEndReached() { return isShardEndReached; } - protected List getChildShards() { + public List getChildShards() { return childShards; } @@ -290,5 +290,4 @@ class KinesisDataFetcher { String getNextIterator() { return nextIterator; } - } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java deleted file mode 100644 index cdf73e82..00000000 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java +++ /dev/null @@ -1,410 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. - * Licensed under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import java.io.Serializable; -import java.math.BigInteger; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import com.amazonaws.services.cloudwatch.model.StandardUnit; -import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; -import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; -import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; -import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.leases.impl.UpdateField; -import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; -import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; -import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; -import com.amazonaws.services.kinesis.model.Shard; -import com.amazonaws.util.CollectionUtils; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ComparisonChain; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.NonNull; -import lombok.Value; -import lombok.experimental.Accessors; -import org.apache.commons.lang3.Validate; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange; - -/** - * The top level orchestrator for coordinating the periodic shard sync related activities. If the configured - * {@link ShardSyncStrategyType} is PERIODIC, this class will be the main shard sync orchestrator. For non-PERIODIC - * strategies, this class will serve as an internal auditor that periodically checks if the full hash range is covered - * by currently held leases, and initiates a recovery shard sync if not. - */ -@Getter -@EqualsAndHashCode -class PeriodicShardSyncManager { - private static final Log LOG = LogFactory.getLog(PeriodicShardSyncManager.class); - private static final long INITIAL_DELAY = 0; - - /** DEFAULT interval is used for PERIODIC {@link ShardSyncStrategyType}. */ - private static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000L; - - /** Parameters for validating hash range completeness when running in auditor mode. */ - @VisibleForTesting - static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; - @VisibleForTesting - static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); - static final String PERIODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager"; - private final HashRangeHoleTracker hashRangeHoleTracker = new HashRangeHoleTracker(); - - private final String workerId; - private final LeaderDecider leaderDecider; - private final ITask metricsEmittingShardSyncTask; - private final ScheduledExecutorService shardSyncThreadPool; - private final ILeaseManager leaseManager; - private final IKinesisProxy kinesisProxy; - private final boolean isAuditorMode; - private final long periodicShardSyncIntervalMillis; - private boolean isRunning; - private final IMetricsFactory metricsFactory; - private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold; - - - PeriodicShardSyncManager(String workerId, - LeaderDecider leaderDecider, - ShardSyncTask shardSyncTask, - IMetricsFactory metricsFactory, - ILeaseManager leaseManager, - IKinesisProxy kinesisProxy, - boolean isAuditorMode, - long leasesRecoveryAuditorExecutionFrequencyMillis, - int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { - this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory, - leaseManager, kinesisProxy, isAuditorMode, leasesRecoveryAuditorExecutionFrequencyMillis, - leasesRecoveryAuditorInconsistencyConfidenceThreshold); - } - - PeriodicShardSyncManager(String workerId, - LeaderDecider leaderDecider, - ShardSyncTask shardSyncTask, - ScheduledExecutorService shardSyncThreadPool, - IMetricsFactory metricsFactory, - ILeaseManager leaseManager, - IKinesisProxy kinesisProxy, - boolean isAuditorMode, - long leasesRecoveryAuditorExecutionFrequencyMillis, - int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { - Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); - Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); - Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager."); - this.workerId = workerId; - this.leaderDecider = leaderDecider; - this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory); - this.shardSyncThreadPool = shardSyncThreadPool; - this.leaseManager = leaseManager; - this.kinesisProxy = kinesisProxy; - this.metricsFactory = metricsFactory; - this.isAuditorMode = isAuditorMode; - this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold; - if (isAuditorMode) { - Validate.notNull(this.leaseManager, "LeaseManager is required for non-PERIODIC shard sync strategies."); - Validate.notNull(this.kinesisProxy, "KinesisProxy is required for non-PERIODIC shard sync strategies."); - this.periodicShardSyncIntervalMillis = leasesRecoveryAuditorExecutionFrequencyMillis; - } else { - this.periodicShardSyncIntervalMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS; - } - } - - public synchronized TaskResult start() { - if (!isRunning) { - final Runnable periodicShardSyncer = () -> { - try { - runShardSync(); - } catch (Throwable t) { - LOG.error("Error running shard sync.", t); - } - }; - - shardSyncThreadPool - .scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, periodicShardSyncIntervalMillis, - TimeUnit.MILLISECONDS); - isRunning = true; - } - return new TaskResult(null); - } - - /** - * Runs ShardSync once, without scheduling further periodic ShardSyncs. - * @return TaskResult from shard sync - */ - public synchronized TaskResult syncShardsOnce() { - LOG.info("Syncing shards once from worker " + workerId); - return metricsEmittingShardSyncTask.call(); - } - - public void stop() { - if (isRunning) { - LOG.info(String.format("Shutting down leader decider on worker %s", workerId)); - leaderDecider.shutdown(); - LOG.info(String.format("Shutting down periodic shard sync task scheduler on worker %s", workerId)); - shardSyncThreadPool.shutdown(); - isRunning = false; - } - } - - private void runShardSync() { - if (leaderDecider.isLeader(workerId)) { - LOG.debug("WorkerId " + workerId + " is a leader, running the shard sync task"); - - MetricsHelper.startScope(metricsFactory, PERIODIC_SHARD_SYNC_MANAGER); - boolean isRunSuccess = false; - final long runStartMillis = System.currentTimeMillis(); - - try { - final ShardSyncResponse shardSyncResponse = checkForShardSync(); - MetricsHelper.getMetricsScope().addData("NumStreamsToSync", shardSyncResponse.shouldDoShardSync() ? 1 : 0, StandardUnit.Count, MetricsLevel.SUMMARY); - MetricsHelper.getMetricsScope().addData("NumStreamsWithPartialLeases", shardSyncResponse.isHoleDetected() ? 1 : 0, StandardUnit.Count, MetricsLevel.SUMMARY); - if (shardSyncResponse.shouldDoShardSync()) { - LOG.info("Periodic shard syncer initiating shard sync due to the reason - " + - shardSyncResponse.reasonForDecision()); - metricsEmittingShardSyncTask.call(); - } else { - LOG.info("Skipping shard sync due to the reason - " + shardSyncResponse.reasonForDecision()); - } - isRunSuccess = true; - } catch (Exception e) { - LOG.error("Caught exception while running periodic shard syncer.", e); - } finally { - MetricsHelper.addSuccessAndLatency(runStartMillis, isRunSuccess, MetricsLevel.SUMMARY); - MetricsHelper.endScope(); - } - } else { - LOG.debug("WorkerId " + workerId + " is not a leader, not running the shard sync task"); - } - } - - @VisibleForTesting - ShardSyncResponse checkForShardSync() throws DependencyException, InvalidStateException, - ProvisionedThroughputException { - - if (!isAuditorMode) { - // If we are running with PERIODIC shard sync strategy, we should sync every time. - return new ShardSyncResponse(true, false, "Syncing every time with PERIODIC shard sync strategy."); - } - - // Get current leases from DynamoDB. - final List currentLeases = leaseManager.listLeases(); - - if (CollectionUtils.isNullOrEmpty(currentLeases)) { - // If the current leases are null or empty, then we need to initiate a shard sync. - LOG.info("No leases found. Will trigger a shard sync."); - return new ShardSyncResponse(true, false, "No leases found."); - } - - // Check if there are any holes in the hash range covered by current leases. Return the first hole if present. - Optional hashRangeHoleOpt = hasHoleInLeases(currentLeases); - if (hashRangeHoleOpt.isPresent()) { - // If hole is present, check if the hole is detected consecutively in previous occurrences. If hole is - // determined with high confidence, return true; return false otherwise. We use the high confidence factor - // to avoid shard sync on any holes during resharding and lease cleanups, or other intermittent issues. - final boolean hasHoleWithHighConfidence = - hashRangeHoleTracker.hashHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); - - return new ShardSyncResponse(hasHoleWithHighConfidence, true, - "Detected the same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. " + - "Will initiate shard sync after reaching threshold: " + leasesRecoveryAuditorInconsistencyConfidenceThreshold); - } else { - // If hole is not present, clear any previous hole tracking and return false. - hashRangeHoleTracker.reset(); - return new ShardSyncResponse(false, false, "Hash range is complete."); - } - } - - @VisibleForTesting - Optional hasHoleInLeases(List leases) { - // Filter out any leases with checkpoints other than SHARD_END - final List activeLeases = leases.stream() - .filter(lease -> lease.getCheckpoint() != null && !lease.getCheckpoint().isShardEnd()) - .collect(Collectors.toList()); - - final List activeLeasesWithHashRanges = fillWithHashRangesIfRequired(activeLeases); - return checkForHoleInHashKeyRanges(activeLeasesWithHashRanges); - } - - private List fillWithHashRangesIfRequired(List activeLeases) { - final List activeLeasesWithNoHashRanges = activeLeases.stream() - .filter(lease -> lease.getHashKeyRange() == null).collect(Collectors.toList()); - - if (activeLeasesWithNoHashRanges.isEmpty()) { - return activeLeases; - } - - // Fetch shards from Kinesis to fill in the in-memory hash ranges - final Map kinesisShards = kinesisProxy.getShardList().stream() - .collect(Collectors.toMap(Shard::getShardId, shard -> shard)); - - return activeLeases.stream().map(lease -> { - if (lease.getHashKeyRange() == null) { - final String shardId = lease.getLeaseKey(); - final Shard shard = kinesisShards.get(shardId); - if (shard == null) { - return lease; - } - lease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange())); - - try { - leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE); - } catch (Exception e) { - LOG.warn("Unable to update hash range information for lease " + lease.getLeaseKey() + - ". This may result in explicit lease sync."); - } - } - return lease; - }).filter(lease -> lease.getHashKeyRange() != null).collect(Collectors.toList()); - } - - @VisibleForTesting - static Optional checkForHoleInHashKeyRanges(List leasesWithHashKeyRanges) { - // Sort the hash ranges by starting hash key - final List sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges); - if (sortedLeasesWithHashKeyRanges.isEmpty()) { - LOG.error("No leases with valid hash ranges found."); - return Optional.of(new HashRangeHole()); - } - - // Validate the hash range bounds - final KinesisClientLease minHashKeyLease = sortedLeasesWithHashKeyRanges.get(0); - final KinesisClientLease maxHashKeyLease = - sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1); - if (!minHashKeyLease.getHashKeyRange().startingHashKey().equals(MIN_HASH_KEY) || - !maxHashKeyLease.getHashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) { - LOG.error("Incomplete hash range found between " + minHashKeyLease + " and " + maxHashKeyLease); - return Optional.of(new HashRangeHole(minHashKeyLease.getHashKeyRange(), maxHashKeyLease.getHashKeyRange())); - } - - // Check for any holes in the sorted hash range intervals - if (sortedLeasesWithHashKeyRanges.size() > 1) { - KinesisClientLease leftmostLeaseToReportInCaseOfHole = minHashKeyLease; - HashKeyRangeForLease leftLeaseHashRange = leftmostLeaseToReportInCaseOfHole.getHashKeyRange(); - - for (int i = 1; i < sortedLeasesWithHashKeyRanges.size(); i++) { - final KinesisClientLease rightLease = sortedLeasesWithHashKeyRanges.get(i); - final HashKeyRangeForLease rightLeaseHashRange = rightLease.getHashKeyRange(); - final BigInteger rangeDiff = - rightLeaseHashRange.startingHashKey().subtract(leftLeaseHashRange.endingHashKey()); - // We have overlapping leases when rangeDiff is 0 or negative. - // signum() will be -1 for negative and 0 if value is 0. - // Merge the ranges for further tracking. - if (rangeDiff.signum() <= 0) { - leftLeaseHashRange = new HashKeyRangeForLease(leftLeaseHashRange.startingHashKey(), - leftLeaseHashRange.endingHashKey().max(rightLeaseHashRange.endingHashKey())); - } else { - // We have non-overlapping leases when rangeDiff is positive. signum() will be 1 in this case. - // If rangeDiff is 1, then it is a continuous hash range. If not, there is a hole. - if (!rangeDiff.equals(BigInteger.ONE)) { - LOG.error("Incomplete hash range found between " + leftmostLeaseToReportInCaseOfHole + - " and " + rightLease); - return Optional.of(new HashRangeHole(leftmostLeaseToReportInCaseOfHole.getHashKeyRange(), - rightLease.getHashKeyRange())); - } - - leftmostLeaseToReportInCaseOfHole = rightLease; - leftLeaseHashRange = rightLeaseHashRange; - } - } - } - - return Optional.empty(); - } - - @VisibleForTesting - static List sortLeasesByHashRange(List leasesWithHashKeyRanges) { - if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) { - return leasesWithHashKeyRanges; - } - Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator()); - return leasesWithHashKeyRanges; - } - - @Value - @Accessors(fluent = true) - @VisibleForTesting - static class ShardSyncResponse { - private final boolean shouldDoShardSync; - private final boolean isHoleDetected; - private final String reasonForDecision; - } - - @Value - private static class HashRangeHole { - private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; - private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; - - HashRangeHole() { - hashRangeAtStartOfPossibleHole = hashRangeAtEndOfPossibleHole = null; - } - - HashRangeHole(HashKeyRangeForLease hashRangeAtStartOfPossibleHole, - HashKeyRangeForLease hashRangeAtEndOfPossibleHole) { - this.hashRangeAtStartOfPossibleHole = hashRangeAtStartOfPossibleHole; - this.hashRangeAtEndOfPossibleHole = hashRangeAtEndOfPossibleHole; - } - } - - private class HashRangeHoleTracker { - private HashRangeHole hashRangeHole; - @Getter - private Integer numConsecutiveHoles; - - public boolean hashHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) { - if (hashRangeHole.equals(this.hashRangeHole)) { - ++this.numConsecutiveHoles; - } else { - this.hashRangeHole = hashRangeHole; - this.numConsecutiveHoles = 1; - } - - return numConsecutiveHoles >= leasesRecoveryAuditorInconsistencyConfidenceThreshold; - } - - public void reset() { - this.hashRangeHole = null; - this.numConsecutiveHoles = 0; - } - } - - private static class HashKeyRangeComparator implements Comparator, Serializable { - private static final long serialVersionUID = 1L; - - @Override - public int compare(KinesisClientLease lease, KinesisClientLease otherLease) { - Validate.notNull(lease); - Validate.notNull(otherLease); - Validate.notNull(lease.getHashKeyRange()); - Validate.notNull(otherLease.getHashKeyRange()); - return ComparisonChain.start() - .compare(lease.getHashKeyRange().startingHashKey(), otherLease.getHashKeyRange().startingHashKey()) - .compare(lease.getHashKeyRange().endingHashKey(), otherLease.getHashKeyRange().endingHashKey()) - .result(); - } - } -} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index a4cf74d8..3e4dbcb7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -60,7 +60,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private PrefetchCounters prefetchCounters; private boolean started = false; private final String operation; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final String shardId; /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index cd543e23..61587450 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -41,7 +41,7 @@ import com.amazonaws.services.kinesis.model.Shard; /** * Task for fetching data records and invoking processRecords() on the record processor instance. */ -class ProcessTask implements ITask { +public class ProcessTask implements ITask { private static final Log LOG = LogFactory.getLog(ProcessTask.class); @@ -55,7 +55,7 @@ class ProcessTask implements ITask { private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; private final RecordProcessorCheckpointer recordProcessorCheckpointer; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final TaskType taskType = TaskType.PROCESS; private final StreamConfig streamConfig; private final long backoffTimeMillis; @@ -81,7 +81,7 @@ class ProcessTask implements ITask { * The retrieval strategy for fetching records from kinesis */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + RecordProcessorCheckpointer recordProcessorCheckpointer, IDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, GetRecordsCache getRecordsCache) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, @@ -107,7 +107,7 @@ class ProcessTask implements ITask { * determines how throttling events should be reported in the log. */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + RecordProcessorCheckpointer recordProcessorCheckpointer, IDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) { super(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index dbee9218..16e2b317 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -37,7 +37,7 @@ import com.amazonaws.services.kinesis.model.Record; * The Amazon Kinesis Client Library will instantiate an object and provide a reference to the application * RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment. */ -class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { +public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { private static final Log LOG = LogFactory.getLog(RecordProcessorCheckpointer.class); @@ -62,10 +62,10 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { * @param checkpoint Used to checkpoint progress of a RecordProcessor * @param validator Used for validating sequence numbers */ - RecordProcessorCheckpointer(ShardInfo shardInfo, - ICheckpoint checkpoint, - SequenceNumberValidator validator, - IMetricsFactory metricsFactory) { + public RecordProcessorCheckpointer(ShardInfo shardInfo, + ICheckpoint checkpoint, + SequenceNumberValidator validator, + IMetricsFactory metricsFactory) { this.shardInfo = shardInfo; this.checkpoint = checkpoint; this.sequenceNumberValidator = validator; @@ -231,7 +231,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { /** * @return the lastCheckpointValue */ - ExtendedSequenceNumber getLastCheckpointValue() { + public ExtendedSequenceNumber getLastCheckpointValue() { return lastCheckpointValue; } @@ -244,14 +244,14 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { * * @return the largest permitted checkpoint */ - synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() { + public synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() { return largestPermittedCheckpointValue; } /** * @param largestPermittedCheckpointValue the largest permitted checkpoint */ - synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) { + public synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) { this.largestPermittedCheckpointValue = largestPermittedCheckpointValue; } @@ -262,7 +262,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { * * @param extendedSequenceNumber */ - synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) { + public synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) { this.sequenceNumberAtShardEnd = extendedSequenceNumber; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java index 3ca28235..ac81dc8b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java @@ -51,7 +51,7 @@ public class SequenceNumberValidator { * @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers * being validated */ - SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { + public SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { this.proxy = proxy; this.shardId = shardId; this.validateWithGetIterator = validateWithGetIterator; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java deleted file mode 100644 index 11ee39b5..00000000 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ /dev/null @@ -1,599 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. - * Licensed under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - - -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; - -import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; -import com.amazonaws.services.kinesis.model.ChildShard; -import com.amazonaws.util.CollectionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; -import com.google.common.annotations.VisibleForTesting; - -import lombok.Getter; - -/** - * Responsible for consuming data records of a (specified) shard. - * The instance should be shutdown when we lose the primary responsibility for a shard. - * A new instance should be created if the primary responsibility is reassigned back to this process. - */ -class ShardConsumer { - - private static final Log LOG = LogFactory.getLog(ShardConsumer.class); - - private final StreamConfig streamConfig; - private final IRecordProcessor recordProcessor; - private final KinesisClientLibConfiguration config; - private final RecordProcessorCheckpointer recordProcessorCheckpointer; - private final ExecutorService executorService; - private final ShardInfo shardInfo; - private final KinesisDataFetcher dataFetcher; - private final IMetricsFactory metricsFactory; - private final KinesisClientLibLeaseCoordinator leaseCoordinator; - private ICheckpoint checkpoint; - private LeaseCleanupManager leaseCleanupManager; - // Backoff time when polling to check if application has finished processing parent shards - private final long parentShardPollIntervalMillis; - private final boolean cleanupLeasesOfCompletedShards; - private final long taskBackoffTimeMillis; - private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; - - @Getter - private final ShardSyncer shardSyncer; - - private ITask currentTask; - private long currentTaskSubmitTime; - private Future future; - private ShardSyncStrategy shardSyncStrategy; - - @Getter - private List childShards; - - @Getter - private final GetRecordsCache getRecordsCache; - - private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - ShardInfo shardInfo) { - Optional getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry -> - maxGetRecordsThreadPool.map(max -> - new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId()))); - - return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); - } - - /* - * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do - * much coordination/synchronization to handle concurrent reads/updates. - */ - private ConsumerStates.ConsumerState currentState = ConsumerStates.INITIAL_STATE; - /* - * Used to track if we lost the primary responsibility. Once set to true, we will start shutting down. - * If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object. - */ - private volatile ShutdownReason shutdownReason; - private volatile ShutdownNotification shutdownNotification; - - /** - * @param shardInfo Shard information - * @param streamConfig Stream configuration to use - * @param checkpoint Checkpoint tracker - * @param recordProcessor Record processor used to process the data records for the shard - * @param config Kinesis library configuration - * @param leaseCoordinator Used to manage leases for current worker - * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) - * @param executorService ExecutorService used to execute process tasks for this shard - * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard - * @param backoffTimeMillis backoff interval when we encounter exceptions - * @param shardSyncer shardSyncer instance used to check and create new leases - */ - // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - @Deprecated - ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { - - this(shardInfo, - streamConfig, - checkpoint, - recordProcessor, - leaseCoordinator, - parentShardPollIntervalMillis, - cleanupLeasesOfCompletedShards, - executorService, - metricsFactory, - backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, - Optional.empty(), - Optional.empty(), - config, shardSyncer, shardSyncStrategy); - } - - /** - * @param shardInfo Shard information - * @param streamConfig Stream configuration to use - * @param checkpoint Checkpoint tracker - * @param recordProcessor Record processor used to process the data records for the shard - * @param leaseCoordinator Used to manage leases for current worker - * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) - * @param executorService ExecutorService used to execute process tasks for this shard - * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard - * @param backoffTimeMillis backoff interval when we encounter exceptions - * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record. - * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool. - * @param config Kinesis library configuration - * @param shardSyncer shardSyncer instance used to check and create new leases - */ - // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - @Deprecated - ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { - this( - shardInfo, - streamConfig, - checkpoint, - recordProcessor, - new RecordProcessorCheckpointer( - shardInfo, - checkpoint, - new SequenceNumberValidator( - streamConfig.getStreamProxy(), - shardInfo.getShardId(), - streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), - metricsFactory), - leaseCoordinator, - parentShardPollIntervalMillis, - cleanupLeasesOfCompletedShards, - executorService, - metricsFactory, - backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, - new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), - retryGetRecordsInSeconds, - maxGetRecordsThreadPool, - config, shardSyncer, shardSyncStrategy - ); - } - - /** - * @param shardInfo Shard information - * @param streamConfig Stream Config to use - * @param checkpoint Checkpoint tracker - * @param recordProcessor Record processor used to process the data records for the shard - * @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress - * @param leaseCoordinator Used to manage leases for current worker - * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) - * @param cleanupLeasesOfCompletedShards clean up the leases of completed shards - * @param executorService ExecutorService used to execute process tasks for this shard - * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard - * @param backoffTimeMillis backoff interval when we encounter exceptions - * @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists - * @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams. - * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record - * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool - * @param config Kinesis library configuration - * @param shardSyncer shardSyncer instance used to check and create new leases - */ - @Deprecated - ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisDataFetcher kinesisDataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { - - this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, - parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, - backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds, - maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), - Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(), - config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), - config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords())); - } - - /** - * @param shardInfo Shard information - * @param streamConfig Stream Config to use - * @param checkpoint Checkpoint tracker - * @param recordProcessor Record processor used to process the data records for the shard - * @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress - * @param leaseCoordinator Used to manage leases for current worker - * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) - * @param cleanupLeasesOfCompletedShards clean up the leases of completed shards - * @param executorService ExecutorService used to execute process tasks for this shard - * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard - * @param backoffTimeMillis backoff interval when we encounter exceptions - * @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists - * @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams. - * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record - * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool - * @param config Kinesis library configuration - * @param shardSyncer shardSyncer instance used to check and create new leases - * @param leaseCleanupManager used to clean up leases in lease table. - */ - ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisDataFetcher kinesisDataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, - LeaseCleanupManager leaseCleanupManager) { - this.shardInfo = shardInfo; - this.streamConfig = streamConfig; - this.checkpoint = checkpoint; - this.recordProcessor = recordProcessor; - this.recordProcessorCheckpointer = recordProcessorCheckpointer; - this.leaseCoordinator = leaseCoordinator; - this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; - this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; - this.executorService = executorService; - this.metricsFactory = metricsFactory; - this.taskBackoffTimeMillis = backoffTimeMillis; - this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; - this.config = config; - this.dataFetcher = kinesisDataFetcher; - this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( - makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), - this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords()); - this.shardSyncer = shardSyncer; - this.shardSyncStrategy = shardSyncStrategy; - this.leaseCleanupManager = leaseCleanupManager; - } - - /** - * No-op if current task is pending, otherwise submits next task for this shard. - * This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state. - * - * @return true if a new process task was submitted, false otherwise - */ - synchronized boolean consumeShard() { - return checkAndSubmitNextTask(); - } - - private boolean readyForNextTask() { - return future == null || future.isCancelled() || future.isDone(); - } - - private synchronized boolean checkAndSubmitNextTask() { - boolean submittedNewTask = false; - if (readyForNextTask()) { - TaskOutcome taskOutcome = TaskOutcome.NOT_COMPLETE; - if (future != null && future.isDone()) { - taskOutcome = determineTaskOutcome(); - } - - updateState(taskOutcome); - ITask nextTask = getNextTask(); - if (nextTask != null) { - currentTask = nextTask; - try { - future = executorService.submit(currentTask); - currentTaskSubmitTime = System.currentTimeMillis(); - submittedNewTask = true; - LOG.debug("Submitted new " + currentTask.getTaskType() - + " task for shard " + shardInfo.getShardId()); - } catch (RejectedExecutionException e) { - LOG.info(currentTask.getTaskType() + " task was not accepted for execution.", e); - } catch (RuntimeException e) { - LOG.info(currentTask.getTaskType() + " task encountered exception ", e); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("No new task to submit for shard %s, currentState %s", - shardInfo.getShardId(), - currentState.toString())); - } - } - } else { - final long timeElapsed = System.currentTimeMillis() - currentTaskSubmitTime; - final String commonMessage = String.format("Previous %s task still pending for shard %s since %d ms ago. ", - currentTask.getTaskType(), shardInfo.getShardId(), timeElapsed); - if (LOG.isDebugEnabled()) { - LOG.debug(commonMessage + "Not submitting new task."); - } - config.getLogWarningForTaskAfterMillis().ifPresent(value -> { - if (timeElapsed > value) { - LOG.warn(commonMessage); - } - }); - } - - return submittedNewTask; - } - - public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() { - return skipShardSyncAtWorkerInitializationIfLeasesExist; - } - - private enum TaskOutcome { - SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND - } - - private TaskOutcome determineTaskOutcome() { - try { - TaskResult result = future.get(); - if (result.getException() == null) { - if (result.isShardEndReached()) { - if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) { - childShards = result.getChildShards(); - LOG.info("Shard " + shardInfo.getShardId() + ": Setting childShards in ShardConsumer: " + childShards); - } - return TaskOutcome.END_OF_SHARD; - } - return TaskOutcome.SUCCESSFUL; - } - logTaskException(result); - // This is the case of result with exception - if (result.isLeaseNotFound()) { - return TaskOutcome.LEASE_NOT_FOUND; - } - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - // Setting future to null so we don't misinterpret task completion status in case of exceptions - future = null; - } - return TaskOutcome.FAILURE; - } - - private void logTaskException(TaskResult taskResult) { - if (LOG.isDebugEnabled()) { - Exception taskException = taskResult.getException(); - if (taskException instanceof BlockedOnParentShardException) { - // No need to log the stack trace for this exception (it is very specific). - LOG.debug("Shard " + shardInfo.getShardId() + " is blocked on completion of parent shard."); - } else { - LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ", - taskResult.getException()); - } - } - } - - /** - * Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint - * before being shutdown. - * - * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. - */ - void notifyShutdownRequested(ShutdownNotification shutdownNotification) { - this.shutdownNotification = shutdownNotification; - markForShutdown(ShutdownReason.REQUESTED); - } - - /** - * Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API). - * This is called by Worker when it loses responsibility for a shard. - * - * @return true if shutdown is complete (false if shutdown is still in progress) - */ - synchronized boolean beginShutdown() { - markForShutdown(ShutdownReason.ZOMBIE); - checkAndSubmitNextTask(); - - return isShutdown(); - } - - synchronized void markForShutdown(ShutdownReason reason) { - // ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard) - if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { - shutdownReason = reason; - } - } - - /** - * Used (by Worker) to check if this ShardConsumer instance has been shutdown - * RecordProcessor shutdown() has been invoked, as appropriate. - * - * @return true if shutdown is complete - */ - boolean isShutdown() { - return currentState.isTerminal(); - } - - /** - * @return the shutdownReason - */ - ShutdownReason getShutdownReason() { - return shutdownReason; - } - - /** - * Figure out next task to run based on current state, task, and shutdown context. - * - * @return Return next task to run - */ - private ITask getNextTask() { - ITask nextTask = currentState.createTask(this); - - if (nextTask == null) { - return null; - } else { - return new MetricsCollectingTaskDecorator(nextTask, metricsFactory); - } - } - - /** - * Note: This is a private/internal method with package level access solely for testing purposes. - * Update state based on information about: task success, current state, and shutdown info. - * - * @param taskOutcome The outcome of the last task - */ - void updateState(TaskOutcome taskOutcome) { - if (taskOutcome == TaskOutcome.END_OF_SHARD) { - markForShutdown(ShutdownReason.TERMINATE); - LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE"); - } - if (taskOutcome == TaskOutcome.LEASE_NOT_FOUND) { - markForShutdown(ShutdownReason.ZOMBIE); - LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason ZOMBIE as lease was not found"); - } - if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { - currentState = currentState.shutdownTransition(shutdownReason); - } else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) { - currentState = currentState.shutdownTransition(shutdownReason); - } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { - if (currentState.getTaskType() == currentTask.getTaskType()) { - currentState = currentState.successTransition(); - } else { - LOG.error("Current State task type of '" + currentState.getTaskType() - + "' doesn't match the current tasks type of '" + currentTask.getTaskType() - + "'. This shouldn't happen, and indicates a programming error. " - + "Unable to safely transition to the next state."); - } - } - // - // Don't change state otherwise - // - - } - - @VisibleForTesting - boolean isShutdownRequested() { - return shutdownReason != null; - } - - /** - * Private/Internal method - has package level access solely for testing purposes. - * - * @return the currentState - */ - ConsumerStates.ShardConsumerState getCurrentState() { - return currentState.getState(); - } - - StreamConfig getStreamConfig() { - return streamConfig; - } - - IRecordProcessor getRecordProcessor() { - return recordProcessor; - } - - RecordProcessorCheckpointer getRecordProcessorCheckpointer() { - return recordProcessorCheckpointer; - } - - ExecutorService getExecutorService() { - return executorService; - } - - ShardInfo getShardInfo() { - return shardInfo; - } - - KinesisDataFetcher getDataFetcher() { - return dataFetcher; - } - - ILeaseManager getLeaseManager() { - return leaseCoordinator.getLeaseManager(); - } - - KinesisClientLibLeaseCoordinator getLeaseCoordinator() { - return leaseCoordinator; - } - - ICheckpoint getCheckpoint() { - return checkpoint; - } - - long getParentShardPollIntervalMillis() { - return parentShardPollIntervalMillis; - } - - boolean isCleanupLeasesOfCompletedShards() { - return cleanupLeasesOfCompletedShards; - } - - boolean isIgnoreUnexpectedChildShards() { - return config.shouldIgnoreUnexpectedChildShards(); - } - - long getTaskBackoffTimeMillis() { - return taskBackoffTimeMillis; - } - - Future getFuture() { - return future; - } - - ShutdownNotification getShutdownNotification() { - return shutdownNotification; - } - - ShardSyncStrategy getShardSyncStrategy() { - return shardSyncStrategy; - } - - LeaseCleanupManager getLeaseCleanupManager() { - return leaseCleanupManager; - } -} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java index ecf5041f..98e4702e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java @@ -21,14 +21,14 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotif /** * Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint. */ -class ShutdownNotificationTask implements ITask { +public class ShutdownNotificationTask implements ITask { private final IRecordProcessor recordProcessor; private final IRecordProcessorCheckpointer recordProcessorCheckpointer; private final ShutdownNotification shutdownNotification; private final ShardInfo shardInfo; - ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) { + public ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) { this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.shutdownNotification = shutdownNotification; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java index 5e29d6dd..c326e361 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java @@ -15,8 +15,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ShardConsumerState; /** @@ -72,7 +72,7 @@ public enum ShutdownReason { return reason.rank > this.rank; } - ConsumerState getShutdownState() { + public ConsumerState getShutdownState() { return shutdownState; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java deleted file mode 100644 index 07c9fff2..00000000 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. - * Licensed under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - -import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; -import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; -import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException; -import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; -import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; -import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; -import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; -import com.amazonaws.services.kinesis.leases.impl.UpdateField; -import com.amazonaws.services.kinesis.model.ChildShard; -import com.amazonaws.util.CollectionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.google.common.annotations.VisibleForTesting; - -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Random; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * Task for invoking the RecordProcessor shutdown() callback. - */ -class ShutdownTask implements ITask { - - private static final Log LOG = LogFactory.getLog(ShutdownTask.class); - - @VisibleForTesting - static final int RETRY_RANDOM_MAX_RANGE = 50; - - private final ShardInfo shardInfo; - private final IRecordProcessor recordProcessor; - private final RecordProcessorCheckpointer recordProcessorCheckpointer; - private final ShutdownReason reason; - private final IKinesisProxy kinesisProxy; - private final KinesisClientLibLeaseCoordinator leaseCoordinator; - private final InitialPositionInStreamExtended initialPositionInStream; - private final boolean cleanupLeasesOfCompletedShards; - private final boolean ignoreUnexpectedChildShards; - private final TaskType taskType = TaskType.SHUTDOWN; - private final long backoffTimeMillis; - private final GetRecordsCache getRecordsCache; - private final ShardSyncer shardSyncer; - private final ShardSyncStrategy shardSyncStrategy; - private final List childShards; - private final LeaseCleanupManager leaseCleanupManager; - - /** - * Constructor. - */ - // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - ShutdownTask(ShardInfo shardInfo, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - ShutdownReason reason, - IKinesisProxy kinesisProxy, - InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long backoffTimeMillis, - GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, - ShardSyncStrategy shardSyncStrategy, List childShards, - LeaseCleanupManager leaseCleanupManager) { - this.shardInfo = shardInfo; - this.recordProcessor = recordProcessor; - this.recordProcessorCheckpointer = recordProcessorCheckpointer; - this.reason = reason; - this.kinesisProxy = kinesisProxy; - this.initialPositionInStream = initialPositionInStream; - this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; - this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; - this.leaseCoordinator = leaseCoordinator; - this.backoffTimeMillis = backoffTimeMillis; - this.getRecordsCache = getRecordsCache; - this.shardSyncer = shardSyncer; - this.shardSyncStrategy = shardSyncStrategy; - this.childShards = childShards; - this.leaseCleanupManager = leaseCleanupManager; - } - - /* - * Invokes RecordProcessor shutdown() API. - * (non-Javadoc) - * - * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call() - */ - @Override - public TaskResult call() { - Exception exception; - - LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " - + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards); - - try { - final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); - final Runnable leaseLostAction = () -> takeLeaseLostAction(); - - if (reason == ShutdownReason.TERMINATE) { - try { - takeShardEndAction(currentShardLease); - } catch (InvalidStateException e) { - // If InvalidStateException happens, it indicates we have a non recoverable error in short term. - // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down. - LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " + - "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e); - dropLease(currentShardLease); - throwOnApplicationException(leaseLostAction); - } - } else { - throwOnApplicationException(leaseLostAction); - } - - LOG.debug("Shutting down retrieval strategy."); - getRecordsCache.shutdown(); - LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); - return new TaskResult(null); - } catch (Exception e) { - if (e instanceof CustomerApplicationException) { - LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e); - } else { - LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e); - } - - exception = e; - // backoff if we encounter an exception. - try { - Thread.sleep(this.backoffTimeMillis); - } catch (InterruptedException ie) { - LOG.debug("Interrupted sleep", ie); - } - } - - return new TaskResult(exception); - } - - // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup. - private void takeShardEndAction(KinesisClientLease currentShardLease) - throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException { - // Create new lease for the child shards if they don't exist. - // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. - // This would happen when KinesisDataFetcher catches ResourceNotFound exception. - // In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. - // This scenario could happen when customer deletes the stream while leaving the KCL application running. - if (currentShardLease == null) { - throw new InvalidStateException("Shard " + shardInfo.getShardId() + ": Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); - } - if (!CollectionUtils.isNullOrEmpty(childShards)) { - // If childShards is not empty, create new leases for the childShards and update the current lease with the childShards lease information. - createLeasesForChildShardsIfNotExist(); - updateCurrentLeaseWithChildShards(currentShardLease); - } else { - LOG.warn("Shard " + shardInfo.getShardId() - + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); - } - // Checkpoint with SHARD_END sequence number. - final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentShardLease, shardInfo); - if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { - boolean isSuccess = false; - try { - isSuccess = attemptShardEndCheckpointing(); - } finally { - // Check if either the shard end ddb persist is successful or - // if childshards is empty. When child shards is empty then either it is due to - // completed shard being reprocessed or we got RNF from service. - // For these cases enqueue the lease for deletion. - if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) { - leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); - } - } - } - } - - private void takeLeaseLostAction() { - final ShutdownInput leaseLostShutdownInput = new ShutdownInput() - .withShutdownReason(ShutdownReason.ZOMBIE) - .withCheckpointer(recordProcessorCheckpointer); - recordProcessor.shutdown(leaseLostShutdownInput); - } - - private boolean attemptShardEndCheckpointing() - throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { - final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId())) - .orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist.")); - if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { - // Call the recordProcessor to checkpoint with SHARD_END sequence number. - // The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown. - throwOnApplicationException(() -> applicationCheckpointAndVerification()); - } - return true; - } - - private void applicationCheckpointAndVerification() { - recordProcessorCheckpointer.setSequenceNumberAtShardEnd( - recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); - recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); - final ShutdownInput shardEndShutdownInput = new ShutdownInput() - .withShutdownReason(ShutdownReason.TERMINATE) - .withCheckpointer(recordProcessorCheckpointer); - recordProcessor.shutdown(shardEndShutdownInput); - - boolean successfullyCheckpointedShardEnd = false; - - KinesisClientLease leaseFromDdb = null; - try { - leaseFromDdb = leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId()); - } catch (Exception e) { - LOG.error("Shard " + shardInfo.getShardId() + " : Unable to get lease entry for shard to verify shard end checkpointing.", e); - } - - if (leaseFromDdb != null && leaseFromDdb.getCheckpoint() != null) { - successfullyCheckpointedShardEnd = leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END); - final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); - if (!leaseFromDdb.getCheckpoint().equals(lastCheckpointValue)) { - LOG.error("Shard " + shardInfo.getShardId() + - " : Checkpoint information mismatch between authoritative source and local cache. " + - "This does not affect the application flow, but cut a ticket to Kinesis when you see this. " + - "Authoritative entry : " + leaseFromDdb.getCheckpoint() + " Cache entry : " + lastCheckpointValue); - } - } else { - LOG.error("Shard " + shardInfo.getShardId() + " : No lease checkpoint entry for shard to verify shard end checkpointing. Lease Entry : " + leaseFromDdb); - } - - if (!successfullyCheckpointedShardEnd) { - throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " + - "See IRecordProcessor.shutdown javadocs for more information."); - } - } - - private void throwOnApplicationException(Runnable action) throws CustomerApplicationException { - try { - action.run(); - } catch (Exception e) { - throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e); - } - } - - private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException { - // For child shard resulted from merge of two parent shards, verify if both the parents are either present or - // not present in the lease table before creating the lease entry. - if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) { - final ChildShard childShard = childShards.get(0); - final List parentLeaseKeys = childShard.getParentShards(); - - if (parentLeaseKeys.size() != 2) { - throw new InvalidStateException("Shard " + shardInfo.getShardId()+ "'s only child shard " + childShard - + " does not contain other parent information."); - } else { - boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) == - Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1))); - if (!isValidLeaseTableState) { - if(!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) { - throw new BlockedOnParentShardException( - "Shard " + shardInfo.getShardId() + "'s only child shard " + childShard - + " has partial parent information in lease table. Hence deferring lease creation of child shard."); - } else { - throw new InvalidStateException("Shard " + shardInfo.getShardId() + "'s only child shard " + childShard - + " has partial parent information in lease table."); - } - } - } - } - // Attempt create leases for child shards. - for (ChildShard childShard : childShards) { - final String leaseKey = childShard.getShardId(); - if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) { - final KinesisClientLease leaseToCreate = KinesisShardSyncer.newKCLLeaseForChildShard(childShard); - leaseCoordinator.getLeaseManager().createLeaseIfNotExists(leaseToCreate); - LOG.info("Shard " + shardInfo.getShardId() + " : Created child shard lease: " + leaseToCreate.getLeaseKey()); - } - } - } - - /** - * Returns true for 1 in N probability. - */ - @VisibleForTesting - boolean isOneInNProbability(int n) { - Random r = new Random(); - return 1 == r.nextInt((n - 1) + 1) + 1; - } - - private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - final Set childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet()); - currentLease.setChildShardIds(childShardIds); - leaseCoordinator.getLeaseManager().updateLeaseWithMetaInfo(currentLease, UpdateField.CHILD_SHARDS); - LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey()); - } - - - /* - * (non-Javadoc) - * - * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType() - */ - @Override - public TaskType getTaskType() { - return taskType; - } - - @VisibleForTesting - ShutdownReason getReason() { - return reason; - } - - private void dropLease(KinesisClientLease currentShardLease) { - if (currentShardLease == null) { - LOG.warn("Shard " + shardInfo.getShardId() + ": Unable to find the lease for shard. Will shutdown the shardConsumer directly."); - return; - } - leaseCoordinator.dropLease(currentShardLease); - LOG.warn("Dropped lease for shutting down ShardConsumer: " + currentShardLease.getLeaseKey()); - } -} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java index fff6b71f..547f1d12 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java @@ -19,7 +19,7 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; /** * Used to capture stream configuration and pass it along. */ -class StreamConfig { +public class StreamConfig { private final IKinesisProxy streamProxy; private final int maxRecords; @@ -38,11 +38,11 @@ class StreamConfig { * @param initialPositionInStream Initial position in stream */ StreamConfig(IKinesisProxy proxy, - int maxRecords, - long idleTimeInMilliseconds, - boolean callProcessRecordsEvenForEmptyRecordList, - boolean validateSequenceNumberBeforeCheckpointing, - InitialPositionInStreamExtended initialPositionInStream) { + int maxRecords, + long idleTimeInMilliseconds, + boolean callProcessRecordsEvenForEmptyRecordList, + boolean validateSequenceNumberBeforeCheckpointing, + InitialPositionInStreamExtended initialPositionInStream) { this.streamProxy = proxy; this.maxRecords = maxRecords; this.idleTimeInMilliseconds = idleTimeInMilliseconds; @@ -54,7 +54,7 @@ class StreamConfig { /** * @return the streamProxy */ - IKinesisProxy getStreamProxy() { + public IKinesisProxy getStreamProxy() { return streamProxy; } @@ -82,14 +82,14 @@ class StreamConfig { /** * @return the initialPositionInStream */ - InitialPositionInStreamExtended getInitialPositionInStream() { + public InitialPositionInStreamExtended getInitialPositionInStream() { return initialPositionInStream; } /** * @return validateSequenceNumberBeforeCheckpointing */ - boolean shouldValidateSequenceNumberBeforeCheckpointing() { + public boolean shouldValidateSequenceNumberBeforeCheckpointing() { return validateSequenceNumberBeforeCheckpointing; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java index 5da33ac6..ac46be33 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java @@ -24,7 +24,7 @@ import lombok.NonNull; @Data public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy { @NonNull - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; @Override public GetRecordsResult getRecords(final int maxRecords) { @@ -44,7 +44,7 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev } @Override - public KinesisDataFetcher getDataFetcher() { + public IDataFetcher getDataFetcher() { return dataFetcher; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index a69ea6ca..375b3c13 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -137,7 +137,7 @@ public class Worker implements Runnable { // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. - private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); + private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -687,7 +687,7 @@ public class Worker implements Runnable { boolean foundCompletedShard = false; Set assignedShards = new HashSet<>(); for (ShardInfo shardInfo : getShardInfoForAssignments()) { - ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory); + IShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory); if (shardConsumer.isShutdown() && shardConsumer.getShutdownReason().equals(ShutdownReason.TERMINATE)) { foundCompletedShard = true; } else { @@ -695,10 +695,8 @@ public class Worker implements Runnable { } assignedShards.add(shardInfo); } - // clean up shard consumers for unassigned shards cleanupShardConsumers(assignedShards); - wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); } catch (Exception e) { @@ -983,9 +981,9 @@ public class Worker implements Runnable { ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, notificationCompleteLatch, shutdownCompleteLatch); ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); - ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); + IShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); - if (consumer == null || ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) { + if (consumer == null || KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) { // // CASE1: There is a race condition between retrieving the current assignments, and creating the // notification. If the a lease is lost in between these two points, we explicitly decrement the @@ -1007,7 +1005,7 @@ public class Worker implements Runnable { return shutdownComplete; } - ConcurrentMap getShardInfoShardConsumerMap() { + ConcurrentMap getShardInfoShardConsumerMap() { return shardInfoShardConsumerMap; } @@ -1107,8 +1105,8 @@ public class Worker implements Runnable { * RecordProcessor factory * @return ShardConsumer for the shard */ - ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { - ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); + IShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + IShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier // lease instance (and was shutdown). Don't need to create another @@ -1123,7 +1121,7 @@ public class Worker implements Runnable { return consumer; } - protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + protected IShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { final IRecordProcessor recordProcessor = processorFactory.createProcessor(); final RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( shardInfo, @@ -1134,7 +1132,11 @@ public class Worker implements Runnable { streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), metricsFactory); - return new ShardConsumer(shardInfo, + if(shardConsumerFactory == null){ //Default to KinesisShardConsumerFactory if null + this.shardConsumerFactory = new KinesisShardConsumerFactory(); + } + + return shardConsumerFactory.createShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, @@ -1146,7 +1148,6 @@ public class Worker implements Runnable { metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), retryGetRecordsInSeconds, maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java index d19fc3ed..dfb44cf8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java @@ -114,7 +114,7 @@ public class LeaseCleanupManager { completedLeaseStopwatch.start(); garbageLeaseStopwatch.start(); deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); isRunning = true; } else { LOG.info("Lease cleanup thread already running, no need to start."); @@ -241,7 +241,7 @@ public class LeaseCleanupManager { if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { // throws ResourceNotFoundException wereChildShardsPresent = !CollectionUtils - .isNullOrEmpty(getChildShardsFromService(shardInfo)); + .isNullOrEmpty(getChildShardsFromService(shardInfo)); } } catch (ResourceNotFoundException e) { wasResourceNotFound = true; @@ -296,7 +296,7 @@ public class LeaseCleanupManager { for (String childShardLeaseKey : childShardLeaseKeys) { final KinesisClientLease childShardLease = Optional.ofNullable( - leaseManager.getLease(childShardLeaseKey)) + leaseManager.getLease(childShardLeaseKey)) .orElseThrow(() -> new IllegalStateException( "Child lease " + childShardLeaseKey + " for completed shard not found in " + "lease table - not cleaning up lease " + lease)); @@ -406,4 +406,4 @@ public class LeaseCleanupManager { boolean cleanedUp; String failureMsg; } -} +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 6a5e76b9..63cceb6e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -14,8 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ShardConsumerState; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -50,7 +50,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; public class ConsumerStatesTest { @Mock - private ShardConsumer consumer; + private KinesisShardConsumer consumer; @Mock private StreamConfig streamConfig; @Mock @@ -251,9 +251,9 @@ public class ConsumerStatesTest { equalTo((IRecordProcessorCheckpointer) recordProcessorCheckpointer))); assertThat(task, shutdownReqTask(ShutdownNotification.class, "shutdownNotification", equalTo(shutdownNotification))); - assertThat(state.successTransition(), equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); + assertThat(state.successTransition(), equalTo(KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), - equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); + equalTo(KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), @@ -266,7 +266,7 @@ public class ConsumerStatesTest { @Test public void shutdownRequestCompleteStateTest() { - ConsumerState state = ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE; + ConsumerState state = KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE; assertThat(state.createTask(consumer), nullValue()); @@ -345,9 +345,9 @@ public class ConsumerStatesTest { verify(shutdownNotification, never()).shutdownComplete(); } - static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, + static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, String propertyName, Matcher matcher) { - return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); + return taskWith(KinesisShutdownTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher shutdownReqTask( diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java index 11e274cb..2a3b2774 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java @@ -49,7 +49,7 @@ public class GracefulShutdownCoordinatorTest { @Mock private Callable contextCallable; @Mock - private ConcurrentMap shardInfoConsumerMap; + private ConcurrentMap shardInfoConsumerMap; @Test public void testAllShutdownCompletedAlready() throws Exception { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java index 779ba92f..016ab32e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java @@ -39,8 +39,8 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MAX_HASH_KEY; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MIN_HASH_KEY; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisPeriodicShardSyncManager.MAX_HASH_KEY; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisPeriodicShardSyncManager.MIN_HASH_KEY; import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.deserialize; import static org.mockito.Mockito.when; @@ -52,10 +52,10 @@ public class PeriodicShardSyncManagerTest { public static final int LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD = 3; /** Manager for PERIODIC shard sync strategy */ - private PeriodicShardSyncManager periodicShardSyncManager; + private KinesisPeriodicShardSyncManager periodicShardSyncManager; /** Manager for SHARD_END shard sync strategy */ - private PeriodicShardSyncManager auditorPeriodicShardSyncManager; + private KinesisPeriodicShardSyncManager auditorPeriodicShardSyncManager; @Mock private LeaderDecider leaderDecider; @@ -70,10 +70,10 @@ public class PeriodicShardSyncManagerTest { @Before public void setup() { - periodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, + periodicShardSyncManager = new KinesisPeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, metricsFactory, leaseManager, kinesisProxy, false, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS, LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD); - auditorPeriodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, + auditorPeriodicShardSyncManager = new KinesisPeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, metricsFactory, leaseManager, kinesisProxy, true, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS, LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD); } @@ -92,7 +92,7 @@ public class PeriodicShardSyncManagerTest { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertTrue(PeriodicShardSyncManager + Assert.assertTrue(KinesisPeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } @@ -110,7 +110,7 @@ public class PeriodicShardSyncManagerTest { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertFalse(PeriodicShardSyncManager + Assert.assertFalse(KinesisPeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } @@ -128,7 +128,7 @@ public class PeriodicShardSyncManagerTest { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertFalse(PeriodicShardSyncManager + Assert.assertFalse(KinesisPeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } @@ -147,7 +147,7 @@ public class PeriodicShardSyncManagerTest { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertFalse(PeriodicShardSyncManager + Assert.assertFalse(KinesisPeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 7a5e7fd2..8936a28e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -85,7 +85,7 @@ import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; /** - * Unit tests of {@link ShardConsumer}. + * Unit tests of {@link KinesisShardConsumer}. */ @RunWith(MockitoJUnitRunner.class) public class ShardConsumerTest { @@ -160,8 +160,8 @@ public class ShardConsumerTest { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -175,19 +175,19 @@ public class ShardConsumerTest { config, shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); } /** @@ -210,8 +210,8 @@ public class ShardConsumerTest { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -226,21 +226,21 @@ public class ShardConsumerTest { shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); doThrow(new RejectedExecutionException()).when(spyExecutorService).submit(any(InitializeTask.class)); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); } @Test @@ -258,8 +258,8 @@ public class ShardConsumerTest { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -273,19 +273,19 @@ public class ShardConsumerTest { config, shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); } @@ -300,8 +300,8 @@ public class ShardConsumerTest { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -324,10 +324,10 @@ public class ShardConsumerTest { when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // submit BlockOnParentShardTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(processor, times(0)).initialize(any(InitializationInput.class)); // Throw Error when IRecordProcessor.initialize() is invoked. @@ -335,7 +335,7 @@ public class ShardConsumerTest { consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); @@ -347,7 +347,7 @@ public class ShardConsumerTest { assertThat(e.getCause(), instanceOf(ExecutionException.class)); } Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); @@ -355,7 +355,7 @@ public class ShardConsumerTest { consumer.consumeShard(); // submit InitializeTask again. Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(2)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); verify(processor, times(2)).initialize(any(InitializationInput.class)); // no other calls with different args @@ -363,11 +363,11 @@ public class ShardConsumerTest { // Checking the status of submitted InitializeTask from above should pass. consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); } /** - * Test method for {@link ShardConsumer#consumeShard()} + * Test method for {@link KinesisShardConsumer#consumeShard()} */ @Test public final void testConsumeShard() throws Exception { @@ -420,8 +420,8 @@ public class ShardConsumerTest { any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -440,11 +440,11 @@ public class ShardConsumerTest { shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); verify(getRecordsCache).start(); @@ -454,7 +454,7 @@ public class ShardConsumerTest { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -469,21 +469,21 @@ public class ShardConsumerTest { assertThat(processor.getNotifyShutdownLatch().await(1, TimeUnit.SECONDS), is(true)); Thread.sleep(50); assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED)); - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); verify(shutdownNotification).shutdownNotificationComplete(); assertThat(processor.isShutdownNotificationCalled(), equalTo(true)); consumer.consumeShard(); Thread.sleep(50); - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); consumer.beginShutdown(); Thread.sleep(50L); assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.ZOMBIE)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.beginShutdown(); consumer.consumeShard(); verify(shutdownNotification, atLeastOnce()).shutdownComplete(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); verify(getRecordsCache).shutdown(); @@ -524,8 +524,8 @@ public class ShardConsumerTest { when(recordProcessorCheckpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(streamConfig.getStreamProxy()).thenReturn(streamProxy); - final ShardConsumer consumer = - new ShardConsumer(shardInfo, + final KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -544,21 +544,21 @@ public class ShardConsumerTest { shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(parentLease, times(0)).getCheckpoint(); consumer.consumeShard(); // check on parent shards Thread.sleep(parentShardPollIntervalMillis * 2); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(parentLease, times(1)).getCheckpoint(); consumer.notifyShutdownRequested(shutdownNotification); verify(shutdownNotification, times(0)).shutdownComplete(); assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); Thread.sleep(50L); consumer.beginShutdown(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(consumer.isShutdown(), is(true)); verify(shutdownNotification, times(1)).shutdownComplete(); consumer.beginShutdown(); @@ -583,7 +583,7 @@ public class ShardConsumerTest { } /** - * Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record + * Test method for {@link KinesisShardConsumer#consumeShard()} that ensures a transient error thrown from the record * processor's shutdown method with reason zombie will be retried. */ @Test @@ -646,8 +646,8 @@ public class ShardConsumerTest { metricsFactory ); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -667,11 +667,11 @@ public class ShardConsumerTest { shardSyncStrategy); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); verify(getRecordsCache).start(); @@ -681,7 +681,7 @@ public class ShardConsumerTest { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -709,12 +709,12 @@ public class ShardConsumerTest { // Wait for shutdown complete now that terminate shutdown is successful for (int i = 0; i < 100; i++) { consumer.consumeShard(); - if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + if (consumer.getCurrentState() == KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { break; } Thread.sleep(50L); } - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); @@ -732,7 +732,7 @@ public class ShardConsumerTest { /** - * Test method for {@link ShardConsumer#consumeShard()} that ensures the shardConsumer gets shutdown with shutdown + * Test method for {@link KinesisShardConsumer#consumeShard()} that ensures the shardConsumer gets shutdown with shutdown * reason TERMINATE when the shard end is reached. */ @Test @@ -795,8 +795,8 @@ public class ShardConsumerTest { metricsFactory ); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -818,11 +818,11 @@ public class ShardConsumerTest { when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); verify(getRecordsCache).start(); @@ -832,7 +832,7 @@ public class ShardConsumerTest { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -860,12 +860,12 @@ public class ShardConsumerTest { // Wait for shutdown complete now that terminate shutdown is successful for (int i = 0; i < 100; i++) { consumer.consumeShard(); - if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + if (consumer.getCurrentState() == KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { break; } Thread.sleep(50L); } - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); @@ -881,7 +881,7 @@ public class ShardConsumerTest { } /** - * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. + * Test method for {@link KinesisShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */ @Test public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception { @@ -938,8 +938,8 @@ public class ShardConsumerTest { any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -958,11 +958,11 @@ public class ShardConsumerTest { shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); @@ -973,7 +973,7 @@ public class ShardConsumerTest { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -985,9 +985,9 @@ public class ShardConsumerTest { assertThat(processor.getShutdownReason(), nullValue()); consumer.beginShutdown(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.beginShutdown(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); executorService.shutdown(); @@ -1014,8 +1014,8 @@ public class ShardConsumerTest { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -1041,22 +1041,22 @@ public class ShardConsumerTest { when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // submit BlockOnParentShardTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(processor, times(0)).initialize(any(InitializationInput.class)); consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); verify(processor, times(1)).initialize(any(InitializationInput.class)); // no other calls with different args consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); } @Test @@ -1069,8 +1069,8 @@ public class ShardConsumerTest { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer shardConsumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer shardConsumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -1101,8 +1101,8 @@ public class ShardConsumerTest { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer shardConsumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer shardConsumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -1144,7 +1144,7 @@ public class ShardConsumerTest { skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer shardConsumer = new ShardConsumer( + KinesisShardConsumer shardConsumer = new KinesisShardConsumer( shardInfo, streamConfig, checkpoint, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 053a8bf7..e8870d66 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -66,7 +66,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.RETRY_RANDOM_MAX_RANGE; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShutdownTask.RETRY_RANDOM_MAX_RANGE; /** * @@ -139,7 +139,7 @@ public class ShutdownTaskTest { } /** - * Test method for {@link ShutdownTask#call()}. + * Test method for {@link KinesisShutdownTask#call()}. */ @Test public final void testCallWhenApplicationDoesNotCheckpoint() { @@ -148,7 +148,7 @@ public class ShutdownTaskTest { when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -171,7 +171,7 @@ public class ShutdownTaskTest { } /** - * Test method for {@link ShutdownTask#call()}. + * Test method for {@link KinesisShutdownTask#call()}. */ @Test public final void testCallWhenCreatingLeaseThrows() throws Exception { @@ -183,7 +183,7 @@ public class ShutdownTaskTest { final String exceptionMessage = "InvalidStateException is thrown."; when(leaseManager.createLeaseIfNotExists(any(KinesisClientLease.class))).thenThrow(new InvalidStateException(exceptionMessage)); - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -226,7 +226,7 @@ public class ShutdownTaskTest { // Make first 5 attempts with partial parent info in lease table for (int i = 0; i < 5; i++) { - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -252,7 +252,7 @@ public class ShutdownTaskTest { } // Make next attempt with complete parent info in lease table - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -290,7 +290,7 @@ public class ShutdownTaskTest { when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null); for (int i = 0; i < 10; i++) { - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -315,7 +315,7 @@ public class ShutdownTaskTest { verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); } - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -351,7 +351,7 @@ public class ShutdownTaskTest { boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -385,7 +385,7 @@ public class ShutdownTaskTest { boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(shardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(shardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -415,7 +415,7 @@ public class ShutdownTaskTest { boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.ZOMBIE, @@ -438,12 +438,12 @@ public class ShutdownTaskTest { } /** - * Test method for {@link ShutdownTask#getTaskType()}. + * Test method for {@link KinesisShutdownTask#getTaskType()}. */ @Test public final void testGetTaskType() { KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); - ShutdownTask task = new ShutdownTask(null, null, null, null, + KinesisShutdownTask task = new KinesisShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 50a3aa9b..75d74186 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -186,7 +186,7 @@ public class WorkerTest { @Mock private IRecordProcessor v2RecordProcessor; @Mock - private ShardConsumer shardConsumer; + private IShardConsumer shardConsumer; @Mock private Future taskFuture; @Mock @@ -297,13 +297,13 @@ public class WorkerTest { KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); + IShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); Assert.assertNotNull(consumer); - ShardConsumer consumer2 = worker.createOrGetShardConsumer(shardInfo, streamletFactory); + IShardConsumer consumer2 = worker.createOrGetShardConsumer(shardInfo, streamletFactory); Assert.assertSame(consumer, consumer2); ShardInfo shardInfoWithSameShardIdButDifferentConcurrencyToken = new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - ShardConsumer consumer3 = + IShardConsumer consumer3 = worker.createOrGetShardConsumer(shardInfoWithSameShardIdButDifferentConcurrencyToken, streamletFactory); Assert.assertNotNull(consumer3); Assert.assertNotSame(consumer3, consumer); @@ -419,10 +419,10 @@ public class WorkerTest { new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo2 = new ShardInfo(anotherDummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - ShardConsumer consumerOfShardInfo1 = worker.createOrGetShardConsumer(shardInfo1, streamletFactory); - ShardConsumer consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken = + IShardConsumer consumerOfShardInfo1 = worker.createOrGetShardConsumer(shardInfo1, streamletFactory); + IShardConsumer consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken = worker.createOrGetShardConsumer(duplicateOfShardInfo1ButWithAnotherConcurrencyToken, streamletFactory); - ShardConsumer consumerOfShardInfo2 = worker.createOrGetShardConsumer(shardInfo2, streamletFactory); + IShardConsumer consumerOfShardInfo2 = worker.createOrGetShardConsumer(shardInfo2, streamletFactory); Set assignedShards = new HashSet(); assignedShards.add(shardInfo1); @@ -1219,11 +1219,11 @@ public class WorkerTest { false, shardPrioritization); - final Map shardInfoShardConsumerMap = worker.getShardInfoShardConsumerMap(); + final Map shardInfoShardConsumerMap = worker.getShardInfoShardConsumerMap(); final ShardInfo completedShardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(completedLease); - final ShardConsumer completedShardConsumer = mock(ShardConsumer.class); + final KinesisShardConsumer completedShardConsumer = mock(KinesisShardConsumer.class); shardInfoShardConsumerMap.put(completedShardInfo, completedShardConsumer); - when(completedShardConsumer.getCurrentState()).thenReturn(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE); + when(completedShardConsumer.getCurrentState()).thenReturn(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE); Callable callable = worker.createWorkerShutdownCallable(); assertThat(worker.hasGracefulShutdownStarted(), equalTo(false)); @@ -1338,11 +1338,11 @@ public class WorkerTest { verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); } @@ -1451,11 +1451,11 @@ public class WorkerTest { verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); @@ -2013,19 +2013,19 @@ public class WorkerTest { @Override protected boolean matchesSafely(MetricsCollectingTaskDecorator item, Description mismatchDescription) { return Condition.matched(item, mismatchDescription) - .and(new Condition.Step() { + .and(new Condition.Step() { @Override - public Condition apply(MetricsCollectingTaskDecorator value, + public Condition apply(MetricsCollectingTaskDecorator value, Description mismatch) { - if (!(value.getOther() instanceof ShutdownTask)) { + if (!(value.getOther() instanceof KinesisShutdownTask)) { mismatch.appendText("Wrapped task isn't a shutdown task"); return Condition.notMatched(); } - return Condition.matched((ShutdownTask) value.getOther(), mismatch); + return Condition.matched((KinesisShutdownTask) value.getOther(), mismatch); } - }).and(new Condition.Step() { + }).and(new Condition.Step() { @Override - public Condition apply(ShutdownTask value, Description mismatch) { + public Condition apply(KinesisShutdownTask value, Description mismatch) { return Condition.matched(value.getReason(), mismatch); } }).matching(matcher);