From 03c78fd15e4130eeacace1352b6063b0be1c710d Mon Sep 17 00:00:00 2001 From: Nicholas Gutierrez Date: Tue, 2 Aug 2022 13:25:57 -0700 Subject: [PATCH] Everything Else needed for DynamoDBStreamsKinesisAdapter update compatibility --- .../lib/worker/IDataFetcher.java | 23 + .../lib/worker/IShardConsumer.java | 25 + .../lib/worker/IShardConsumerFactory.java | 34 + .../lib/worker/KinesisConsumerStates.java | 635 ++++++++++++++++++ .../lib/worker/KinesisShardConsumer.java | 608 +++++++++++++++++ .../worker/KinesisShardConsumerFactory.java | 48 ++ .../lib/worker/KinesisShutdownTask.java | 337 ++++++++++ .../lib/worker/PeriodicShardSyncManager.java | 410 +++++++++++ .../clientlibrary/lib/worker/Worker.java | 117 ++-- .../worker/PeriodicShardSyncManagerTest.java | 20 +- 10 files changed, 2195 insertions(+), 62 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IDataFetcher.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumer.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumerFactory.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisConsumerStates.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumerFactory.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IDataFetcher.java new file mode 100644 index 00000000..29f088d1 --- /dev/null +++ 
b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IDataFetcher.java @@ -0,0 +1,23 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.model.ChildShard; + +import java.util.List; + +public interface IDataFetcher { + + DataFetcherResult getRecords(int maxRecords); + + void initialize(String initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream); + + void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream); + + void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream); + + void restartIterator(); + + boolean isShardEndReached(); + + List getChildShards(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumer.java new file mode 100644 index 00000000..ac2e31d1 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumer.java @@ -0,0 +1,25 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +public interface IShardConsumer { + + boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist(); + + enum TaskOutcome { + SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND + } + + boolean consumeShard(); + + boolean isShutdown(); + + ShutdownReason getShutdownReason(); + + boolean beginShutdown(); + + void notifyShutdownRequested(ShutdownNotification shutdownNotification); + + KinesisConsumerStates.ShardConsumerState getCurrentState(); + + boolean isShutdownRequested(); + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumerFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumerFactory.java new 
file mode 100644 index 00000000..acb23018 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumerFactory.java @@ -0,0 +1,34 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +public interface IShardConsumerFactory { + + /** + * Returns a shard consumer to be used for consuming a (assigned) shard. + * + * @return Returns a shard consumer object. + */ + IShardConsumer createShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpointTracker, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesUponShardCompletion, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long taskBackoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, + LeaseCleanupManager leaseCleanupManager); +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisConsumerStates.java new file mode 100644 index 00000000..766d6fba --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisConsumerStates.java @@ -0,0 +1,635 @@ +/* + * Copyright 2019 Amazon.com, Inc. 
or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * Top level container for all the possible states a {@link KinesisShardConsumer} can be in. The logic for creation of tasks, + * and state transitions is contained within the {@link ConsumerState} objects. + * + *

State Diagram

+ * + *
+ *       +-------------------+
+ *       | Waiting on Parent |                               +------------------+
+ *  +----+       Shard       |                               |     Shutdown     |
+ *  |    |                   |          +--------------------+   Notification   |
+ *  |    +----------+--------+          |  Shutdown:         |     Requested    |
+ *  |               | Success           |   Requested        +-+-------+--------+
+ *  |               |                   |                      |       |
+ *  |        +------+-------------+     |                      |       | Shutdown:
+ *  |        |    Initializing    +-----+                      |       |  Requested
+ *  |        |                    |     |                      |       |
+ *  |        |                    +-----+-------+              |       |
+ *  |        +---------+----------+     |       | Shutdown:    | +-----+-------------+
+ *  |                  | Success        |       |  Terminated  | |     Shutdown      |
+ *  |                  |                |       |  Zombie      | |   Notification    +-------------+
+ *  |           +------+-------------+  |       |              | |     Complete      |             |
+ *  |           |     Processing     +--+       |              | ++-----------+------+             |
+ *  |       +---+                    |          |              |  |           |                    |
+ *  |       |   |                    +----------+              |  |           | Shutdown:          |
+ *  |       |   +------+-------------+          |              \  /           |  Requested         |
+ *  |       |          |                        |               \/            +--------------------+
+ *  |       |          |                        |               ||
+ *  |       | Success  |                        |               || Shutdown:
+ *  |       +----------+                        |               ||  Terminated
+ *  |                                           |               ||  Zombie
+ *  |                                           |               ||
+ *  |                                           |               ||
+ *  |                                           |           +---++--------------+
+ *  |                                           |           |   Shutting Down   |
+ *  |                                           +-----------+                   |
+ *  |                                                       |                   |
+ *  |                                                       +--------+----------+
+ *  |                                                                |
+ *  |                                                                | Shutdown:
+ *  |                                                                |  All Reasons
+ *  |                                                                |
+ *  |                                                                |
+ *  |      Shutdown:                                        +--------+----------+
+ *  |        All Reasons                                    |     Shutdown      |
+ *  +-------------------------------------------------------+     Complete      |
+ *                                                          |                   |
+ *                                                          +-------------------+
+ * 
+ */ +public class KinesisConsumerStates { + + /** + * Enumerates processing states when working on a shard. + */ + public enum ShardConsumerState { + // @formatter:off + WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()), + INITIALIZING(new InitializingState()), + PROCESSING(new ProcessingState()), + SHUTDOWN_REQUESTED(new ShutdownNotificationState()), + SHUTTING_DOWN(new ShuttingDownState()), + SHUTDOWN_COMPLETE(new ShutdownCompleteState()); + //@formatter:on + + private final ConsumerState consumerState; + + ShardConsumerState(ConsumerState consumerState) { + this.consumerState = consumerState; + } + + public ConsumerState getConsumerState() { + return consumerState; + } + } + + + /** + * Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to + * do when a transition occurs. + * + */ + public interface ConsumerState { + /** + * Creates a new task for this state using the passed in consumer to build the task. If there is no task + * required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the + * consumer during the execution of this method. + * + * @param consumer + * the consumer to use build the task, or execute state. + * @return a valid task for this state or null if there is no task required. + */ + ITask createTask(KinesisShardConsumer consumer); + + /** + * Provides the next state of the consumer upon success of the task return by + * {@link ConsumerState#createTask(KinesisShardConsumer)}. + * + * @return the next state that the consumer should transition to, this may be the same object as the current + * state. + */ + ConsumerState successTransition(); + + /** + * Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent + * on the current state, and the shutdown reason. 
+ * + * @param shutdownReason + * the reason that a shutdown was requested + * @return the next state that the consumer should transition to, this may be the same object as the current + * state. + */ + ConsumerState shutdownTransition(ShutdownReason shutdownReason); + + /** + * The type of task that {@link ConsumerState#createTask(KinesisShardConsumer)} would return. This is always a valid state + * even if createTask would return a null value. + * + * @return the type of task that this state represents. + */ + TaskType getTaskType(); + + /** + * An enumeration represent the type of this state. Different consumer states may return the same + * {@link ShardConsumerState}. + * + * @return the type of consumer state this represents. + */ + ShardConsumerState getState(); + + boolean isTerminal(); + + } + + /** + * The initial state that any {@link KinesisShardConsumer} should start in. + */ + static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState(); + + private static ConsumerState shutdownStateFor(ShutdownReason reason) { + switch (reason) { + case REQUESTED: + return ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); + case TERMINATE: + case ZOMBIE: + return ShardConsumerState.SHUTTING_DOWN.getConsumerState(); + default: + throw new IllegalArgumentException("Unknown reason: " + reason); + } + } + + /** + * This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent + * shards have been completed. + * + *

Valid Transitions

+ *
+ *
Success
+ *
Transition to the initializing state to allow the record processor to be initialized in preparation of + * processing.
+ *
Shutdown
+ *
+ *
+ *
All Reasons
+ *
Transitions to {@link ShutdownCompleteState}. Since the record processor was never initialized it can't be + * informed of the shutdown.
+ *
+ *
+ *
+ */ + static class BlockedOnParentState implements ConsumerState { + + @Override + public ITask createTask(KinesisShardConsumer consumer) { + return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(), + consumer.getParentShardPollIntervalMillis()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.INITIALIZING.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return ShardConsumerState.SHUTTING_DOWN.getConsumerState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.BLOCK_ON_PARENT_SHARDS; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.WAITING_ON_PARENT_SHARDS; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This state is responsible for initializing the record processor with the shard information. + *

Valid Transitions

+ *
+ *
Success
+ *
Transitions to the processing state which will begin to send records to the record processor
+ *
Shutdown
+ *
At this point the record processor has been initialized, but hasn't processed any records. This requires that + the record processor be notified of the shutdown, even though there are almost no actions the record processor + could take. + 
+ *
{@link ShutdownReason#REQUESTED}
+ *
Transitions to the {@link ShutdownNotificationState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
+ *

+ * This reason should not occur, since terminate is triggered after reaching the end of a shard. Initialize never + makes any requests to Kinesis for records, so it can't reach the end of a shard. + 

+ *

+ * Transitions to the {@link ShuttingDownState} + *

+ *
+ *
+ *
+ *
+ */ + public static class InitializingState implements ConsumerState { + + @Override + public ITask createTask(KinesisShardConsumer consumer) { + return new InitializeTask(consumer.getShardInfo(), + consumer.getRecordProcessor(), + consumer.getCheckpoint(), + consumer.getRecordProcessorCheckpointer(), + consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), + consumer.getStreamConfig(), + consumer.getGetRecordsCache()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.PROCESSING.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return shutdownReason.getShutdownState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.INITIALIZE; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.INITIALIZING; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This state is responsible for retrieving records from Kinesis, and dispatching them to the record processor. + * While in this state the only way a transition will occur is if a shutdown has been triggered. + *

Valid Transitions

+ *
+ *
Success
+ *
Doesn't actually transition, but instead returns the same state
+ *
Shutdown
+ *
At this point records are being retrieved, and processed. It's now possible for the consumer to reach the end + * of the shard triggering a {@link ShutdownReason#TERMINATE}. + *
+ *
{@link ShutdownReason#REQUESTED}
+ *
Transitions to the {@link ShutdownNotificationState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
+ *
+ *
+ */ + static class ProcessingState implements ConsumerState { + + @Override + public ITask createTask(KinesisShardConsumer consumer) { + return new ProcessTask(consumer.getShardInfo(), + consumer.getStreamConfig(), + consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), + consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), + consumer.getGetRecordsCache()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.PROCESSING.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return shutdownReason.getShutdownState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.PROCESS; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.PROCESSING; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + static final ConsumerState SHUTDOWN_REQUEST_COMPLETION_STATE = new ShutdownNotificationCompletionState(); + + /** + * This state occurs when a shutdown has been explicitly requested. This shutdown allows the record processor a + * chance to checkpoint and prepare to be shutdown via the normal method. This state can only be reached by a + * shutdown on the {@link InitializingState} or {@link ProcessingState}. + * + *

Valid Transitions

+ *
+ *
Success
+ *
Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.
+ *
Shutdown
+ *
At this point records are being retrieved, and processed. An explicit shutdown will allow the record + * processor one last chance to checkpoint, and then the {@link KinesisShardConsumer} will be held in an idle state. + *
+ *
{@link ShutdownReason#REQUESTED}
+ *
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to + * {@link ShutdownNotificationCompletionState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
+ *
+ *
+ */ + static class ShutdownNotificationState implements ConsumerState { + + @Override + public ITask createTask(KinesisShardConsumer consumer) { + return new ShutdownNotificationTask(consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getShutdownNotification(), + consumer.getShardInfo()); + } + + @Override + public ConsumerState successTransition() { + return SHUTDOWN_REQUEST_COMPLETION_STATE; + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + if (shutdownReason == ShutdownReason.REQUESTED) { + return SHUTDOWN_REQUEST_COMPLETION_STATE; + } + return shutdownReason.getShutdownState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_NOTIFICATION; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTDOWN_REQUESTED; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * Once the {@link ShutdownNotificationState} has been completed the {@link KinesisShardConsumer} must not re-enter any of the + * processing states. This state idles the {@link KinesisShardConsumer} until the worker triggers the final shutdown state. + * + *

Valid Transitions

+ *
+ *
Success
+ *
+ *

+ * Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown. + *

+ *

+ * Remains in the {@link ShutdownNotificationCompletionState} + *

+ *
+ *
Shutdown
+ *
At this point the {@link KinesisShardConsumer} has notified the record processor of the impending shutdown, and is + waiting for that notification. While waiting for the notification no further processing should occur on the + {@link KinesisShardConsumer}. + 
+ *
{@link ShutdownReason#REQUESTED}
+ *
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains + * {@link ShutdownNotificationCompletionState}
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShuttingDownState}
+ *
+ *
+ *
+ */ + static class ShutdownNotificationCompletionState implements ConsumerState { + + @Override + public ITask createTask(KinesisShardConsumer consumer) { + return null; + } + + @Override + public ConsumerState successTransition() { + return this; + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + if (shutdownReason != ShutdownReason.REQUESTED) { + return shutdownReason.getShutdownState(); + } + return this; + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_NOTIFICATION; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTDOWN_REQUESTED; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This state is entered if the {@link KinesisShardConsumer} loses its lease, or reaches the end of the shard. + * + *

Valid Transitions

+ *
+ *
Success
+ *
+ *

+ * Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown. + *

+ *

+ * Transitions to the {@link ShutdownCompleteState} + *

+ *
+ *
Shutdown
+ *
At this point the record processor has processed the final shutdown indication, and depending on the shutdown + * reason taken the correct course of action. From this point on there should be no more interactions with the + * record processor or {@link KinesisShardConsumer}. + *
+ *
{@link ShutdownReason#REQUESTED}
+ *
+ *

+ * This should not occur as all other {@link ShutdownReason}s take priority over it. + *

+ *

+ * Transitions to {@link ShutdownCompleteState} + *

+ *
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Transitions to the {@link ShutdownCompleteState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Transitions to the {@link ShutdownCompleteState}
+ *
+ *
+ *
+ */ + static class ShuttingDownState implements ConsumerState { + + @Override + public ITask createTask(KinesisShardConsumer consumer) { + return new KinesisShutdownTask(consumer.getShardInfo(), + consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getShutdownReason(), + consumer.getStreamConfig().getStreamProxy(), + consumer.getStreamConfig().getInitialPositionInStream(), + consumer.isCleanupLeasesOfCompletedShards(), + consumer.isIgnoreUnexpectedChildShards(), + consumer.getLeaseCoordinator(), + consumer.getTaskBackoffTimeMillis(), + consumer.getGetRecordsCache(), consumer.getShardSyncer(), + consumer.getShardSyncStrategy(), consumer.getChildShards(), + consumer.getLeaseCleanupManager()); + } + + @Override + public ConsumerState successTransition() { + return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTTING_DOWN; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * This is the final state for the {@link KinesisShardConsumer}. This occurs once all shutdown activities are completed. + * + *

Valid Transitions

+ *
+ *
Success
+ *
+ *

+ * Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown. + *

+ *

+ * Remains in the {@link ShutdownCompleteState} + *

+ *
+ *
Shutdown
+ *
At this point all shutdown activities are completed, and the {@link KinesisShardConsumer} should not take any + further actions. + 
+ *
{@link ShutdownReason#REQUESTED}
+ *
+ *

+ * This should not occur as all other {@link ShutdownReason}s take priority over it. + *

+ *

+ * Remains in {@link ShutdownCompleteState} + *

+ *
+ *
{@link ShutdownReason#ZOMBIE}
+ *
Remains in {@link ShutdownCompleteState}
+ *
{@link ShutdownReason#TERMINATE}
+ *
Remains in {@link ShutdownCompleteState}
+ *
+ *
+ *
+ */ + static class ShutdownCompleteState implements ConsumerState { + + @Override + public ITask createTask(KinesisShardConsumer consumer) { + if (consumer.getShutdownNotification() != null) { + consumer.getShutdownNotification().shutdownComplete(); + } + return null; + } + + @Override + public ConsumerState successTransition() { + return this; + } + + @Override + public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { + return this; + } + + @Override + public TaskType getTaskType() { + return TaskType.SHUTDOWN_COMPLETE; + } + + @Override + public ShardConsumerState getState() { + return ShardConsumerState.SHUTDOWN_COMPLETE; + } + + @Override + public boolean isTerminal() { + return true; + } + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java new file mode 100644 index 00000000..6039ad3b --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java @@ -0,0 +1,608 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.model.ChildShard; +import com.amazonaws.util.CollectionUtils; +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; + +/** + * Responsible for consuming data records of a (specified) shard. + * The instance should be shutdown when we lose the primary responsibility for a shard. + * A new instance should be created if the primary responsibility is reassigned back to this process. 
+ */ +public class KinesisShardConsumer implements IShardConsumer{ + + private static final Log LOG = LogFactory.getLog(KinesisShardConsumer.class); + + private final StreamConfig streamConfig; + private final IRecordProcessor recordProcessor; + private final KinesisClientLibConfiguration config; + private final RecordProcessorCheckpointer recordProcessorCheckpointer; + private final ExecutorService executorService; + private final ShardInfo shardInfo; + private final KinesisDataFetcher dataFetcher; + private final IMetricsFactory metricsFactory; + private final KinesisClientLibLeaseCoordinator leaseCoordinator; + private ICheckpoint checkpoint; + private LeaseCleanupManager leaseCleanupManager; + // Backoff time when polling to check if application has finished processing parent shards + private final long parentShardPollIntervalMillis; + private final boolean cleanupLeasesOfCompletedShards; + private final long taskBackoffTimeMillis; + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; + + //@Getter + private final ShardSyncer shardSyncer; + + private ITask currentTask; + private long currentTaskSubmitTime; + private Future future; + private ShardSyncStrategy shardSyncStrategy; + + //@Getter + private List childShards; + + //@Getter + private final GetRecordsCache getRecordsCache; + + public List getChildShards() { + return childShards; + } + + public GetRecordsCache getGetRecordsCache() { + return getRecordsCache; + } + + public ShardSyncer getShardSyncer() { + return shardSyncer; + } + + private static final GetRecordsRetrievalStrategy makeStrategy(IDataFetcher dataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + ShardInfo shardInfo) { + Optional getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry -> + maxGetRecordsThreadPool.map(max -> + new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId()))); + + return getRecordsRetrievalStrategy.orElse(new 
SynchronousGetRecordsRetrievalStrategy(dataFetcher)); + } + + /* + * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do + * much coordination/synchronization to handle concurrent reads/updates. + */ + private KinesisConsumerStates.ConsumerState currentState = KinesisConsumerStates.INITIAL_STATE; + /* + * Used to track if we lost the primary responsibility. Once set to true, we will start shutting down. + * If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object. + */ + private volatile ShutdownReason shutdownReason; + private volatile ShutdownNotification shutdownNotification; + + /** + * @param shardInfo Shard information + * @param streamConfig Stream configuration to use + * @param checkpoint Checkpoint tracker + * @param recordProcessor Record processor used to process the data records for the shard + * @param config Kinesis library configuration + * @param leaseCoordinator Used to manage leases for current worker + * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) + * @param executorService ExecutorService used to execute process tasks for this shard + * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard + * @param backoffTimeMillis backoff interval when we encounter exceptions + * @param shardSyncer shardSyncer instance used to check and create new leases + */ + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES + @Deprecated + KinesisShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + 
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + + this(shardInfo, + streamConfig, + checkpoint, + recordProcessor, + leaseCoordinator, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional.empty(), + Optional.empty(), + config, shardSyncer, shardSyncStrategy); + } + + /** + * @param shardInfo Shard information + * @param streamConfig Stream configuration to use + * @param checkpoint Checkpoint tracker + * @param recordProcessor Record processor used to process the data records for the shard + * @param leaseCoordinator Used to manage leases for current worker + * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) + * @param executorService ExecutorService used to execute process tasks for this shard + * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard + * @param backoffTimeMillis backoff interval when we encounter exceptions + * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record. + * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool. 
+ * @param config Kinesis library configuration + * @param shardSyncer shardSyncer instance used to check and create new leases + */ + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES + @Deprecated + KinesisShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + this( + shardInfo, + streamConfig, + checkpoint, + recordProcessor, + new RecordProcessorCheckpointer( + shardInfo, + checkpoint, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), + metricsFactory), + leaseCoordinator, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), + retryGetRecordsInSeconds, + maxGetRecordsThreadPool, + config, shardSyncer, shardSyncStrategy + ); + } + + /** + * @param shardInfo Shard information + * @param streamConfig Stream Config to use + * @param checkpoint Checkpoint tracker + * @param recordProcessor Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress + * @param leaseCoordinator Used to manage leases for current worker + * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) + * @param 
cleanupLeasesOfCompletedShards clean up the leases of completed shards + * @param executorService ExecutorService used to execute process tasks for this shard + * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard + * @param backoffTimeMillis backoff interval when we encounter exceptions + * @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists + * @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams. + * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record + * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool + * @param config Kinesis library configuration + * @param shardSyncer shardSyncer instance used to check and create new leases + */ + @Deprecated + KinesisShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + + this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, + parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, + backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds, + maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), 
leaseCoordinator.getLeaseManager(), + Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(), + config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), + config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords())); + } + + /** + * @param shardInfo Shard information + * @param streamConfig Stream Config to use + * @param checkpoint Checkpoint tracker + * @param recordProcessor Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress + * @param leaseCoordinator Used to manage leases for current worker + * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) + * @param cleanupLeasesOfCompletedShards clean up the leases of completed shards + * @param executorService ExecutorService used to execute process tasks for this shard + * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard + * @param backoffTimeMillis backoff interval when we encounter exceptions + * @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists + * @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams. + * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record + * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool + * @param config Kinesis library configuration + * @param shardSyncer shardSyncer instance used to check and create new leases + * @param leaseCleanupManager used to clean up leases in lease table. 
+ */ + KinesisShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, + LeaseCleanupManager leaseCleanupManager) { + this.shardInfo = shardInfo; + this.streamConfig = streamConfig; + this.checkpoint = checkpoint; + this.recordProcessor = recordProcessor; + this.recordProcessorCheckpointer = recordProcessorCheckpointer; + this.leaseCoordinator = leaseCoordinator; + this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; + this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; + this.executorService = executorService; + this.metricsFactory = metricsFactory; + this.taskBackoffTimeMillis = backoffTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; + this.config = config; + this.dataFetcher = kinesisDataFetcher; + this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( + makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), + this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords()); + this.shardSyncer = shardSyncer; + this.shardSyncStrategy = shardSyncStrategy; + this.leaseCleanupManager = leaseCleanupManager; + } + + /** + * No-op if current task is pending, otherwise submits next task for this shard. 
+ * This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state. + * + * @return true if a new process task was submitted, false otherwise + */ + public synchronized boolean consumeShard() { + return checkAndSubmitNextTask(); + } + + private boolean readyForNextTask() { + return future == null || future.isCancelled() || future.isDone(); + } + + private synchronized boolean checkAndSubmitNextTask() { + boolean submittedNewTask = false; + if (readyForNextTask()) { + TaskOutcome taskOutcome = TaskOutcome.NOT_COMPLETE; + if (future != null && future.isDone()) { + taskOutcome = determineTaskOutcome(); + } + + updateState(taskOutcome); + ITask nextTask = getNextTask(); + if (nextTask != null) { + currentTask = nextTask; + try { + future = executorService.submit(currentTask); + currentTaskSubmitTime = System.currentTimeMillis(); + submittedNewTask = true; + LOG.debug("Submitted new " + currentTask.getTaskType() + + " task for shard " + shardInfo.getShardId()); + } catch (RejectedExecutionException e) { + LOG.info(currentTask.getTaskType() + " task was not accepted for execution.", e); + } catch (RuntimeException e) { + LOG.info(currentTask.getTaskType() + " task encountered exception ", e); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("No new task to submit for shard %s, currentState %s", + shardInfo.getShardId(), + currentState.toString())); + } + } + } else { + final long timeElapsed = System.currentTimeMillis() - currentTaskSubmitTime; + final String commonMessage = String.format("Previous %s task still pending for shard %s since %d ms ago. 
", + currentTask.getTaskType(), shardInfo.getShardId(), timeElapsed); + if (LOG.isDebugEnabled()) { + LOG.debug(commonMessage + "Not submitting new task."); + } + config.getLogWarningForTaskAfterMillis().ifPresent(value -> { + if (timeElapsed > value) { + LOG.warn(commonMessage); + } + }); + } + + return submittedNewTask; + } + + public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() { + return skipShardSyncAtWorkerInitializationIfLeasesExist; + } + + /*public enum TaskOutcome { + SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND + }*/ + + private TaskOutcome determineTaskOutcome() { + try { + TaskResult result = future.get(); + if (result.getException() == null) { + if (result.isShardEndReached()) { + if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) { + childShards = result.getChildShards(); + LOG.info("Shard " + shardInfo.getShardId() + ": Setting childShards in ShardConsumer: " + childShards); + } + return TaskOutcome.END_OF_SHARD; + } + return TaskOutcome.SUCCESSFUL; + } + logTaskException(result); + // This is the case of result with exception + if (result.isLeaseNotFound()) { + return TaskOutcome.LEASE_NOT_FOUND; + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + // Setting future to null so we don't misinterpret task completion status in case of exceptions + future = null; + } + return TaskOutcome.FAILURE; + } + + private void logTaskException(TaskResult taskResult) { + if (LOG.isDebugEnabled()) { + Exception taskException = taskResult.getException(); + if (taskException instanceof BlockedOnParentShardException) { + // No need to log the stack trace for this exception (it is very specific). + LOG.debug("Shard " + shardInfo.getShardId() + " is blocked on completion of parent shard."); + } else { + LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ", + taskResult.getException()); + } + } + } + + /** + * Requests the shutdown of the this ShardConsumer. 
This should give the record processor a chance to checkpoint + * before being shutdown. + * + * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. + */ + public void notifyShutdownRequested(ShutdownNotification shutdownNotification) { + this.shutdownNotification = shutdownNotification; + markForShutdown(ShutdownReason.REQUESTED); + } + + /** + * Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API). + * This is called by Worker when it loses responsibility for a shard. + * + * @return true if shutdown is complete (false if shutdown is still in progress) + */ + public synchronized boolean beginShutdown() { + markForShutdown(ShutdownReason.ZOMBIE); + checkAndSubmitNextTask(); + + return isShutdown(); + } + + synchronized void markForShutdown(ShutdownReason reason) { + // ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard) + if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { + shutdownReason = reason; + } + } + + /** + * Used (by Worker) to check if this ShardConsumer instance has been shutdown + * RecordProcessor shutdown() has been invoked, as appropriate. + * + * @return true if shutdown is complete + */ + public boolean isShutdown() { + return currentState.isTerminal(); + } + + /** + * @return the shutdownReason + */ + public ShutdownReason getShutdownReason() { + return shutdownReason; + } + + /** + * Figure out next task to run based on current state, task, and shutdown context. + * + * @return Return next task to run + */ + private ITask getNextTask() { + ITask nextTask = currentState.createTask(this); + + if (nextTask == null) { + return null; + } else { + return new MetricsCollectingTaskDecorator(nextTask, metricsFactory); + } + } + + /** + * Note: This is a private/internal method with package level access solely for testing purposes. 
+ * Update state based on information about: task success, current state, and shutdown info. + * + * @param taskOutcome The outcome of the last task + */ + void updateState(TaskOutcome taskOutcome) { + if (taskOutcome == TaskOutcome.END_OF_SHARD) { + markForShutdown(ShutdownReason.TERMINATE); + LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE"); + } + if (taskOutcome == TaskOutcome.LEASE_NOT_FOUND) { + markForShutdown(ShutdownReason.ZOMBIE); + LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason ZOMBIE as lease was not found"); + } + if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { + currentState = currentState.shutdownTransition(shutdownReason); + } else if (isShutdownRequested() && KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) { + currentState = currentState.shutdownTransition(shutdownReason); + } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { + if (currentState.getTaskType() == currentTask.getTaskType()) { + currentState = currentState.successTransition(); + } else { + LOG.error("Current State task type of '" + currentState.getTaskType() + + "' doesn't match the current tasks type of '" + currentTask.getTaskType() + + "'. This shouldn't happen, and indicates a programming error. " + + "Unable to safely transition to the next state."); + } + } + // + // Don't change state otherwise + // + + } + + @VisibleForTesting + public boolean isShutdownRequested() { + return shutdownReason != null; + } + + /** + * Private/Internal method - has package level access solely for testing purposes. 
+ * + * @return the currentState + */ + public KinesisConsumerStates.ShardConsumerState getCurrentState() { + return currentState.getState(); + } + + StreamConfig getStreamConfig() { + return streamConfig; + } + + IRecordProcessor getRecordProcessor() { + return recordProcessor; + } + + RecordProcessorCheckpointer getRecordProcessorCheckpointer() { + return recordProcessorCheckpointer; + } + + ExecutorService getExecutorService() { + return executorService; + } + + ShardInfo getShardInfo() { + return shardInfo; + } + + KinesisDataFetcher getDataFetcher() { + return dataFetcher; + } + + ILeaseManager getLeaseManager() { + return leaseCoordinator.getLeaseManager(); + } + + KinesisClientLibLeaseCoordinator getLeaseCoordinator() { + return leaseCoordinator; + } + + ICheckpoint getCheckpoint() { + return checkpoint; + } + + long getParentShardPollIntervalMillis() { + return parentShardPollIntervalMillis; + } + + boolean isCleanupLeasesOfCompletedShards() { + return cleanupLeasesOfCompletedShards; + } + + boolean isIgnoreUnexpectedChildShards() { + return config.shouldIgnoreUnexpectedChildShards(); + } + + long getTaskBackoffTimeMillis() { + return taskBackoffTimeMillis; + } + + Future getFuture() { + return future; + } + + ShutdownNotification getShutdownNotification() { + return shutdownNotification; + } + + ShardSyncStrategy getShardSyncStrategy() { + return shardSyncStrategy; + } + + LeaseCleanupManager getLeaseCleanupManager() { + return leaseCleanupManager; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumerFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumerFactory.java new file mode 100644 index 00000000..bbcae852 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumerFactory.java @@ -0,0 +1,48 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import 
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;

import java.util.Optional;
import java.util.concurrent.ExecutorService;

/**
 * Factory that produces {@link KinesisShardConsumer} instances for the Worker, one per owned shard.
 */
public class KinesisShardConsumerFactory implements IShardConsumerFactory {

    @Override
    public IShardConsumer createShardConsumer(ShardInfo shardInfo,
                                              StreamConfig streamConfig,
                                              ICheckpoint checkpointTracker,
                                              IRecordProcessor recordProcessor,
                                              RecordProcessorCheckpointer recordProcessorCheckpointer,
                                              KinesisClientLibLeaseCoordinator leaseCoordinator,
                                              long parentShardPollIntervalMillis,
                                              boolean cleanupLeasesUponShardCompletion,
                                              ExecutorService executorService,
                                              IMetricsFactory metricsFactory,
                                              long taskBackoffTimeMillis,
                                              boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
                                              Optional retryGetRecordsInSeconds,
                                              Optional maxGetRecordsThreadPool,
                                              KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy,
                                              LeaseCleanupManager leaseCleanupManager) {
        // Each consumer owns a fresh data fetcher bound to this shard's stream proxy.
        final KinesisDataFetcher shardDataFetcher =
                new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);

        return new KinesisShardConsumer(shardInfo,
                streamConfig,
                checkpointTracker,
                recordProcessor,
                recordProcessorCheckpointer,
                leaseCoordinator,
                parentShardPollIntervalMillis,
                cleanupLeasesUponShardCompletion,
                executorService,
                metricsFactory,
                taskBackoffTimeMillis,
                skipShardSyncAtWorkerInitializationIfLeasesExist,
                shardDataFetcher,
                retryGetRecordsInSeconds,
                maxGetRecordsThreadPool,
                config, shardSyncer, shardSyncStrategy,
                leaseCleanupManager);
    }
}
/dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java @@ -0,0 +1,337 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; +import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; +import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException; +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.leases.impl.UpdateField; +import com.amazonaws.services.kinesis.model.ChildShard; +import com.amazonaws.util.CollectionUtils; +import 
com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Task for invoking the RecordProcessor shutdown() callback. + */ +public class KinesisShutdownTask implements ITask { + + private static final Log LOG = LogFactory.getLog(KinesisShutdownTask.class); + + @VisibleForTesting + static final int RETRY_RANDOM_MAX_RANGE = 50; + + private final ShardInfo shardInfo; + private final IRecordProcessor recordProcessor; + private final RecordProcessorCheckpointer recordProcessorCheckpointer; + private final ShutdownReason reason; + private final IKinesisProxy kinesisProxy; + private final KinesisClientLibLeaseCoordinator leaseCoordinator; + private final InitialPositionInStreamExtended initialPositionInStream; + private final boolean cleanupLeasesOfCompletedShards; + private final boolean ignoreUnexpectedChildShards; + private final TaskType taskType = TaskType.SHUTDOWN; + private final long backoffTimeMillis; + private final GetRecordsCache getRecordsCache; + private final ShardSyncer shardSyncer; + private final ShardSyncStrategy shardSyncStrategy; + private final List childShards; + private final LeaseCleanupManager leaseCleanupManager; + + /** + * Constructor. 
+ */ + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES + KinesisShutdownTask(ShardInfo shardInfo, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + ShutdownReason reason, + IKinesisProxy kinesisProxy, + InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long backoffTimeMillis, + GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, + ShardSyncStrategy shardSyncStrategy, List childShards, + LeaseCleanupManager leaseCleanupManager) { + this.shardInfo = shardInfo; + this.recordProcessor = recordProcessor; + this.recordProcessorCheckpointer = recordProcessorCheckpointer; + this.reason = reason; + this.kinesisProxy = kinesisProxy; + this.initialPositionInStream = initialPositionInStream; + this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; + this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; + this.leaseCoordinator = leaseCoordinator; + this.backoffTimeMillis = backoffTimeMillis; + this.getRecordsCache = getRecordsCache; + this.shardSyncer = shardSyncer; + this.shardSyncStrategy = shardSyncStrategy; + this.childShards = childShards; + this.leaseCleanupManager = leaseCleanupManager; + } + + /* + * Invokes RecordProcessor shutdown() API. + * (non-Javadoc) + * + * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call() + */ + @Override + public TaskResult call() { + Exception exception; + + LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: " + + shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". 
childShards:" + childShards); + + try { + final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + final Runnable leaseLostAction = () -> takeLeaseLostAction(); + + if (reason == ShutdownReason.TERMINATE) { + try { + takeShardEndAction(currentShardLease); + } catch (InvalidStateException e) { + // If InvalidStateException happens, it indicates we have a non recoverable error in short term. + // In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down. + LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " + + "Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e); + dropLease(currentShardLease); + throwOnApplicationException(leaseLostAction); + } + } else { + throwOnApplicationException(leaseLostAction); + } + + LOG.debug("Shutting down retrieval strategy."); + getRecordsCache.shutdown(); + LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); + return new TaskResult(null); + } catch (Exception e) { + if (e instanceof CustomerApplicationException) { + LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e); + } else { + LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e); + } + + exception = e; + // backoff if we encounter an exception. + try { + Thread.sleep(this.backoffTimeMillis); + } catch (InterruptedException ie) { + LOG.debug("Interrupted sleep", ie); + } + } + + return new TaskResult(exception); + } + + // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup. + private void takeShardEndAction(KinesisClientLease currentShardLease) + throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException { + // Create new lease for the child shards if they don't exist. 
+ // We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards. + // This would happen when KinesisDataFetcher catches ResourceNotFound exception. + // In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. + // This scenario could happen when customer deletes the stream while leaving the KCL application running. + if (currentShardLease == null) { + throw new InvalidStateException("Shard " + shardInfo.getShardId() + ": Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); + } + if (!CollectionUtils.isNullOrEmpty(childShards)) { + // If childShards is not empty, create new leases for the childShards and update the current lease with the childShards lease information. + createLeasesForChildShardsIfNotExist(); + updateCurrentLeaseWithChildShards(currentShardLease); + } else { + LOG.warn("Shard " + shardInfo.getShardId() + + ": Shutting down consumer with SHARD_END reason without creating leases for child shards."); + } + // Checkpoint with SHARD_END sequence number. + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentShardLease, shardInfo); + if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { + boolean isSuccess = false; + try { + isSuccess = attemptShardEndCheckpointing(); + } finally { + // Check if either the shard end ddb persist is successful or + // if childshards is empty. When child shards is empty then either it is due to + // completed shard being reprocessed or we got RNF from service. + // For these cases enqueue the lease for deletion. 
+ if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + } + } + } + + private void takeLeaseLostAction() { + final ShutdownInput leaseLostShutdownInput = new ShutdownInput() + .withShutdownReason(ShutdownReason.ZOMBIE) + .withCheckpointer(recordProcessorCheckpointer); + recordProcessor.shutdown(leaseLostShutdownInput); + } + + private boolean attemptShardEndCheckpointing() + throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { + final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId())) + .orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist.")); + if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + // Call the recordProcessor to checkpoint with SHARD_END sequence number. + // The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown. 
+ throwOnApplicationException(() -> applicationCheckpointAndVerification()); + } + return true; + } + + private void applicationCheckpointAndVerification() { + recordProcessorCheckpointer.setSequenceNumberAtShardEnd( + recordProcessorCheckpointer.getLargestPermittedCheckpointValue()); + recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); + final ShutdownInput shardEndShutdownInput = new ShutdownInput() + .withShutdownReason(ShutdownReason.TERMINATE) + .withCheckpointer(recordProcessorCheckpointer); + recordProcessor.shutdown(shardEndShutdownInput); + + boolean successfullyCheckpointedShardEnd = false; + + KinesisClientLease leaseFromDdb = null; + try { + leaseFromDdb = leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId()); + } catch (Exception e) { + LOG.error("Shard " + shardInfo.getShardId() + " : Unable to get lease entry for shard to verify shard end checkpointing.", e); + } + + if (leaseFromDdb != null && leaseFromDdb.getCheckpoint() != null) { + successfullyCheckpointedShardEnd = leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END); + final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); + if (!leaseFromDdb.getCheckpoint().equals(lastCheckpointValue)) { + LOG.error("Shard " + shardInfo.getShardId() + + " : Checkpoint information mismatch between authoritative source and local cache. " + + "This does not affect the application flow, but cut a ticket to Kinesis when you see this. " + + "Authoritative entry : " + leaseFromDdb.getCheckpoint() + " Cache entry : " + lastCheckpointValue); + } + } else { + LOG.error("Shard " + shardInfo.getShardId() + " : No lease checkpoint entry for shard to verify shard end checkpointing. Lease Entry : " + leaseFromDdb); + } + + if (!successfullyCheckpointedShardEnd) { + throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + + shardInfo.getShardId() + ". 
Application must checkpoint upon shutdown. " +
+                    "See IRecordProcessor.shutdown javadocs for more information.");
+        }
+    }
+
+    private void throwOnApplicationException(Runnable action) throws CustomerApplicationException {
+        try {
+            action.run();
+        } catch (Exception e) {
+            throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e);
+        }
+    }
+
+    private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException {
+        // For a child shard resulting from the merge of two parent shards, verify if both the parents are either present or
+        // not present in the lease table before creating the lease entry.
+        if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) {
+            final ChildShard childShard = childShards.get(0);
+            final List parentLeaseKeys = childShard.getParentShards();
+
+            if (parentLeaseKeys.size() != 2) {
+                throw new InvalidStateException("Shard " + shardInfo.getShardId()+ "'s only child shard " + childShard +
+                        " does not contain other parent information.");
+            } else {
+                boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) ==
+                        Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1)));
+                if (!isValidLeaseTableState) {
+                    if(!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) {
+                        throw new BlockedOnParentShardException(
+                                "Shard " + shardInfo.getShardId() + "'s only child shard " + childShard +
+                                        " has partial parent information in lease table. Hence deferring lease creation of child shard.");
+                    } else {
+                        throw new InvalidStateException("Shard " + shardInfo.getShardId() + "'s only child shard " + childShard +
+                                " has partial parent information in lease table.");
+                    }
+                }
+            }
+        }
+        // Attempt to create leases for child shards.
+ for (ChildShard childShard : childShards) { + final String leaseKey = childShard.getShardId(); + if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) { + final KinesisClientLease leaseToCreate = KinesisShardSyncer.newKCLLeaseForChildShard(childShard); + leaseCoordinator.getLeaseManager().createLeaseIfNotExists(leaseToCreate); + LOG.info("Shard " + shardInfo.getShardId() + " : Created child shard lease: " + leaseToCreate.getLeaseKey()); + } + } + } + + /** + * Returns true for 1 in N probability. + */ + @VisibleForTesting + boolean isOneInNProbability(int n) { + Random r = new Random(); + return 1 == r.nextInt((n - 1) + 1) + 1; + } + + private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + final Set childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet()); + currentLease.setChildShardIds(childShardIds); + leaseCoordinator.getLeaseManager().updateLeaseWithMetaInfo(currentLease, UpdateField.CHILD_SHARDS); + LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey()); + } + + + /* + * (non-Javadoc) + * + * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType() + */ + @Override + public TaskType getTaskType() { + return taskType; + } + + @VisibleForTesting + ShutdownReason getReason() { + return reason; + } + + private void dropLease(KinesisClientLease currentShardLease) { + if (currentShardLease == null) { + LOG.warn("Shard " + shardInfo.getShardId() + ": Unable to find the lease for shard. 
Will shutdown the shardConsumer directly."); + return; + } + leaseCoordinator.dropLease(currentShardLease); + LOG.warn("Dropped lease for shutting down ShardConsumer: " + currentShardLease.getLeaseKey()); + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java new file mode 100644 index 00000000..4e0e8434 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java @@ -0,0 +1,410 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.io.Serializable; +import java.math.BigInteger; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.amazonaws.services.cloudwatch.model.StandardUnit; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import com.amazonaws.services.kinesis.leases.impl.UpdateField; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.util.CollectionUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ComparisonChain; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.Validate; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange; + +/** + * The top level orchestrator for coordinating the periodic shard sync related activities. 
If the configured + * {@link ShardSyncStrategyType} is PERIODIC, this class will be the main shard sync orchestrator. For non-PERIODIC + * strategies, this class will serve as an internal auditor that periodically checks if the full hash range is covered + * by currently held leases, and initiates a recovery shard sync if not. + */ +@Getter +@EqualsAndHashCode +class PeriodicShardSyncManager { + private static final Log LOG = LogFactory.getLog(PeriodicShardSyncManager.class); + private static final long INITIAL_DELAY = 0; + + /** DEFAULT interval is used for PERIODIC {@link ShardSyncStrategyType}. */ + private static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000L; + + /** Parameters for validating hash range completeness when running in auditor mode. */ + @VisibleForTesting + static final BigInteger MIN_HASH_KEY = BigInteger.ZERO; + @VisibleForTesting + static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE); + static final String PERIODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager"; + private final HashRangeHoleTracker hashRangeHoleTracker = new HashRangeHoleTracker(); + + private final String workerId; + private final LeaderDecider leaderDecider; + private final ITask metricsEmittingShardSyncTask; + private final ScheduledExecutorService shardSyncThreadPool; + private final ILeaseManager leaseManager; + private final IKinesisProxy kinesisProxy; + private final boolean isAuditorMode; + private final long periodicShardSyncIntervalMillis; + private boolean isRunning; + private final IMetricsFactory metricsFactory; + private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold; + + + PeriodicShardSyncManager(String workerId, + LeaderDecider leaderDecider, + ShardSyncTask shardSyncTask, + IMetricsFactory metricsFactory, + ILeaseManager leaseManager, + IKinesisProxy kinesisProxy, + boolean isAuditorMode, + long leasesRecoveryAuditorExecutionFrequencyMillis, + int 
leasesRecoveryAuditorInconsistencyConfidenceThreshold) { + this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory, + leaseManager, kinesisProxy, isAuditorMode, leasesRecoveryAuditorExecutionFrequencyMillis, + leasesRecoveryAuditorInconsistencyConfidenceThreshold); + } + + PeriodicShardSyncManager(String workerId, + LeaderDecider leaderDecider, + ShardSyncTask shardSyncTask, + ScheduledExecutorService shardSyncThreadPool, + IMetricsFactory metricsFactory, + ILeaseManager leaseManager, + IKinesisProxy kinesisProxy, + boolean isAuditorMode, + long leasesRecoveryAuditorExecutionFrequencyMillis, + int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { + Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); + Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); + Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager."); + this.workerId = workerId; + this.leaderDecider = leaderDecider; + this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory); + this.shardSyncThreadPool = shardSyncThreadPool; + this.leaseManager = leaseManager; + this.kinesisProxy = kinesisProxy; + this.metricsFactory = metricsFactory; + this.isAuditorMode = isAuditorMode; + this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold; + if (isAuditorMode) { + Validate.notNull(this.leaseManager, "LeaseManager is required for non-PERIODIC shard sync strategies."); + Validate.notNull(this.kinesisProxy, "KinesisProxy is required for non-PERIODIC shard sync strategies."); + this.periodicShardSyncIntervalMillis = leasesRecoveryAuditorExecutionFrequencyMillis; + } else { + this.periodicShardSyncIntervalMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS; + } + } + + public synchronized TaskResult start() { + if (!isRunning) { 
+ final Runnable periodicShardSyncer = () -> { + try { + runShardSync(); + } catch (Throwable t) { + LOG.error("Error running shard sync.", t); + } + }; + + shardSyncThreadPool + .scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, periodicShardSyncIntervalMillis, + TimeUnit.MILLISECONDS); + isRunning = true; + } + return new TaskResult(null); + } + + /** + * Runs ShardSync once, without scheduling further periodic ShardSyncs. + * @return TaskResult from shard sync + */ + public synchronized TaskResult syncShardsOnce() { + LOG.info("Syncing shards once from worker " + workerId); + return metricsEmittingShardSyncTask.call(); + } + + public void stop() { + if (isRunning) { + LOG.info(String.format("Shutting down leader decider on worker %s", workerId)); + leaderDecider.shutdown(); + LOG.info(String.format("Shutting down periodic shard sync task scheduler on worker %s", workerId)); + shardSyncThreadPool.shutdown(); + isRunning = false; + } + } + + private void runShardSync() { + if (leaderDecider.isLeader(workerId)) { + LOG.debug("WorkerId " + workerId + " is a leader, running the shard sync task"); + + MetricsHelper.startScope(metricsFactory, PERIODIC_SHARD_SYNC_MANAGER); + boolean isRunSuccess = false; + final long runStartMillis = System.currentTimeMillis(); + + try { + final ShardSyncResponse shardSyncResponse = checkForShardSync(); + MetricsHelper.getMetricsScope().addData("NumStreamsToSync", shardSyncResponse.shouldDoShardSync() ? 1 : 0, StandardUnit.Count, MetricsLevel.SUMMARY); + MetricsHelper.getMetricsScope().addData("NumStreamsWithPartialLeases", shardSyncResponse.isHoleDetected() ? 
1 : 0, StandardUnit.Count, MetricsLevel.SUMMARY); + if (shardSyncResponse.shouldDoShardSync()) { + LOG.info("Periodic shard syncer initiating shard sync due to the reason - " + + shardSyncResponse.reasonForDecision()); + metricsEmittingShardSyncTask.call(); + } else { + LOG.info("Skipping shard sync due to the reason - " + shardSyncResponse.reasonForDecision()); + } + isRunSuccess = true; + } catch (Exception e) { + LOG.error("Caught exception while running periodic shard syncer.", e); + } finally { + MetricsHelper.addSuccessAndLatency(runStartMillis, isRunSuccess, MetricsLevel.SUMMARY); + MetricsHelper.endScope(); + } + } else { + LOG.debug("WorkerId " + workerId + " is not a leader, not running the shard sync task"); + } + } + + @VisibleForTesting + ShardSyncResponse checkForShardSync() throws DependencyException, InvalidStateException, + ProvisionedThroughputException { + + if (!isAuditorMode) { + // If we are running with PERIODIC shard sync strategy, we should sync every time. + return new ShardSyncResponse(true, false, "Syncing every time with PERIODIC shard sync strategy."); + } + + // Get current leases from DynamoDB. + final List currentLeases = leaseManager.listLeases(); + + if (CollectionUtils.isNullOrEmpty(currentLeases)) { + // If the current leases are null or empty, then we need to initiate a shard sync. + LOG.info("No leases found. Will trigger a shard sync."); + return new ShardSyncResponse(true, false, "No leases found."); + } + + // Check if there are any holes in the hash range covered by current leases. Return the first hole if present. + Optional hashRangeHoleOpt = hasHoleInLeases(currentLeases); + if (hashRangeHoleOpt.isPresent()) { + // If hole is present, check if the hole is detected consecutively in previous occurrences. If hole is + // determined with high confidence, return true; return false otherwise. 
We use the high confidence factor + // to avoid shard sync on any holes during resharding and lease cleanups, or other intermittent issues. + final boolean hasHoleWithHighConfidence = + hashRangeHoleTracker.hashHighConfidenceOfHoleWith(hashRangeHoleOpt.get()); + + return new ShardSyncResponse(hasHoleWithHighConfidence, true, + "Detected the same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. " + + "Will initiate shard sync after reaching threshold: " + leasesRecoveryAuditorInconsistencyConfidenceThreshold); + } else { + // If hole is not present, clear any previous hole tracking and return false. + hashRangeHoleTracker.reset(); + return new ShardSyncResponse(false, false, "Hash range is complete."); + } + } + + @VisibleForTesting + Optional hasHoleInLeases(List leases) { + // Filter out any leases with checkpoints other than SHARD_END + final List activeLeases = leases.stream() + .filter(lease -> lease.getCheckpoint() != null && !lease.getCheckpoint().isShardEnd()) + .collect(Collectors.toList()); + + final List activeLeasesWithHashRanges = fillWithHashRangesIfRequired(activeLeases); + return checkForHoleInHashKeyRanges(activeLeasesWithHashRanges); + } + + private List fillWithHashRangesIfRequired(List activeLeases) { + final List activeLeasesWithNoHashRanges = activeLeases.stream() + .filter(lease -> lease.getHashKeyRange() == null).collect(Collectors.toList()); + + if (activeLeasesWithNoHashRanges.isEmpty()) { + return activeLeases; + } + + // Fetch shards from Kinesis to fill in the in-memory hash ranges + final Map kinesisShards = kinesisProxy.getShardList().stream() + .collect(Collectors.toMap(Shard::getShardId, shard -> shard)); + + return activeLeases.stream().map(lease -> { + if (lease.getHashKeyRange() == null) { + final String shardId = lease.getLeaseKey(); + final Shard shard = kinesisShards.get(shardId); + if (shard == null) { + return lease; + } + lease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange())); + + try { + 
leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
+                } catch (Exception e) {
+                    LOG.warn("Unable to update hash range information for lease " + lease.getLeaseKey()
+                            + ". This may result in explicit lease sync.", e);
+                }
+            }
+            return lease;
+        }).filter(lease -> lease.getHashKeyRange() != null).collect(Collectors.toList());
+    }
+
+    @VisibleForTesting
+    static Optional checkForHoleInHashKeyRanges(List leasesWithHashKeyRanges) {
+        // Sort the hash ranges by starting hash key
+        final List sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges);
+        if (sortedLeasesWithHashKeyRanges.isEmpty()) {
+            LOG.error("No leases with valid hash ranges found.");
+            return Optional.of(new HashRangeHole());
+        }
+
+        // Validate the hash range bounds
+        final KinesisClientLease minHashKeyLease = sortedLeasesWithHashKeyRanges.get(0);
+        final KinesisClientLease maxHashKeyLease =
+                sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1);
+        if (!minHashKeyLease.getHashKeyRange().startingHashKey().equals(MIN_HASH_KEY) ||
+                !maxHashKeyLease.getHashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) {
+            LOG.error("Incomplete hash range found between " + minHashKeyLease + " and " + maxHashKeyLease);
+            return Optional.of(new HashRangeHole(minHashKeyLease.getHashKeyRange(), maxHashKeyLease.getHashKeyRange()));
+        }
+
+        // Check for any holes in the sorted hash range intervals
+        if (sortedLeasesWithHashKeyRanges.size() > 1) {
+            KinesisClientLease leftmostLeaseToReportInCaseOfHole = minHashKeyLease;
+            HashKeyRangeForLease leftLeaseHashRange = leftmostLeaseToReportInCaseOfHole.getHashKeyRange();
+
+            for (int i = 1; i < sortedLeasesWithHashKeyRanges.size(); i++) {
+                final KinesisClientLease rightLease = sortedLeasesWithHashKeyRanges.get(i);
+                final HashKeyRangeForLease rightLeaseHashRange = rightLease.getHashKeyRange();
+                final BigInteger rangeDiff =
+                        rightLeaseHashRange.startingHashKey().subtract(leftLeaseHashRange.endingHashKey());
+                // We have 
overlapping leases when rangeDiff is 0 or negative. + // signum() will be -1 for negative and 0 if value is 0. + // Merge the ranges for further tracking. + if (rangeDiff.signum() <= 0) { + leftLeaseHashRange = new HashKeyRangeForLease(leftLeaseHashRange.startingHashKey(), + leftLeaseHashRange.endingHashKey().max(rightLeaseHashRange.endingHashKey())); + } else { + // We have non-overlapping leases when rangeDiff is positive. signum() will be 1 in this case. + // If rangeDiff is 1, then it is a continuous hash range. If not, there is a hole. + if (!rangeDiff.equals(BigInteger.ONE)) { + LOG.error("Incomplete hash range found between " + leftmostLeaseToReportInCaseOfHole + + " and " + rightLease); + return Optional.of(new HashRangeHole(leftmostLeaseToReportInCaseOfHole.getHashKeyRange(), + rightLease.getHashKeyRange())); + } + + leftmostLeaseToReportInCaseOfHole = rightLease; + leftLeaseHashRange = rightLeaseHashRange; + } + } + } + + return Optional.empty(); + } + + @VisibleForTesting + static List sortLeasesByHashRange(List leasesWithHashKeyRanges) { + if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) { + return leasesWithHashKeyRanges; + } + Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator()); + return leasesWithHashKeyRanges; + } + + @Value + @Accessors(fluent = true) + @VisibleForTesting + static class ShardSyncResponse { + private final boolean shouldDoShardSync; + private final boolean isHoleDetected; + private final String reasonForDecision; + } + + @Value + private static class HashRangeHole { + private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; + private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole; + + HashRangeHole() { + hashRangeAtStartOfPossibleHole = hashRangeAtEndOfPossibleHole = null; + } + + HashRangeHole(HashKeyRangeForLease hashRangeAtStartOfPossibleHole, + HashKeyRangeForLease hashRangeAtEndOfPossibleHole) { + this.hashRangeAtStartOfPossibleHole = 
hashRangeAtStartOfPossibleHole; + this.hashRangeAtEndOfPossibleHole = hashRangeAtEndOfPossibleHole; + } + } + + private class HashRangeHoleTracker { + private HashRangeHole hashRangeHole; + @Getter + private Integer numConsecutiveHoles; + + public boolean hashHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) { + if (hashRangeHole.equals(this.hashRangeHole)) { + ++this.numConsecutiveHoles; + } else { + this.hashRangeHole = hashRangeHole; + this.numConsecutiveHoles = 1; + } + + return numConsecutiveHoles >= leasesRecoveryAuditorInconsistencyConfidenceThreshold; + } + + public void reset() { + this.hashRangeHole = null; + this.numConsecutiveHoles = 0; + } + } + + private static class HashKeyRangeComparator implements Comparator, Serializable { + private static final long serialVersionUID = 1L; + + @Override + public int compare(KinesisClientLease lease, KinesisClientLease otherLease) { + Validate.notNull(lease); + Validate.notNull(otherLease); + Validate.notNull(lease.getHashKeyRange()); + Validate.notNull(otherLease.getHashKeyRange()); + return ComparisonChain.start() + .compare(lease.getHashKeyRange().startingHashKey(), otherLease.getHashKeyRange().startingHashKey()) + .compare(lease.getHashKeyRange().endingHashKey(), otherLease.getHashKeyRange().endingHashKey()) + .result(); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 375b3c13..6a98962f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -160,6 +160,9 @@ public class Worker implements Runnable { private final LeaseCleanupManager leaseCleanupManager; + // Shard Consumer Factory + private IShardConsumerFactory shardConsumerFactory; + /** * Constructor. 
* @@ -470,11 +473,11 @@ public class Worker implements Runnable { // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, - StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, - long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, - KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, - IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, @@ -527,30 +530,30 @@ public class Worker implements Runnable { // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, - InitialPositionInStreamExtended initialPositionInStream, long 
parentShardPollIntervalMillis, - long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, - KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, - IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, - Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, - LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) { + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, + LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds, maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator), - leaderDecider, periodicShardSyncManager); + leaderDecider, periodicShardSyncManager, null /*Ishardconsumer*/); } Worker(String applicationName, 
IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, - StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, - long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, - ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, - IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, - Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, - WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, - PeriodicShardSyncManager periodicShardSyncManager) { + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, + ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, + WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, + PeriodicShardSyncManager periodicShardSyncManager, IShardConsumerFactory shardConsumerFactory) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -580,6 +583,7 @@ public class Worker implements Runnable { Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion, config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.garbageLeaseCleanupThresholdMillis(), 
config.getMaxRecords()); + this.shardConsumerFactory = shardConsumerFactory; } /** @@ -602,7 +606,7 @@ public class Worker implements Runnable { default: if (leaderDecider != null) { LOG.warn("LeaderDecider cannot be customized with non-PERIODIC shard sync strategy type. Using " + - "default LeaderDecider."); + "default LeaderDecider."); } this.leaderDecider = getOrCreateLeaderDecider(null); this.leaderElectedPeriodicShardSyncManager = @@ -614,7 +618,7 @@ public class Worker implements Runnable { } private static KinesisClientLibLeaseCoordinator getLeaseCoordinator(KinesisClientLibConfiguration config, - AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) { + AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory) { return new KinesisClientLibLeaseCoordinator( new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()), DEFAULT_LEASE_SELECTOR, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), @@ -695,8 +699,10 @@ public class Worker implements Runnable { } assignedShards.add(shardInfo); } + // clean up shard consumers for unassigned shards cleanupShardConsumers(assignedShards); + wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); } catch (Exception e) { @@ -1132,8 +1138,9 @@ public class Worker implements Runnable { streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), metricsFactory); - if(shardConsumerFactory == null){ //Default to KinesisShardConsumerFactory if null - this.shardConsumerFactory = new KinesisShardConsumerFactory(); + if(shardConsumerFactory == null){ + + shardConsumerFactory = new KinesisShardConsumerFactory(); } return shardConsumerFactory.createShardConsumer(shardInfo, @@ -1225,8 +1232,8 @@ public class Worker implements Runnable { * KinesisClientLibConfiguration * @return Returns metrics factory based on the config. 
*/ - private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, - KinesisClientLibConfiguration config) { + public static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, + KinesisClientLibConfiguration config) { IMetricsFactory metricsFactory; if (config.getMetricsLevel() == MetricsLevel.NONE) { metricsFactory = new NullMetricsFactory(); @@ -1280,27 +1287,27 @@ public class Worker implements Runnable { /** A non-null PeriodicShardSyncManager can only provided from unit tests. Any application code will create the * PeriodicShardSyncManager for the first time here. */ private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager, - boolean isAuditorMode) { + boolean isAuditorMode) { if (periodicShardSyncManager != null) { return periodicShardSyncManager; } return new PeriodicShardSyncManager(config.getWorkerIdentifier(), - leaderDecider, - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - config.getInitialPositionInStreamExtended(), - config.shouldCleanupLeasesUponShardCompletion(), - config.shouldIgnoreUnexpectedChildShards(), - SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, - shardSyncer, - null), - metricsFactory, - leaseCoordinator.getLeaseManager(), - streamConfig.getStreamProxy(), - isAuditorMode, - config.getLeasesRecoveryAuditorExecutionFrequencyMillis(), - config.getLeasesRecoveryAuditorInconsistencyConfidenceThreshold()); + leaderDecider, + new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), + config.getInitialPositionInStreamExtended(), + config.shouldCleanupLeasesUponShardCompletion(), + config.shouldIgnoreUnexpectedChildShards(), + SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC, + shardSyncer, + null), + metricsFactory, + leaseCoordinator.getLeaseManager(), + streamConfig.getStreamProxy(), + isAuditorMode, + config.getLeasesRecoveryAuditorExecutionFrequencyMillis(), + 
config.getLeasesRecoveryAuditorInconsistencyConfidenceThreshold()); } /** @@ -1310,7 +1317,7 @@ public class Worker implements Runnable { static class WorkerCWMetricsFactory extends CWMetricsFactory { WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis, - int maxQueueSize, MetricsLevel metricsLevel, Set metricsEnabledDimensions) { + int maxQueueSize, MetricsLevel metricsLevel, Set metricsEnabledDimensions) { super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions); } } @@ -1354,6 +1361,8 @@ public class Worker implements Runnable { @Setter @Accessors(fluent = true) private IKinesisProxy kinesisProxy; @Setter @Accessors(fluent = true) + private IShardConsumerFactory shardConsumerFactory; + @Setter @Accessors(fluent = true) private WorkerStateChangeListener workerStateChangeListener; @Setter @Accessors(fluent = true) private LeaseCleanupValidator leaseCleanupValidator; @@ -1422,6 +1431,10 @@ public class Worker implements Runnable { throw new IllegalArgumentException( "Kinesis Client Library configuration needs to be provided to build Worker"); } + if(shardConsumerFactory == null){ + shardConsumerFactory = new KinesisShardConsumerFactory(); + } + if (recordProcessorFactory == null) { throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker"); } @@ -1504,16 +1517,15 @@ public class Worker implements Runnable { } // We expect users to either inject both LeaseRenewer and the corresponding thread-pool, or neither of them (DEFAULT). 
- if (leaseRenewer == null) { + if (leaseRenewer == null) { ExecutorService leaseRenewerThreadPool = LeaseCoordinator.getDefaultLeaseRenewalExecutorService(config.getMaxLeaseRenewalThreads()); leaseRenewer = new LeaseRenewer<>(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), leaseRenewerThreadPool); } if (leaderDecider == null) { leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager, - Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); + Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); } - return new Worker(config.getApplicationName(), recordProcessorFactory, config, @@ -1547,14 +1559,15 @@ public class Worker implements Runnable { workerStateChangeListener, shardSyncer, leaderDecider, - null /* PeriodicShardSyncManager */); + null /*PeriodicShardSyncManager*/, + shardConsumerFactory); } > R createClient(final T builder, - final AWSCredentialsProvider credentialsProvider, - final ClientConfiguration clientConfiguration, - final String endpointUrl, - final String region) { + final AWSCredentialsProvider credentialsProvider, + final ClientConfiguration clientConfiguration, + final String endpointUrl, + final String region) { if (credentialsProvider != null) { builder.withCredentials(credentialsProvider); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java index 016ab32e..779ba92f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java @@ -39,8 +39,8 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -import static 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisPeriodicShardSyncManager.MAX_HASH_KEY; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisPeriodicShardSyncManager.MIN_HASH_KEY; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MAX_HASH_KEY; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MIN_HASH_KEY; import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.deserialize; import static org.mockito.Mockito.when; @@ -52,10 +52,10 @@ public class PeriodicShardSyncManagerTest { public static final int LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD = 3; /** Manager for PERIODIC shard sync strategy */ - private KinesisPeriodicShardSyncManager periodicShardSyncManager; + private PeriodicShardSyncManager periodicShardSyncManager; /** Manager for SHARD_END shard sync strategy */ - private KinesisPeriodicShardSyncManager auditorPeriodicShardSyncManager; + private PeriodicShardSyncManager auditorPeriodicShardSyncManager; @Mock private LeaderDecider leaderDecider; @@ -70,10 +70,10 @@ public class PeriodicShardSyncManagerTest { @Before public void setup() { - periodicShardSyncManager = new KinesisPeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, + periodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, metricsFactory, leaseManager, kinesisProxy, false, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS, LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD); - auditorPeriodicShardSyncManager = new KinesisPeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, + auditorPeriodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, metricsFactory, leaseManager, kinesisProxy, true, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS, LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD); } @@ -92,7 +92,7 @@ 
public class PeriodicShardSyncManagerTest { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertTrue(KinesisPeriodicShardSyncManager + Assert.assertTrue(PeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } @@ -110,7 +110,7 @@ public class PeriodicShardSyncManagerTest { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertFalse(KinesisPeriodicShardSyncManager + Assert.assertFalse(PeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } @@ -128,7 +128,7 @@ public class PeriodicShardSyncManagerTest { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertFalse(KinesisPeriodicShardSyncManager + Assert.assertFalse(PeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } @@ -147,7 +147,7 @@ public class PeriodicShardSyncManagerTest { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertFalse(KinesisPeriodicShardSyncManager + Assert.assertFalse(PeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); }