Everything Else needed for DynamoDBStreamsKinesisAdapter update compatibility

This commit is contained in:
Nicholas Gutierrez 2022-08-02 13:25:57 -07:00
parent d33fcce5f1
commit 03c78fd15e
10 changed files with 2195 additions and 62 deletions

View file

@ -0,0 +1,23 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.ChildShard;
import java.util.List;
public interface IDataFetcher {
DataFetcherResult getRecords(int maxRecords);
void initialize(String initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream);
void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream);
void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream);
void restartIterator();
boolean isShardEndReached();
List<ChildShard> getChildShards();
}

View file

@ -0,0 +1,25 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
public interface IShardConsumer {
boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist();
enum TaskOutcome {
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND
}
boolean consumeShard();
boolean isShutdown();
ShutdownReason getShutdownReason();
boolean beginShutdown();
void notifyShutdownRequested(ShutdownNotification shutdownNotification);
KinesisConsumerStates.ShardConsumerState getCurrentState();
boolean isShutdownRequested();
}

View file

@ -0,0 +1,34 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
public interface IShardConsumerFactory {
/**
* Returns a shard consumer to be used for consuming a (assigned) shard.
*
* @return Returns a shard consumer object.
*/
IShardConsumer createShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpointTracker,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesUponShardCompletion,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long taskBackoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy,
LeaseCleanupManager leaseCleanupManager);
}

View file

@ -0,0 +1,635 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* Top level container for all the possible states a {@link KinesisShardConsumer} can be in. The logic for creation of tasks,
* and state transitions is contained within the {@link ConsumerState} objects.
*
* <h2>State Diagram</h2>
*
* <pre>
* +-------------------+
* | Waiting on Parent | +------------------+
* +----+ Shard | | Shutdown |
* | | | +--------------------+ Notification |
* | +----------+--------+ | Shutdown: | Requested |
* | | Success | Requested +-+-------+--------+
* | | | | |
* | +------+-------------+ | | | Shutdown:
* | | Initializing +-----+ | | Requested
* | | | | | |
* | | +-----+-------+ | |
* | +---------+----------+ | | Shutdown: | +-----+-------------+
* | | Success | | Terminated | | Shutdown |
* | | | | Zombie | | Notification +-------------+
* | +------+-------------+ | | | | Complete | |
* | | Processing +--+ | | ++-----------+------+ |
* | +---+ | | | | | |
* | | | +----------+ | | | Shutdown: |
* | | +------+-------------+ | \ / | Requested |
* | | | | \/ +--------------------+
* | | | | ||
* | | Success | | || Shutdown:
* | +----------+ | || Terminated
* | | || Zombie
* | | ||
* | | ||
* | | +---++--------------+
* | | | Shutting Down |
* | +-----------+ |
* | | |
* | +--------+----------+
* | |
* | | Shutdown:
* | | All Reasons
* | |
* | |
* | Shutdown: +--------+----------+
* | All Reasons | Shutdown |
* +-------------------------------------------------------+ Complete |
* | |
* +-------------------+
* </pre>
*/
public class KinesisConsumerStates {
/**
* Enumerates processing states when working on a shard.
*/
public enum ShardConsumerState {
// @formatter:off
WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()),
INITIALIZING(new InitializingState()),
PROCESSING(new ProcessingState()),
SHUTDOWN_REQUESTED(new ShutdownNotificationState()),
SHUTTING_DOWN(new ShuttingDownState()),
SHUTDOWN_COMPLETE(new ShutdownCompleteState());
//@formatter:on
private final ConsumerState consumerState;
ShardConsumerState(ConsumerState consumerState) {
this.consumerState = consumerState;
}
public ConsumerState getConsumerState() {
return consumerState;
}
}
/**
* Represents a the current state of the consumer. This handles the creation of tasks for the consumer, and what to
* do when a transition occurs.
*
*/
public interface ConsumerState {
/**
* Creates a new task for this state using the passed in consumer to build the task. If there is no task
* required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the
* consumer during the execution of this method.
*
* @param consumer
* the consumer to use build the task, or execute state.
* @return a valid task for this state or null if there is no task required.
*/
ITask createTask(KinesisShardConsumer consumer);
/**
* Provides the next state of the consumer upon success of the task return by
* {@link ConsumerState#createTask(KinesisShardConsumer)}.
*
* @return the next state that the consumer should transition to, this may be the same object as the current
* state.
*/
ConsumerState successTransition();
/**
* Provides the next state of the consumer when a shutdown has been requested. The returned state is dependent
* on the current state, and the shutdown reason.
*
* @param shutdownReason
* the reason that a shutdown was requested
* @return the next state that the consumer should transition to, this may be the same object as the current
* state.
*/
ConsumerState shutdownTransition(ShutdownReason shutdownReason);
/**
* The type of task that {@link ConsumerState#createTask(KinesisShardConsumer)} would return. This is always a valid state
* even if createTask would return a null value.
*
* @return the type of task that this state represents.
*/
TaskType getTaskType();
/**
* An enumeration represent the type of this state. Different consumer states may return the same
* {@link ShardConsumerState}.
*
* @return the type of consumer state this represents.
*/
ShardConsumerState getState();
boolean isTerminal();
}
/**
* The initial state that any {@link KinesisShardConsumer} should start in.
*/
static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState();
private static ConsumerState shutdownStateFor(ShutdownReason reason) {
switch (reason) {
case REQUESTED:
return ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();
case TERMINATE:
case ZOMBIE:
return ShardConsumerState.SHUTTING_DOWN.getConsumerState();
default:
throw new IllegalArgumentException("Unknown reason: " + reason);
}
}
/**
* This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent
* shards have been completed.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Transition to the initializing state to allow the record processor to be initialized in preparation of
* processing.</dd>
* <dt>Shutdown</dt>
* <dd>
* <dl>
* <dt>All Reasons</dt>
* <dd>Transitions to {@link ShutdownCompleteState}. Since the record processor was never initialized it can't be
* informed of the shutdown.</dd>
* </dl>
* </dd>
* </dl>
*/
static class BlockedOnParentState implements ConsumerState {
@Override
public ITask createTask(KinesisShardConsumer consumer) {
return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(),
consumer.getParentShardPollIntervalMillis());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.INITIALIZING.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return ShardConsumerState.SHUTTING_DOWN.getConsumerState();
}
@Override
public TaskType getTaskType() {
return TaskType.BLOCK_ON_PARENT_SHARDS;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.WAITING_ON_PARENT_SHARDS;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This state is responsible for initializing the record processor with the shard information.
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Transitions to the processing state which will begin to send records to the record processor</dd>
* <dt>Shutdown</dt>
* <dd>At this point the record processor has been initialized, but hasn't processed any records. This requires that
* the record processor be notified of the shutdown, even though there is almost no actions the record processor
* could take.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Transitions to the {@link ShutdownNotificationState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>
* <p>
* This reason should not occur, since terminate is triggered after reaching the end of a shard. Initialize never
* makes an requests to Kinesis for records, so it can't reach the end of a shard.
* </p>
* <p>
* Transitions to the {@link ShuttingDownState}
* </p>
* </dd>
* </dl>
* </dd>
* </dl>
*/
public static class InitializingState implements ConsumerState {
@Override
public ITask createTask(KinesisShardConsumer consumer) {
return new InitializeTask(consumer.getShardInfo(),
consumer.getRecordProcessor(),
consumer.getCheckpoint(),
consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(),
consumer.getStreamConfig(),
consumer.getGetRecordsCache());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.PROCESSING.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return shutdownReason.getShutdownState();
}
@Override
public TaskType getTaskType() {
return TaskType.INITIALIZE;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.INITIALIZING;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This state is responsible for retrieving records from Kinesis, and dispatching them to the record processor.
* While in this state the only way a transition will occur is if a shutdown has been triggered.
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Doesn't actually transition, but instead returns the same state</dd>
* <dt>Shutdown</dt>
* <dd>At this point records are being retrieved, and processed. It's now possible for the consumer to reach the end
* of the shard triggering a {@link ShutdownReason#TERMINATE}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Transitions to the {@link ShutdownNotificationState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ProcessingState implements ConsumerState {
@Override
public ITask createTask(KinesisShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(),
consumer.getStreamConfig(),
consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(),
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.getGetRecordsCache());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.PROCESSING.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return shutdownReason.getShutdownState();
}
@Override
public TaskType getTaskType() {
return TaskType.PROCESS;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.PROCESSING;
}
@Override
public boolean isTerminal() {
return false;
}
}
static final ConsumerState SHUTDOWN_REQUEST_COMPLETION_STATE = new ShutdownNotificationCompletionState();
/**
* This state occurs when a shutdown has been explicitly requested. This shutdown allows the record processor a
* chance to checkpoint and prepare to be shutdown via the normal method. This state can only be reached by a
* shutdown on the {@link InitializingState} or {@link ProcessingState}.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.</dd>
* <dt>Shutdown</dt>
* <dd>At this point records are being retrieved, and processed. An explicit shutdown will allow the record
* processor one last chance to checkpoint, and then the {@link KinesisShardConsumer} will be held in an idle state.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to
* {@link ShutdownNotificationCompletionState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShutdownNotificationState implements ConsumerState {
@Override
public ITask createTask(KinesisShardConsumer consumer) {
return new ShutdownNotificationTask(consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownNotification(),
consumer.getShardInfo());
}
@Override
public ConsumerState successTransition() {
return SHUTDOWN_REQUEST_COMPLETION_STATE;
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
if (shutdownReason == ShutdownReason.REQUESTED) {
return SHUTDOWN_REQUEST_COMPLETION_STATE;
}
return shutdownReason.getShutdownState();
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_NOTIFICATION;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTDOWN_REQUESTED;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* Once the {@link ShutdownNotificationState} has been completed the {@link KinesisShardConsumer} must not re-enter any of the
* processing states. This state idles the {@link KinesisShardConsumer} until the worker triggers the final shutdown state.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.
* </p>
* <p>
* Remains in the {@link ShutdownNotificationCompletionState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point the {@link KinesisShardConsumer} has notified the record processor of the impending shutdown, and is
* waiting that notification. While waiting for the notification no further processing should occur on the
* {@link KinesisShardConsumer}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains
* {@link ShutdownNotificationCompletionState}</dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShuttingDownState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShutdownNotificationCompletionState implements ConsumerState {
@Override
public ITask createTask(KinesisShardConsumer consumer) {
return null;
}
@Override
public ConsumerState successTransition() {
return this;
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
if (shutdownReason != ShutdownReason.REQUESTED) {
return shutdownReason.getShutdownState();
}
return this;
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_NOTIFICATION;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTDOWN_REQUESTED;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This state is entered if the {@link KinesisShardConsumer} loses its lease, or reaches the end of the shard.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.
* </p>
* <p>
* Transitions to the {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point the record processor has processed the final shutdown indication, and depending on the shutdown
* reason taken the correct course of action. From this point on there should be no more interactions with the
* record processor or {@link KinesisShardConsumer}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>
* <p>
* This should not occur as all other {@link ShutdownReason}s take priority over it.
* </p>
* <p>
* Transitions to {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Transitions to the {@link ShutdownCompleteState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Transitions to the {@link ShutdownCompleteState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShuttingDownState implements ConsumerState {
@Override
public ITask createTask(KinesisShardConsumer consumer) {
return new KinesisShutdownTask(consumer.getShardInfo(),
consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownReason(),
consumer.getStreamConfig().getStreamProxy(),
consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(),
consumer.isIgnoreUnexpectedChildShards(),
consumer.getLeaseCoordinator(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache(), consumer.getShardSyncer(),
consumer.getShardSyncStrategy(), consumer.getChildShards(),
consumer.getLeaseCleanupManager());
}
@Override
public ConsumerState successTransition() {
return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTTING_DOWN;
}
@Override
public boolean isTerminal() {
return false;
}
}
/**
* This is the final state for the {@link KinesisShardConsumer}. This occurs once all shutdown activities are completed.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.
* </p>
* <p>
* Remains in the {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point the all shutdown activites are completed, and the {@link KinesisShardConsumer} should not take any
* further actions.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>
* <p>
* This should not occur as all other {@link ShutdownReason}s take priority over it.
* </p>
* <p>
* Remains in {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>{@link ShutdownReason#ZOMBIE}</dt>
* <dd>Remains in {@link ShutdownCompleteState}</dd>
* <dt>{@link ShutdownReason#TERMINATE}</dt>
* <dd>Remains in {@link ShutdownCompleteState}</dd>
* </dl>
* </dd>
* </dl>
*/
static class ShutdownCompleteState implements ConsumerState {
@Override
public ITask createTask(KinesisShardConsumer consumer) {
if (consumer.getShutdownNotification() != null) {
consumer.getShutdownNotification().shutdownComplete();
}
return null;
}
@Override
public ConsumerState successTransition() {
return this;
}
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return this;
}
@Override
public TaskType getTaskType() {
return TaskType.SHUTDOWN_COMPLETE;
}
@Override
public ShardConsumerState getState() {
return ShardConsumerState.SHUTDOWN_COMPLETE;
}
@Override
public boolean isTerminal() {
return true;
}
}
}

View file

@ -0,0 +1,608 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
/**
* Responsible for consuming data records of a (specified) shard.
* The instance should be shutdown when we lose the primary responsibility for a shard.
* A new instance should be created if the primary responsibility is reassigned back to this process.
*/
public class KinesisShardConsumer implements IShardConsumer{
private static final Log LOG = LogFactory.getLog(KinesisShardConsumer.class);
private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor;
private final KinesisClientLibConfiguration config;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final ExecutorService executorService;
private final ShardInfo shardInfo;
private final KinesisDataFetcher dataFetcher;
private final IMetricsFactory metricsFactory;
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private ICheckpoint checkpoint;
private LeaseCleanupManager leaseCleanupManager;
// Backoff time when polling to check if application has finished processing parent shards
private final long parentShardPollIntervalMillis;
private final boolean cleanupLeasesOfCompletedShards;
private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
//@Getter
private final ShardSyncer shardSyncer;
private ITask currentTask;
private long currentTaskSubmitTime;
private Future<TaskResult> future;
private ShardSyncStrategy shardSyncStrategy;
//@Getter
private List<ChildShard> childShards;
//@Getter
private final GetRecordsCache getRecordsCache;
public List<ChildShard> getChildShards() {
return childShards;
}
public GetRecordsCache getGetRecordsCache() {
return getRecordsCache;
}
public ShardSyncer getShardSyncer() {
return shardSyncer;
}
private static final GetRecordsRetrievalStrategy makeStrategy(IDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
ShardInfo shardInfo) {
Optional<GetRecordsRetrievalStrategy> getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry ->
maxGetRecordsThreadPool.map(max ->
new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId())));
return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
}
/*
* Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do
* much coordination/synchronization to handle concurrent reads/updates.
*/
private KinesisConsumerStates.ConsumerState currentState = KinesisConsumerStates.INITIAL_STATE;
/*
* Used to track if we lost the primary responsibility. Once set to true, we will start shutting down.
* If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object.
*/
private volatile ShutdownReason shutdownReason;
private volatile ShutdownNotification shutdownNotification;
/**
* @param shardInfo Shard information
* @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param config Kinesis library configuration
* @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@Deprecated
KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this(shardInfo,
streamConfig,
checkpoint,
recordProcessor,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional.empty(),
Optional.empty(),
config, shardSyncer, shardSyncStrategy);
}
/**
* @param shardInfo Shard information
* @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool.
* @param config Kinesis library configuration
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@Deprecated
KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this(
shardInfo,
streamConfig,
checkpoint,
recordProcessor,
new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
metricsFactory),
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config, shardSyncer, shardSyncStrategy
);
}
/**
* @param shardInfo Shard information
* @param streamConfig Stream Config to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress
* @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param cleanupLeasesOfCompletedShards clean up the leases of completed shards
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
* @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists
* @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams.
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
* @param config Kinesis library configuration
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
@Deprecated
KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator,
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(),
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()));
}
/**
* @param shardInfo Shard information
* @param streamConfig Stream Config to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress
* @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param cleanupLeasesOfCompletedShards clean up the leases of completed shards
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
* @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists
* @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams.
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
* @param config Kinesis library configuration
* @param shardSyncer shardSyncer instance used to check and create new leases
* @param leaseCleanupManager used to clean up leases in lease table.
*/
KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy,
LeaseCleanupManager leaseCleanupManager) {
this.shardInfo = shardInfo;
this.streamConfig = streamConfig;
this.checkpoint = checkpoint;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.leaseCoordinator = leaseCoordinator;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.executorService = executorService;
this.metricsFactory = metricsFactory;
this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.config = config;
this.dataFetcher = kinesisDataFetcher;
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo),
this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
this.shardSyncer = shardSyncer;
this.shardSyncStrategy = shardSyncStrategy;
this.leaseCleanupManager = leaseCleanupManager;
}
/**
* No-op if current task is pending, otherwise submits next task for this shard.
* This method should NOT be called if the ShardConsumer is already in SHUTDOWN_COMPLETED state.
*
* @return true if a new process task was submitted, false otherwise
*/
public synchronized boolean consumeShard() {
return checkAndSubmitNextTask();
}
private boolean readyForNextTask() {
return future == null || future.isCancelled() || future.isDone();
}
private synchronized boolean checkAndSubmitNextTask() {
boolean submittedNewTask = false;
if (readyForNextTask()) {
TaskOutcome taskOutcome = TaskOutcome.NOT_COMPLETE;
if (future != null && future.isDone()) {
taskOutcome = determineTaskOutcome();
}
updateState(taskOutcome);
ITask nextTask = getNextTask();
if (nextTask != null) {
currentTask = nextTask;
try {
future = executorService.submit(currentTask);
currentTaskSubmitTime = System.currentTimeMillis();
submittedNewTask = true;
LOG.debug("Submitted new " + currentTask.getTaskType()
+ " task for shard " + shardInfo.getShardId());
} catch (RejectedExecutionException e) {
LOG.info(currentTask.getTaskType() + " task was not accepted for execution.", e);
} catch (RuntimeException e) {
LOG.info(currentTask.getTaskType() + " task encountered exception ", e);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("No new task to submit for shard %s, currentState %s",
shardInfo.getShardId(),
currentState.toString()));
}
}
} else {
final long timeElapsed = System.currentTimeMillis() - currentTaskSubmitTime;
final String commonMessage = String.format("Previous %s task still pending for shard %s since %d ms ago. ",
currentTask.getTaskType(), shardInfo.getShardId(), timeElapsed);
if (LOG.isDebugEnabled()) {
LOG.debug(commonMessage + "Not submitting new task.");
}
config.getLogWarningForTaskAfterMillis().ifPresent(value -> {
if (timeElapsed > value) {
LOG.warn(commonMessage);
}
});
}
return submittedNewTask;
}
public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() {
return skipShardSyncAtWorkerInitializationIfLeasesExist;
}
/*public enum TaskOutcome {
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND
}*/
private TaskOutcome determineTaskOutcome() {
try {
TaskResult result = future.get();
if (result.getException() == null) {
if (result.isShardEndReached()) {
if (!CollectionUtils.isNullOrEmpty(result.getChildShards())) {
childShards = result.getChildShards();
LOG.info("Shard " + shardInfo.getShardId() + ": Setting childShards in ShardConsumer: " + childShards);
}
return TaskOutcome.END_OF_SHARD;
}
return TaskOutcome.SUCCESSFUL;
}
logTaskException(result);
// This is the case of result with exception
if (result.isLeaseNotFound()) {
return TaskOutcome.LEASE_NOT_FOUND;
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// Setting future to null so we don't misinterpret task completion status in case of exceptions
future = null;
}
return TaskOutcome.FAILURE;
}
private void logTaskException(TaskResult taskResult) {
if (LOG.isDebugEnabled()) {
Exception taskException = taskResult.getException();
if (taskException instanceof BlockedOnParentShardException) {
// No need to log the stack trace for this exception (it is very specific).
LOG.debug("Shard " + shardInfo.getShardId() + " is blocked on completion of parent shard.");
} else {
LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ",
taskResult.getException());
}
}
}
/**
* Requests the shutdown of the this ShardConsumer. This should give the record processor a chance to checkpoint
* before being shutdown.
*
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
*/
public void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
this.shutdownNotification = shutdownNotification;
markForShutdown(ShutdownReason.REQUESTED);
}
/**
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
* This is called by Worker when it loses responsibility for a shard.
*
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
public synchronized boolean beginShutdown() {
markForShutdown(ShutdownReason.ZOMBIE);
checkAndSubmitNextTask();
return isShutdown();
}
synchronized void markForShutdown(ShutdownReason reason) {
// ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard)
if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) {
shutdownReason = reason;
}
}
/**
* Used (by Worker) to check if this ShardConsumer instance has been shutdown
* RecordProcessor shutdown() has been invoked, as appropriate.
*
* @return true if shutdown is complete
*/
public boolean isShutdown() {
return currentState.isTerminal();
}
/**
* @return the shutdownReason
*/
public ShutdownReason getShutdownReason() {
return shutdownReason;
}
/**
* Figure out next task to run based on current state, task, and shutdown context.
*
* @return Return next task to run
*/
private ITask getNextTask() {
ITask nextTask = currentState.createTask(this);
if (nextTask == null) {
return null;
} else {
return new MetricsCollectingTaskDecorator(nextTask, metricsFactory);
}
}
/**
* Note: This is a private/internal method with package level access solely for testing purposes.
* Update state based on information about: task success, current state, and shutdown info.
*
* @param taskOutcome The outcome of the last task
*/
void updateState(TaskOutcome taskOutcome) {
if (taskOutcome == TaskOutcome.END_OF_SHARD) {
markForShutdown(ShutdownReason.TERMINATE);
LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE");
}
if (taskOutcome == TaskOutcome.LEASE_NOT_FOUND) {
markForShutdown(ShutdownReason.ZOMBIE);
LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason ZOMBIE as lease was not found");
}
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
currentState = currentState.shutdownTransition(shutdownReason);
} else if (isShutdownRequested() && KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) {
currentState = currentState.shutdownTransition(shutdownReason);
} else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
if (currentState.getTaskType() == currentTask.getTaskType()) {
currentState = currentState.successTransition();
} else {
LOG.error("Current State task type of '" + currentState.getTaskType()
+ "' doesn't match the current tasks type of '" + currentTask.getTaskType()
+ "'. This shouldn't happen, and indicates a programming error. "
+ "Unable to safely transition to the next state.");
}
}
//
// Don't change state otherwise
//
}
@VisibleForTesting
public boolean isShutdownRequested() {
return shutdownReason != null;
}
/**
* Private/Internal method - has package level access solely for testing purposes.
*
* @return the currentState
*/
public KinesisConsumerStates.ShardConsumerState getCurrentState() {
return currentState.getState();
}
StreamConfig getStreamConfig() {
return streamConfig;
}
IRecordProcessor getRecordProcessor() {
return recordProcessor;
}
RecordProcessorCheckpointer getRecordProcessorCheckpointer() {
return recordProcessorCheckpointer;
}
ExecutorService getExecutorService() {
return executorService;
}
ShardInfo getShardInfo() {
return shardInfo;
}
KinesisDataFetcher getDataFetcher() {
return dataFetcher;
}
ILeaseManager<KinesisClientLease> getLeaseManager() {
return leaseCoordinator.getLeaseManager();
}
KinesisClientLibLeaseCoordinator getLeaseCoordinator() {
return leaseCoordinator;
}
ICheckpoint getCheckpoint() {
return checkpoint;
}
long getParentShardPollIntervalMillis() {
return parentShardPollIntervalMillis;
}
boolean isCleanupLeasesOfCompletedShards() {
return cleanupLeasesOfCompletedShards;
}
boolean isIgnoreUnexpectedChildShards() {
return config.shouldIgnoreUnexpectedChildShards();
}
long getTaskBackoffTimeMillis() {
return taskBackoffTimeMillis;
}
Future<TaskResult> getFuture() {
return future;
}
ShutdownNotification getShutdownNotification() {
return shutdownNotification;
}
ShardSyncStrategy getShardSyncStrategy() {
return shardSyncStrategy;
}
LeaseCleanupManager getLeaseCleanupManager() {
return leaseCleanupManager;
}
}

View file

@ -0,0 +1,48 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
public class KinesisShardConsumerFactory implements IShardConsumerFactory{
@Override
public IShardConsumer createShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpointTracker,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesUponShardCompletion,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long taskBackoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy,
LeaseCleanupManager leaseCleanupManager) {
return new KinesisShardConsumer(shardInfo,
streamConfig,
checkpointTracker,
recordProcessor,
recordProcessorCheckpointer,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesUponShardCompletion,
executorService,
metricsFactory,
taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config, shardSyncer, shardSyncStrategy,
leaseCleanupManager);
}
}

View file

@ -0,0 +1,337 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Task for invoking the RecordProcessor shutdown() callback.
*/
public class KinesisShutdownTask implements ITask {
private static final Log LOG = LogFactory.getLog(KinesisShutdownTask.class);
@VisibleForTesting
static final int RETRY_RANDOM_MAX_RANGE = 50;
private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownReason reason;
private final IKinesisProxy kinesisProxy;
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards;
private final boolean ignoreUnexpectedChildShards;
private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis;
private final GetRecordsCache getRecordsCache;
private final ShardSyncer shardSyncer;
private final ShardSyncStrategy shardSyncStrategy;
private final List<ChildShard> childShards;
private final LeaseCleanupManager leaseCleanupManager;
/**
* Constructor.
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
KinesisShutdownTask(ShardInfo shardInfo,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ShutdownReason reason,
IKinesisProxy kinesisProxy,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long backoffTimeMillis,
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer,
ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards,
LeaseCleanupManager leaseCleanupManager) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.reason = reason;
this.kinesisProxy = kinesisProxy;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.leaseCoordinator = leaseCoordinator;
this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsCache = getRecordsCache;
this.shardSyncer = shardSyncer;
this.shardSyncStrategy = shardSyncStrategy;
this.childShards = childShards;
this.leaseCleanupManager = leaseCleanupManager;
}
/*
* Invokes RecordProcessor shutdown() API.
* (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/
@Override
public TaskResult call() {
Exception exception;
LOG.info("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken: "
+ shardInfo.getConcurrencyToken() + ", original Shutdown reason: " + reason + ". childShards:" + childShards);
try {
final KinesisClientLease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
final Runnable leaseLostAction = () -> takeLeaseLostAction();
if (reason == ShutdownReason.TERMINATE) {
try {
takeShardEndAction(currentShardLease);
} catch (InvalidStateException e) {
// If InvalidStateException happens, it indicates we have a non recoverable error in short term.
// In this scenario, we should shutdown the shardConsumer with ZOMBIE reason to allow other worker to take the lease and retry shutting down.
LOG.warn("Lease " + shardInfo.getShardId() + ": Invalid state encountered while shutting down shardConsumer with TERMINATE reason. " +
"Dropping the lease and shutting down shardConsumer using ZOMBIE reason. ", e);
dropLease(currentShardLease);
throwOnApplicationException(leaseLostAction);
}
} else {
throwOnApplicationException(leaseLostAction);
}
LOG.debug("Shutting down retrieval strategy.");
getRecordsCache.shutdown();
LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
return new TaskResult(null);
} catch (Exception e) {
if (e instanceof CustomerApplicationException) {
LOG.error("Shard " + shardInfo.getShardId() + ": Application exception: ", e);
} else {
LOG.error("Shard " + shardInfo.getShardId() + ": Caught exception: ", e);
}
exception = e;
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
LOG.debug("Interrupted sleep", ie);
}
}
return new TaskResult(exception);
}
// Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup.
private void takeShardEndAction(KinesisClientLease currentShardLease)
throws InvalidStateException, DependencyException, ProvisionedThroughputException, CustomerApplicationException {
// Create new lease for the child shards if they don't exist.
// We have one valid scenario that shutdown task got created with SHARD_END reason and an empty list of childShards.
// This would happen when KinesisDataFetcher catches ResourceNotFound exception.
// In this case, KinesisDataFetcher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
if (currentShardLease == null) {
throw new InvalidStateException("Shard " + shardInfo.getShardId() + ": Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
}
if (!CollectionUtils.isNullOrEmpty(childShards)) {
// If childShards is not empty, create new leases for the childShards and update the current lease with the childShards lease information.
createLeasesForChildShardsIfNotExist();
updateCurrentLeaseWithChildShards(currentShardLease);
} else {
LOG.warn("Shard " + shardInfo.getShardId()
+ ": Shutting down consumer with SHARD_END reason without creating leases for child shards.");
}
// Checkpoint with SHARD_END sequence number.
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentShardLease, shardInfo);
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
boolean isSuccess = false;
try {
isSuccess = attemptShardEndCheckpointing();
} finally {
// Check if either the shard end ddb persist is successful or
// if childshards is empty. When child shards is empty then either it is due to
// completed shard being reprocessed or we got RNF from service.
// For these cases enqueue the lease for deletion.
if (isSuccess || CollectionUtils.isNullOrEmpty(childShards)) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
}
}
}
private void takeLeaseLostAction() {
final ShutdownInput leaseLostShutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.ZOMBIE)
.withCheckpointer(recordProcessorCheckpointer);
recordProcessor.shutdown(leaseLostShutdownInput);
}
private boolean attemptShardEndCheckpointing()
throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException {
final KinesisClientLease leaseFromDdb = Optional.ofNullable(leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId()))
.orElseThrow(() -> new InvalidStateException("Lease for shard " + shardInfo.getShardId() + " does not exist."));
if (!leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
// Call the recordProcessor to checkpoint with SHARD_END sequence number.
// The recordProcessor.shutdown is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling recordProcessor.shutdown.
throwOnApplicationException(() -> applicationCheckpointAndVerification());
}
return true;
}
private void applicationCheckpointAndVerification() {
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
final ShutdownInput shardEndShutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.TERMINATE)
.withCheckpointer(recordProcessorCheckpointer);
recordProcessor.shutdown(shardEndShutdownInput);
boolean successfullyCheckpointedShardEnd = false;
KinesisClientLease leaseFromDdb = null;
try {
leaseFromDdb = leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId());
} catch (Exception e) {
LOG.error("Shard " + shardInfo.getShardId() + " : Unable to get lease entry for shard to verify shard end checkpointing.", e);
}
if (leaseFromDdb != null && leaseFromDdb.getCheckpoint() != null) {
successfullyCheckpointedShardEnd = leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END);
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
if (!leaseFromDdb.getCheckpoint().equals(lastCheckpointValue)) {
LOG.error("Shard " + shardInfo.getShardId() +
" : Checkpoint information mismatch between authoritative source and local cache. " +
"This does not affect the application flow, but cut a ticket to Kinesis when you see this. " +
"Authoritative entry : " + leaseFromDdb.getCheckpoint() + " Cache entry : " + lastCheckpointValue);
}
} else {
LOG.error("Shard " + shardInfo.getShardId() + " : No lease checkpoint entry for shard to verify shard end checkpointing. Lease Entry : " + leaseFromDdb);
}
if (!successfullyCheckpointedShardEnd) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " +
"See IRecordProcessor.shutdown javadocs for more information.");
}
}
private void throwOnApplicationException(Runnable action) throws CustomerApplicationException {
try {
action.run();
} catch (Exception e) {
throw new CustomerApplicationException("Customer application throws exception for shard " + shardInfo.getShardId(), e);
}
}
private void createLeasesForChildShardsIfNotExist() throws InvalidStateException, DependencyException, ProvisionedThroughputException {
// For child shard resulted from merge of two parent shards, verify if both the parents are either present or
// not present in the lease table before creating the lease entry.
if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) {
final ChildShard childShard = childShards.get(0);
final List<String> parentLeaseKeys = childShard.getParentShards();
if (parentLeaseKeys.size() != 2) {
throw new InvalidStateException("Shard " + shardInfo.getShardId()+ "'s only child shard " + childShard
+ " does not contain other parent information.");
} else {
boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) ==
Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1)));
if (!isValidLeaseTableState) {
if(!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) {
throw new BlockedOnParentShardException(
"Shard " + shardInfo.getShardId() + "'s only child shard " + childShard
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
} else {
throw new InvalidStateException("Shard " + shardInfo.getShardId() + "'s only child shard " + childShard
+ " has partial parent information in lease table.");
}
}
}
}
// Attempt create leases for child shards.
for (ChildShard childShard : childShards) {
final String leaseKey = childShard.getShardId();
if (leaseCoordinator.getLeaseManager().getLease(leaseKey) == null) {
final KinesisClientLease leaseToCreate = KinesisShardSyncer.newKCLLeaseForChildShard(childShard);
leaseCoordinator.getLeaseManager().createLeaseIfNotExists(leaseToCreate);
LOG.info("Shard " + shardInfo.getShardId() + " : Created child shard lease: " + leaseToCreate.getLeaseKey());
}
}
}
/**
* Returns true for 1 in N probability.
*/
@VisibleForTesting
boolean isOneInNProbability(int n) {
Random r = new Random();
return 1 == r.nextInt((n - 1) + 1) + 1;
}
private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Set<String> childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet());
currentLease.setChildShardIds(childShardIds);
leaseCoordinator.getLeaseManager().updateLeaseWithMetaInfo(currentLease, UpdateField.CHILD_SHARDS);
LOG.info("Shard " + shardInfo.getShardId() + ": Updated current lease with child shard information: " + currentLease.getLeaseKey());
}
/*
* (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType()
*/
@Override
public TaskType getTaskType() {
return taskType;
}
@VisibleForTesting
ShutdownReason getReason() {
return reason;
}
private void dropLease(KinesisClientLease currentShardLease) {
if (currentShardLease == null) {
LOG.warn("Shard " + shardInfo.getShardId() + ": Unable to find the lease for shard. Will shutdown the shardConsumer directly.");
return;
}
leaseCoordinator.dropLease(currentShardLease);
LOG.warn("Dropped lease for shutting down ShardConsumer: " + currentShardLease.getLeaseKey());
}
}

View file

@ -0,0 +1,410 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.util.CollectionUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Value;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.Validate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange;
/**
* The top level orchestrator for coordinating the periodic shard sync related activities. If the configured
* {@link ShardSyncStrategyType} is PERIODIC, this class will be the main shard sync orchestrator. For non-PERIODIC
* strategies, this class will serve as an internal auditor that periodically checks if the full hash range is covered
* by currently held leases, and initiates a recovery shard sync if not.
*/
@Getter
@EqualsAndHashCode
class PeriodicShardSyncManager {
private static final Log LOG = LogFactory.getLog(PeriodicShardSyncManager.class);
private static final long INITIAL_DELAY = 0;
/** DEFAULT interval is used for PERIODIC {@link ShardSyncStrategyType}. */
private static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000L;
/** Parameters for validating hash range completeness when running in auditor mode. */
@VisibleForTesting
static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
@VisibleForTesting
static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
static final String PERIODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager";
private final HashRangeHoleTracker hashRangeHoleTracker = new HashRangeHoleTracker();
private final String workerId;
private final LeaderDecider leaderDecider;
private final ITask metricsEmittingShardSyncTask;
private final ScheduledExecutorService shardSyncThreadPool;
private final ILeaseManager<KinesisClientLease> leaseManager;
private final IKinesisProxy kinesisProxy;
private final boolean isAuditorMode;
private final long periodicShardSyncIntervalMillis;
private boolean isRunning;
private final IMetricsFactory metricsFactory;
private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold;
PeriodicShardSyncManager(String workerId,
LeaderDecider leaderDecider,
ShardSyncTask shardSyncTask,
IMetricsFactory metricsFactory,
ILeaseManager<KinesisClientLease> leaseManager,
IKinesisProxy kinesisProxy,
boolean isAuditorMode,
long leasesRecoveryAuditorExecutionFrequencyMillis,
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory,
leaseManager, kinesisProxy, isAuditorMode, leasesRecoveryAuditorExecutionFrequencyMillis,
leasesRecoveryAuditorInconsistencyConfidenceThreshold);
}
PeriodicShardSyncManager(String workerId,
LeaderDecider leaderDecider,
ShardSyncTask shardSyncTask,
ScheduledExecutorService shardSyncThreadPool,
IMetricsFactory metricsFactory,
ILeaseManager<KinesisClientLease> leaseManager,
IKinesisProxy kinesisProxy,
boolean isAuditorMode,
long leasesRecoveryAuditorExecutionFrequencyMillis,
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager.");
this.workerId = workerId;
this.leaderDecider = leaderDecider;
this.metricsEmittingShardSyncTask = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory);
this.shardSyncThreadPool = shardSyncThreadPool;
this.leaseManager = leaseManager;
this.kinesisProxy = kinesisProxy;
this.metricsFactory = metricsFactory;
this.isAuditorMode = isAuditorMode;
this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold;
if (isAuditorMode) {
Validate.notNull(this.leaseManager, "LeaseManager is required for non-PERIODIC shard sync strategies.");
Validate.notNull(this.kinesisProxy, "KinesisProxy is required for non-PERIODIC shard sync strategies.");
this.periodicShardSyncIntervalMillis = leasesRecoveryAuditorExecutionFrequencyMillis;
} else {
this.periodicShardSyncIntervalMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS;
}
}
public synchronized TaskResult start() {
if (!isRunning) {
final Runnable periodicShardSyncer = () -> {
try {
runShardSync();
} catch (Throwable t) {
LOG.error("Error running shard sync.", t);
}
};
shardSyncThreadPool
.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, periodicShardSyncIntervalMillis,
TimeUnit.MILLISECONDS);
isRunning = true;
}
return new TaskResult(null);
}
/**
* Runs ShardSync once, without scheduling further periodic ShardSyncs.
* @return TaskResult from shard sync
*/
public synchronized TaskResult syncShardsOnce() {
LOG.info("Syncing shards once from worker " + workerId);
return metricsEmittingShardSyncTask.call();
}
public void stop() {
if (isRunning) {
LOG.info(String.format("Shutting down leader decider on worker %s", workerId));
leaderDecider.shutdown();
LOG.info(String.format("Shutting down periodic shard sync task scheduler on worker %s", workerId));
shardSyncThreadPool.shutdown();
isRunning = false;
}
}
private void runShardSync() {
if (leaderDecider.isLeader(workerId)) {
LOG.debug("WorkerId " + workerId + " is a leader, running the shard sync task");
MetricsHelper.startScope(metricsFactory, PERIODIC_SHARD_SYNC_MANAGER);
boolean isRunSuccess = false;
final long runStartMillis = System.currentTimeMillis();
try {
final ShardSyncResponse shardSyncResponse = checkForShardSync();
MetricsHelper.getMetricsScope().addData("NumStreamsToSync", shardSyncResponse.shouldDoShardSync() ? 1 : 0, StandardUnit.Count, MetricsLevel.SUMMARY);
MetricsHelper.getMetricsScope().addData("NumStreamsWithPartialLeases", shardSyncResponse.isHoleDetected() ? 1 : 0, StandardUnit.Count, MetricsLevel.SUMMARY);
if (shardSyncResponse.shouldDoShardSync()) {
LOG.info("Periodic shard syncer initiating shard sync due to the reason - " +
shardSyncResponse.reasonForDecision());
metricsEmittingShardSyncTask.call();
} else {
LOG.info("Skipping shard sync due to the reason - " + shardSyncResponse.reasonForDecision());
}
isRunSuccess = true;
} catch (Exception e) {
LOG.error("Caught exception while running periodic shard syncer.", e);
} finally {
MetricsHelper.addSuccessAndLatency(runStartMillis, isRunSuccess, MetricsLevel.SUMMARY);
MetricsHelper.endScope();
}
} else {
LOG.debug("WorkerId " + workerId + " is not a leader, not running the shard sync task");
}
}
@VisibleForTesting
ShardSyncResponse checkForShardSync() throws DependencyException, InvalidStateException,
ProvisionedThroughputException {
if (!isAuditorMode) {
// If we are running with PERIODIC shard sync strategy, we should sync every time.
return new ShardSyncResponse(true, false, "Syncing every time with PERIODIC shard sync strategy.");
}
// Get current leases from DynamoDB.
final List<KinesisClientLease> currentLeases = leaseManager.listLeases();
if (CollectionUtils.isNullOrEmpty(currentLeases)) {
// If the current leases are null or empty, then we need to initiate a shard sync.
LOG.info("No leases found. Will trigger a shard sync.");
return new ShardSyncResponse(true, false, "No leases found.");
}
// Check if there are any holes in the hash range covered by current leases. Return the first hole if present.
Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(currentLeases);
if (hashRangeHoleOpt.isPresent()) {
// If hole is present, check if the hole is detected consecutively in previous occurrences. If hole is
// determined with high confidence, return true; return false otherwise. We use the high confidence factor
// to avoid shard sync on any holes during resharding and lease cleanups, or other intermittent issues.
final boolean hasHoleWithHighConfidence =
hashRangeHoleTracker.hashHighConfidenceOfHoleWith(hashRangeHoleOpt.get());
return new ShardSyncResponse(hasHoleWithHighConfidence, true,
"Detected the same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles() + " times. " +
"Will initiate shard sync after reaching threshold: " + leasesRecoveryAuditorInconsistencyConfidenceThreshold);
} else {
// If hole is not present, clear any previous hole tracking and return false.
hashRangeHoleTracker.reset();
return new ShardSyncResponse(false, false, "Hash range is complete.");
}
}
@VisibleForTesting
Optional<HashRangeHole> hasHoleInLeases(List<KinesisClientLease> leases) {
// Filter out any leases with checkpoints other than SHARD_END
final List<KinesisClientLease> activeLeases = leases.stream()
.filter(lease -> lease.getCheckpoint() != null && !lease.getCheckpoint().isShardEnd())
.collect(Collectors.toList());
final List<KinesisClientLease> activeLeasesWithHashRanges = fillWithHashRangesIfRequired(activeLeases);
return checkForHoleInHashKeyRanges(activeLeasesWithHashRanges);
}
private List<KinesisClientLease> fillWithHashRangesIfRequired(List<KinesisClientLease> activeLeases) {
final List<KinesisClientLease> activeLeasesWithNoHashRanges = activeLeases.stream()
.filter(lease -> lease.getHashKeyRange() == null).collect(Collectors.toList());
if (activeLeasesWithNoHashRanges.isEmpty()) {
return activeLeases;
}
// Fetch shards from Kinesis to fill in the in-memory hash ranges
final Map<String, Shard> kinesisShards = kinesisProxy.getShardList().stream()
.collect(Collectors.toMap(Shard::getShardId, shard -> shard));
return activeLeases.stream().map(lease -> {
if (lease.getHashKeyRange() == null) {
final String shardId = lease.getLeaseKey();
final Shard shard = kinesisShards.get(shardId);
if (shard == null) {
return lease;
}
lease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange()));
try {
leaseManager.updateLeaseWithMetaInfo(lease, UpdateField.HASH_KEY_RANGE);
} catch (Exception e) {
LOG.warn("Unable to update hash range information for lease " + lease.getLeaseKey() +
". This may result in explicit lease sync.");
}
}
return lease;
}).filter(lease -> lease.getHashKeyRange() != null).collect(Collectors.toList());
}
@VisibleForTesting
static Optional<HashRangeHole> checkForHoleInHashKeyRanges(List<KinesisClientLease> leasesWithHashKeyRanges) {
// Sort the hash ranges by starting hash key
final List<KinesisClientLease> sortedLeasesWithHashKeyRanges = sortLeasesByHashRange(leasesWithHashKeyRanges);
if (sortedLeasesWithHashKeyRanges.isEmpty()) {
LOG.error("No leases with valid hash ranges found.");
return Optional.of(new HashRangeHole());
}
// Validate the hash range bounds
final KinesisClientLease minHashKeyLease = sortedLeasesWithHashKeyRanges.get(0);
final KinesisClientLease maxHashKeyLease =
sortedLeasesWithHashKeyRanges.get(sortedLeasesWithHashKeyRanges.size() - 1);
if (!minHashKeyLease.getHashKeyRange().startingHashKey().equals(MIN_HASH_KEY) ||
!maxHashKeyLease.getHashKeyRange().endingHashKey().equals(MAX_HASH_KEY)) {
LOG.error("Incomplete hash range found between " + minHashKeyLease + " and " + maxHashKeyLease);
return Optional.of(new HashRangeHole(minHashKeyLease.getHashKeyRange(), maxHashKeyLease.getHashKeyRange()));
}
// Check for any holes in the sorted hash range intervals
if (sortedLeasesWithHashKeyRanges.size() > 1) {
KinesisClientLease leftmostLeaseToReportInCaseOfHole = minHashKeyLease;
HashKeyRangeForLease leftLeaseHashRange = leftmostLeaseToReportInCaseOfHole.getHashKeyRange();
for (int i = 1; i < sortedLeasesWithHashKeyRanges.size(); i++) {
final KinesisClientLease rightLease = sortedLeasesWithHashKeyRanges.get(i);
final HashKeyRangeForLease rightLeaseHashRange = rightLease.getHashKeyRange();
final BigInteger rangeDiff =
rightLeaseHashRange.startingHashKey().subtract(leftLeaseHashRange.endingHashKey());
// We have overlapping leases when rangeDiff is 0 or negative.
// signum() will be -1 for negative and 0 if value is 0.
// Merge the ranges for further tracking.
if (rangeDiff.signum() <= 0) {
leftLeaseHashRange = new HashKeyRangeForLease(leftLeaseHashRange.startingHashKey(),
leftLeaseHashRange.endingHashKey().max(rightLeaseHashRange.endingHashKey()));
} else {
// We have non-overlapping leases when rangeDiff is positive. signum() will be 1 in this case.
// If rangeDiff is 1, then it is a continuous hash range. If not, there is a hole.
if (!rangeDiff.equals(BigInteger.ONE)) {
LOG.error("Incomplete hash range found between " + leftmostLeaseToReportInCaseOfHole +
" and " + rightLease);
return Optional.of(new HashRangeHole(leftmostLeaseToReportInCaseOfHole.getHashKeyRange(),
rightLease.getHashKeyRange()));
}
leftmostLeaseToReportInCaseOfHole = rightLease;
leftLeaseHashRange = rightLeaseHashRange;
}
}
}
return Optional.empty();
}
@VisibleForTesting
static List<KinesisClientLease> sortLeasesByHashRange(List<KinesisClientLease> leasesWithHashKeyRanges) {
if (leasesWithHashKeyRanges.size() == 0 || leasesWithHashKeyRanges.size() == 1) {
return leasesWithHashKeyRanges;
}
Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator());
return leasesWithHashKeyRanges;
}
@Value
@Accessors(fluent = true)
@VisibleForTesting
static class ShardSyncResponse {
private final boolean shouldDoShardSync;
private final boolean isHoleDetected;
private final String reasonForDecision;
}
@Value
private static class HashRangeHole {
private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole;
private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole;
HashRangeHole() {
hashRangeAtStartOfPossibleHole = hashRangeAtEndOfPossibleHole = null;
}
HashRangeHole(HashKeyRangeForLease hashRangeAtStartOfPossibleHole,
HashKeyRangeForLease hashRangeAtEndOfPossibleHole) {
this.hashRangeAtStartOfPossibleHole = hashRangeAtStartOfPossibleHole;
this.hashRangeAtEndOfPossibleHole = hashRangeAtEndOfPossibleHole;
}
}
private class HashRangeHoleTracker {
private HashRangeHole hashRangeHole;
@Getter
private Integer numConsecutiveHoles;
public boolean hashHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole) {
if (hashRangeHole.equals(this.hashRangeHole)) {
++this.numConsecutiveHoles;
} else {
this.hashRangeHole = hashRangeHole;
this.numConsecutiveHoles = 1;
}
return numConsecutiveHoles >= leasesRecoveryAuditorInconsistencyConfidenceThreshold;
}
public void reset() {
this.hashRangeHole = null;
this.numConsecutiveHoles = 0;
}
}
private static class HashKeyRangeComparator implements Comparator<KinesisClientLease>, Serializable {
private static final long serialVersionUID = 1L;
@Override
public int compare(KinesisClientLease lease, KinesisClientLease otherLease) {
Validate.notNull(lease);
Validate.notNull(otherLease);
Validate.notNull(lease.getHashKeyRange());
Validate.notNull(otherLease.getHashKeyRange());
return ComparisonChain.start()
.compare(lease.getHashKeyRange().startingHashKey(), otherLease.getHashKeyRange().startingHashKey())
.compare(lease.getHashKeyRange().endingHashKey(), otherLease.getHashKeyRange().endingHashKey())
.result();
}
}
}

View file

@ -160,6 +160,9 @@ public class Worker implements Runnable {
private final LeaseCleanupManager leaseCleanupManager; private final LeaseCleanupManager leaseCleanupManager;
// Shard Consumer Factory
private IShardConsumerFactory shardConsumerFactory;
/** /**
* Constructor. * Constructor.
* *
@ -539,7 +542,7 @@ public class Worker implements Runnable {
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator), maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator),
leaderDecider, periodicShardSyncManager); leaderDecider, periodicShardSyncManager, null /*Ishardconsumer*/);
} }
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
@ -550,7 +553,7 @@ public class Worker implements Runnable {
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool,
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider,
PeriodicShardSyncManager periodicShardSyncManager) { PeriodicShardSyncManager periodicShardSyncManager, IShardConsumerFactory shardConsumerFactory) {
this.applicationName = applicationName; this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory; this.recordProcessorFactory = recordProcessorFactory;
this.config = config; this.config = config;
@ -580,6 +583,7 @@ public class Worker implements Runnable {
Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion, Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion,
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()); config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords());
this.shardConsumerFactory = shardConsumerFactory;
} }
/** /**
@ -695,8 +699,10 @@ public class Worker implements Runnable {
} }
assignedShards.add(shardInfo); assignedShards.add(shardInfo);
} }
// clean up shard consumers for unassigned shards // clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShards); cleanupShardConsumers(assignedShards);
wlog.info("Sleeping ..."); wlog.info("Sleeping ...");
Thread.sleep(idleTimeInMilliseconds); Thread.sleep(idleTimeInMilliseconds);
} catch (Exception e) { } catch (Exception e) {
@ -1132,8 +1138,9 @@ public class Worker implements Runnable {
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
metricsFactory); metricsFactory);
if(shardConsumerFactory == null){ //Default to KinesisShardConsumerFactory if null if(shardConsumerFactory == null){
this.shardConsumerFactory = new KinesisShardConsumerFactory();
shardConsumerFactory = new KinesisShardConsumerFactory();
} }
return shardConsumerFactory.createShardConsumer(shardInfo, return shardConsumerFactory.createShardConsumer(shardInfo,
@ -1225,7 +1232,7 @@ public class Worker implements Runnable {
* KinesisClientLibConfiguration * KinesisClientLibConfiguration
* @return Returns metrics factory based on the config. * @return Returns metrics factory based on the config.
*/ */
private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, public static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient,
KinesisClientLibConfiguration config) { KinesisClientLibConfiguration config) {
IMetricsFactory metricsFactory; IMetricsFactory metricsFactory;
if (config.getMetricsLevel() == MetricsLevel.NONE) { if (config.getMetricsLevel() == MetricsLevel.NONE) {
@ -1354,6 +1361,8 @@ public class Worker implements Runnable {
@Setter @Accessors(fluent = true) @Setter @Accessors(fluent = true)
private IKinesisProxy kinesisProxy; private IKinesisProxy kinesisProxy;
@Setter @Accessors(fluent = true) @Setter @Accessors(fluent = true)
private IShardConsumerFactory shardConsumerFactory;
@Setter @Accessors(fluent = true)
private WorkerStateChangeListener workerStateChangeListener; private WorkerStateChangeListener workerStateChangeListener;
@Setter @Accessors(fluent = true) @Setter @Accessors(fluent = true)
private LeaseCleanupValidator leaseCleanupValidator; private LeaseCleanupValidator leaseCleanupValidator;
@ -1422,6 +1431,10 @@ public class Worker implements Runnable {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Kinesis Client Library configuration needs to be provided to build Worker"); "Kinesis Client Library configuration needs to be provided to build Worker");
} }
if(shardConsumerFactory == null){
shardConsumerFactory = new KinesisShardConsumerFactory();
}
if (recordProcessorFactory == null) { if (recordProcessorFactory == null) {
throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker"); throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker");
} }
@ -1513,7 +1526,6 @@ public class Worker implements Runnable {
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager, leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager,
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
} }
return new Worker(config.getApplicationName(), return new Worker(config.getApplicationName(),
recordProcessorFactory, recordProcessorFactory,
config, config,
@ -1547,7 +1559,8 @@ public class Worker implements Runnable {
workerStateChangeListener, workerStateChangeListener,
shardSyncer, shardSyncer,
leaderDecider, leaderDecider,
null /* PeriodicShardSyncManager */); null /*PeriodicShardSyncManager*/,
shardConsumerFactory);
} }
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder, <R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,

View file

@ -39,8 +39,8 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisPeriodicShardSyncManager.MAX_HASH_KEY; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MAX_HASH_KEY;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisPeriodicShardSyncManager.MIN_HASH_KEY; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MIN_HASH_KEY;
import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.deserialize; import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.deserialize;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -52,10 +52,10 @@ public class PeriodicShardSyncManagerTest {
public static final int LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD = 3; public static final int LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD = 3;
/** Manager for PERIODIC shard sync strategy */ /** Manager for PERIODIC shard sync strategy */
private KinesisPeriodicShardSyncManager periodicShardSyncManager; private PeriodicShardSyncManager periodicShardSyncManager;
/** Manager for SHARD_END shard sync strategy */ /** Manager for SHARD_END shard sync strategy */
private KinesisPeriodicShardSyncManager auditorPeriodicShardSyncManager; private PeriodicShardSyncManager auditorPeriodicShardSyncManager;
@Mock @Mock
private LeaderDecider leaderDecider; private LeaderDecider leaderDecider;
@ -70,10 +70,10 @@ public class PeriodicShardSyncManagerTest {
@Before @Before
public void setup() { public void setup() {
periodicShardSyncManager = new KinesisPeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, periodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask,
metricsFactory, leaseManager, kinesisProxy, false, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS, metricsFactory, leaseManager, kinesisProxy, false, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS,
LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD); LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD);
auditorPeriodicShardSyncManager = new KinesisPeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, auditorPeriodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask,
metricsFactory, leaseManager, kinesisProxy, true, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS, metricsFactory, leaseManager, kinesisProxy, true, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS,
LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD); LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD);
} }
@ -92,7 +92,7 @@ public class PeriodicShardSyncManagerTest {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
Assert.assertTrue(KinesisPeriodicShardSyncManager Assert.assertTrue(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges).isPresent()); .checkForHoleInHashKeyRanges(hashRanges).isPresent());
} }
@ -110,7 +110,7 @@ public class PeriodicShardSyncManagerTest {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
Assert.assertFalse(KinesisPeriodicShardSyncManager Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges).isPresent()); .checkForHoleInHashKeyRanges(hashRanges).isPresent());
} }
@ -128,7 +128,7 @@ public class PeriodicShardSyncManagerTest {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
Assert.assertFalse(KinesisPeriodicShardSyncManager Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges).isPresent()); .checkForHoleInHashKeyRanges(hashRanges).isPresent());
} }
@ -147,7 +147,7 @@ public class PeriodicShardSyncManagerTest {
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
return lease; return lease;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
Assert.assertFalse(KinesisPeriodicShardSyncManager Assert.assertFalse(PeriodicShardSyncManager
.checkForHoleInHashKeyRanges(hashRanges).isPresent()); .checkForHoleInHashKeyRanges(hashRanges).isPresent());
} }