Every other change for DynamoDBStreamsKinesis Adapter Compatibility (#995)

Co-authored-by: Nicholas Gutierrez <nichgu@amazon.com>
This commit is contained in:
gguptp 2022-10-06 22:38:50 +05:30 committed by GitHub
parent 251b331a2e
commit 372f98b21a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 408 additions and 273 deletions

View file

@ -45,24 +45,24 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
private static final int TIME_TO_KEEP_ALIVE = 5;
private static final int CORE_THREAD_POOL_COUNT = 1;
private final KinesisDataFetcher dataFetcher;
private final IDataFetcher dataFetcher;
private final ExecutorService executorService;
private final int retryGetRecordsInSeconds;
private final String shardId;
final Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;
public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher,
public AsynchronousGetRecordsRetrievalStrategy(@NonNull final IDataFetcher dataFetcher,
final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) {
this(dataFetcher, buildExector(maxGetRecordsThreadPool, shardId), retryGetRecordsInSeconds, shardId);
}
public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher,
public AsynchronousGetRecordsRetrievalStrategy(final IDataFetcher dataFetcher,
final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) {
this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService),
shardId);
}
AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
AsynchronousGetRecordsRetrievalStrategy(IDataFetcher dataFetcher, ExecutorService executorService,
int retryGetRecordsInSeconds, Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier,
String shardId) {
this.dataFetcher = dataFetcher;
@ -148,7 +148,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
}
@Override
public KinesisDataFetcher getDataFetcher() {
public IDataFetcher getDataFetcher() {
return dataFetcher;
}
}

View file

@ -30,7 +30,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
* If we don't find a checkpoint for the parent shard(s), we assume they have been trimmed and directly
* proceed with processing data from the shard.
*/
class BlockOnParentShardTask implements ITask {
public class BlockOnParentShardTask implements ITask {
private static final Log LOG = LogFactory.getLog(BlockOnParentShardTask.class);
private final ShardInfo shardInfo;
@ -45,7 +45,7 @@ class BlockOnParentShardTask implements ITask {
* @param leaseManager Used to fetch the lease and checkpoint info for parent shards
* @param parentShardPollIntervalMillis Sleep time if the parent shard has not completed processing
*/
BlockOnParentShardTask(ShardInfo shardInfo,
public BlockOnParentShardTask(ShardInfo shardInfo,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis) {
this.shardInfo = shardInfo;

View file

@ -46,9 +46,9 @@ public interface GetRecordsRetrievalStrategy {
boolean isShutdown();
/**
* Returns the KinesisDataFetcher used to getRecords from Kinesis.
* Returns the IDataFetcher used to getRecords
*
* @return KinesisDataFetcher
* @return IDataFetcher
*/
KinesisDataFetcher getDataFetcher();
IDataFetcher getDataFetcher();
}

View file

@ -0,0 +1,23 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.ChildShard;
import java.util.List;
public interface IDataFetcher {
DataFetcherResult getRecords(int maxRecords);
void initialize(String initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream);
void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream);
void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream);
void restartIterator();
boolean isShardEndReached();
List<ChildShard> getChildShards();
}

View file

@ -0,0 +1,25 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
public interface IShardConsumer {
boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist();
enum TaskOutcome {
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND
}
boolean consumeShard();
boolean isShutdown();
ShutdownReason getShutdownReason();
boolean beginShutdown();
void notifyShutdownRequested(ShutdownNotification shutdownNotification);
KinesisConsumerStates.ShardConsumerState getCurrentState();
boolean isShutdownRequested();
}

View file

@ -0,0 +1,34 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
public interface IShardConsumerFactory {
/**
* Returns a shard consumer to be used for consuming a (assigned) shard.
*
* @return Returns a shard consumer object.
*/
IShardConsumer createShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpointTracker,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesUponShardCompletion,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long taskBackoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy,
LeaseCleanupManager leaseCleanupManager);
}

View file

@ -29,7 +29,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
/**
* Task for initializing shard position and invoking the RecordProcessor initialize() API.
*/
class InitializeTask implements ITask {
public class InitializeTask implements ITask {
private static final Log LOG = LogFactory.getLog(InitializeTask.class);
@ -37,7 +37,7 @@ class InitializeTask implements ITask {
private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor;
private final KinesisDataFetcher dataFetcher;
private final IDataFetcher dataFetcher;
private final TaskType taskType = TaskType.INITIALIZE;
private final ICheckpoint checkpoint;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
@ -49,11 +49,11 @@ class InitializeTask implements ITask {
/**
* Constructor.
*/
InitializeTask(ShardInfo shardInfo,
public InitializeTask(ShardInfo shardInfo,
IRecordProcessor recordProcessor,
ICheckpoint checkpoint,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher,
IDataFetcher dataFetcher,
long backoffTimeMillis,
StreamConfig streamConfig,
GetRecordsCache getRecordsCache) {

View file

@ -61,7 +61,7 @@ public class KinesisClientLibConfiguration {
public static final int DEFAULT_MAX_RECORDS = 10000;
/**
* The default value for how long the {@link ShardConsumer} should sleep if no records are returned from the call to
* The default value for how long the {@link KinesisShardConsumer} should sleep if no records are returned from the call to
* {@link com.amazonaws.services.kinesis.AmazonKinesis#getRecords(com.amazonaws.services.kinesis.model.GetRecordsRequest)}.
*/
public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;
@ -91,7 +91,7 @@ public class KinesisClientLibConfiguration {
public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true;
/**
* Interval to run lease cleanup thread in {@link LeaseCleanupManager}.
* Interval to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}.
*/
private static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis();
@ -1030,7 +1030,7 @@ public class KinesisClientLibConfiguration {
* Keeping it protected to forbid outside callers from depending on this internal object.
* @return The initialPositionInStreamExtended object.
*/
protected InitialPositionInStreamExtended getInitialPositionInStreamExtended() {
public InitialPositionInStreamExtended getInitialPositionInStreamExtended() {
return initialPositionInStreamExtended;
}

View file

@ -51,7 +51,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
/**
* This class is used to coordinate/manage leases owned by this worker process and to get/set checkpoints.
*/
class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLease> implements ICheckpoint {
public class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLease> implements ICheckpoint {
private static final Log LOG = LogFactory.getLog(KinesisClientLibLeaseCoordinator.class);
@ -368,7 +368,7 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLea
*
* @return LeaseManager
*/
ILeaseManager<KinesisClientLease> getLeaseManager() {
public ILeaseManager<KinesisClientLease> getLeaseManager() {
return leaseManager;
}

View file

@ -15,7 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks,
* Top level container for all the possible states a {@link KinesisShardConsumer} can be in. The logic for creation of tasks,
* and state transitions is contained within the {@link ConsumerState} objects.
*
* <h2>State Diagram</h2>
@ -64,12 +64,12 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
* +-------------------+
* </pre>
*/
class ConsumerStates {
public class KinesisConsumerStates {
/**
* Enumerates processing states when working on a shard.
*/
enum ShardConsumerState {
public enum ShardConsumerState {
// @formatter:off
WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()),
INITIALIZING(new InitializingState()),
@ -96,7 +96,7 @@ class ConsumerStates {
* do when a transition occurs.
*
*/
interface ConsumerState {
public interface ConsumerState {
/**
* Creates a new task for this state using the passed in consumer to build the task. If there is no task
* required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the
@ -106,11 +106,11 @@ class ConsumerStates {
* the consumer to use build the task, or execute state.
* @return a valid task for this state or null if there is no task required.
*/
ITask createTask(ShardConsumer consumer);
ITask createTask(KinesisShardConsumer consumer);
/**
* Provides the next state of the consumer upon success of the task return by
* {@link ConsumerState#createTask(ShardConsumer)}.
* {@link ConsumerState#createTask(KinesisShardConsumer)}.
*
* @return the next state that the consumer should transition to, this may be the same object as the current
* state.
@ -129,7 +129,7 @@ class ConsumerStates {
ConsumerState shutdownTransition(ShutdownReason shutdownReason);
/**
* The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state
* The type of task that {@link ConsumerState#createTask(KinesisShardConsumer)} would return. This is always a valid state
* even if createTask would return a null value.
*
* @return the type of task that this state represents.
@ -149,7 +149,7 @@ class ConsumerStates {
}
/**
* The initial state that any {@link ShardConsumer} should start in.
* The initial state that any {@link KinesisShardConsumer} should start in.
*/
static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState();
@ -187,7 +187,7 @@ class ConsumerStates {
static class BlockedOnParentState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
public ITask createTask(KinesisShardConsumer consumer) {
return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(),
consumer.getParentShardPollIntervalMillis());
}
@ -247,10 +247,10 @@ class ConsumerStates {
* </dd>
* </dl>
*/
static class InitializingState implements ConsumerState {
public static class InitializingState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
public ITask createTask(KinesisShardConsumer consumer) {
return new InitializeTask(consumer.getShardInfo(),
consumer.getRecordProcessor(),
consumer.getCheckpoint(),
@ -311,7 +311,7 @@ class ConsumerStates {
static class ProcessingState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
public ITask createTask(KinesisShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(),
consumer.getStreamConfig(),
consumer.getRecordProcessor(),
@ -358,10 +358,10 @@ class ConsumerStates {
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.</dd>
* <dd>Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.</dd>
* <dt>Shutdown</dt>
* <dd>At this point records are being retrieved, and processed. An explicit shutdown will allow the record
* processor one last chance to checkpoint, and then the {@link ShardConsumer} will be held in an idle state.
* processor one last chance to checkpoint, and then the {@link KinesisShardConsumer} will be held in an idle state.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to
@ -377,7 +377,7 @@ class ConsumerStates {
static class ShutdownNotificationState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
public ITask createTask(KinesisShardConsumer consumer) {
return new ShutdownNotificationTask(consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownNotification(),
@ -414,24 +414,24 @@ class ConsumerStates {
}
/**
* Once the {@link ShutdownNotificationState} has been completed the {@link ShardConsumer} must not re-enter any of the
* processing states. This state idles the {@link ShardConsumer} until the worker triggers the final shutdown state.
* Once the {@link ShutdownNotificationState} has been completed the {@link KinesisShardConsumer} must not re-enter any of the
* processing states. This state idles the {@link KinesisShardConsumer} until the worker triggers the final shutdown state.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
* Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.
* </p>
* <p>
* Remains in the {@link ShutdownNotificationCompletionState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point the {@link ShardConsumer} has notified the record processor of the impending shutdown, and is
* <dd>At this point the {@link KinesisShardConsumer} has notified the record processor of the impending shutdown, and is
* waiting that notification. While waiting for the notification no further processing should occur on the
* {@link ShardConsumer}.
* {@link KinesisShardConsumer}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains
@ -447,7 +447,7 @@ class ConsumerStates {
static class ShutdownNotificationCompletionState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
public ITask createTask(KinesisShardConsumer consumer) {
return null;
}
@ -481,14 +481,14 @@ class ConsumerStates {
}
/**
* This state is entered if the {@link ShardConsumer} loses its lease, or reaches the end of the shard.
* This state is entered if the {@link KinesisShardConsumer} loses its lease, or reaches the end of the shard.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
* Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.
* </p>
* <p>
* Transitions to the {@link ShutdownCompleteState}
@ -497,7 +497,7 @@ class ConsumerStates {
* <dt>Shutdown</dt>
* <dd>At this point the record processor has processed the final shutdown indication, and depending on the shutdown
* reason taken the correct course of action. From this point on there should be no more interactions with the
* record processor or {@link ShardConsumer}.
* record processor or {@link KinesisShardConsumer}.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
* <dd>
@ -519,8 +519,8 @@ class ConsumerStates {
static class ShuttingDownState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ShutdownTask(consumer.getShardInfo(),
public ITask createTask(KinesisShardConsumer consumer) {
return new KinesisShutdownTask(consumer.getShardInfo(),
consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownReason(),
@ -562,21 +562,21 @@ class ConsumerStates {
}
/**
* This is the final state for the {@link ShardConsumer}. This occurs once all shutdown activities are completed.
* This is the final state for the {@link KinesisShardConsumer}. This occurs once all shutdown activities are completed.
*
* <h2>Valid Transitions</h2>
* <dl>
* <dt>Success</dt>
* <dd>
* <p>
* Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
* Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.
* </p>
* <p>
* Remains in the {@link ShutdownCompleteState}
* </p>
* </dd>
* <dt>Shutdown</dt>
* <dd>At this point the all shutdown activites are completed, and the {@link ShardConsumer} should not take any
* <dd>At this point the all shutdown activites are completed, and the {@link KinesisShardConsumer} should not take any
* further actions.
* <dl>
* <dt>{@link ShutdownReason#REQUESTED}</dt>
@ -599,7 +599,7 @@ class ConsumerStates {
static class ShutdownCompleteState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
public ITask createTask(KinesisShardConsumer consumer) {
if (consumer.getShutdownNotification() != null) {
consumer.getShutdownNotification().shutdownComplete();
}

View file

@ -39,7 +39,7 @@ import lombok.Data;
/**
* Used to get data from Amazon Kinesis. Tracks iterator state internally.
*/
class KinesisDataFetcher {
public class KinesisDataFetcher implements IDataFetcher{
private static final Log LOG = LogFactory.getLog(KinesisDataFetcher.class);
@ -185,7 +185,7 @@ class KinesisDataFetcher {
* @param sequenceNumber advance the iterator to the record at this sequence number.
* @param initialPositionInStream The initialPositionInStream.
*/
void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) {
public void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) {
if (sequenceNumber == null) {
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
} else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) {
@ -276,11 +276,11 @@ class KinesisDataFetcher {
/**
* @return the shardEndReached
*/
protected boolean isShardEndReached() {
public boolean isShardEndReached() {
return isShardEndReached;
}
protected List<ChildShard> getChildShards() {
public List<ChildShard> getChildShards() {
return childShards;
}
@ -290,5 +290,4 @@ class KinesisDataFetcher {
String getNextIterator() {
return nextIterator;
}
}

View file

@ -14,7 +14,6 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@ -35,7 +34,6 @@ import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
/**
@ -43,9 +41,9 @@ import lombok.Getter;
* The instance should be shutdown when we lose the primary responsibility for a shard.
* A new instance should be created if the primary responsibility is reassigned back to this process.
*/
class ShardConsumer {
public class KinesisShardConsumer implements IShardConsumer{
private static final Log LOG = LogFactory.getLog(ShardConsumer.class);
private static final Log LOG = LogFactory.getLog(KinesisShardConsumer.class);
private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor;
@ -78,7 +76,7 @@ class ShardConsumer {
@Getter
private final GetRecordsCache getRecordsCache;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
private static final GetRecordsRetrievalStrategy makeStrategy(IDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
ShardInfo shardInfo) {
@ -93,7 +91,7 @@ class ShardConsumer {
* Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do
* much coordination/synchronization to handle concurrent reads/updates.
*/
private ConsumerStates.ConsumerState currentState = ConsumerStates.INITIAL_STATE;
private KinesisConsumerStates.ConsumerState currentState = KinesisConsumerStates.INITIAL_STATE;
/*
* Used to track if we lost the primary responsibility. Once set to true, we will start shutting down.
* If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object.
@ -116,7 +114,7 @@ class ShardConsumer {
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@Deprecated
ShardConsumer(ShardInfo shardInfo,
KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
@ -162,7 +160,7 @@ class ShardConsumer {
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@Deprecated
ShardConsumer(ShardInfo shardInfo,
KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
@ -223,7 +221,7 @@ class ShardConsumer {
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
@Deprecated
ShardConsumer(ShardInfo shardInfo,
KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
@ -269,7 +267,7 @@ class ShardConsumer {
* @param shardSyncer shardSyncer instance used to check and create new leases
* @param leaseCleanupManager used to clean up leases in lease table.
*/
ShardConsumer(ShardInfo shardInfo,
KinesisShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
@ -314,7 +312,7 @@ class ShardConsumer {
*
* @return true if a new process task was submitted, false otherwise
*/
synchronized boolean consumeShard() {
public synchronized boolean consumeShard() {
return checkAndSubmitNextTask();
}
@ -373,10 +371,6 @@ class ShardConsumer {
return skipShardSyncAtWorkerInitializationIfLeasesExist;
}
private enum TaskOutcome {
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND
}
private TaskOutcome determineTaskOutcome() {
try {
TaskResult result = future.get();
@ -423,7 +417,7 @@ class ShardConsumer {
*
* @param shutdownNotification used to signal that the record processor has been given the chance to shutdown.
*/
void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
public void notifyShutdownRequested(ShutdownNotification shutdownNotification) {
this.shutdownNotification = shutdownNotification;
markForShutdown(ShutdownReason.REQUESTED);
}
@ -434,7 +428,7 @@ class ShardConsumer {
*
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
synchronized boolean beginShutdown() {
public synchronized boolean beginShutdown() {
markForShutdown(ShutdownReason.ZOMBIE);
checkAndSubmitNextTask();
@ -454,14 +448,14 @@ class ShardConsumer {
*
* @return true if shutdown is complete
*/
boolean isShutdown() {
public boolean isShutdown() {
return currentState.isTerminal();
}
/**
* @return the shutdownReason
*/
ShutdownReason getShutdownReason() {
public ShutdownReason getShutdownReason() {
return shutdownReason;
}
@ -497,7 +491,7 @@ class ShardConsumer {
}
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
currentState = currentState.shutdownTransition(shutdownReason);
} else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) {
} else if (isShutdownRequested() && KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) {
currentState = currentState.shutdownTransition(shutdownReason);
} else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
if (currentState.getTaskType() == currentTask.getTaskType()) {
@ -516,7 +510,7 @@ class ShardConsumer {
}
@VisibleForTesting
boolean isShutdownRequested() {
public boolean isShutdownRequested() {
return shutdownReason != null;
}
@ -525,7 +519,7 @@ class ShardConsumer {
*
* @return the currentState
*/
ConsumerStates.ShardConsumerState getCurrentState() {
public KinesisConsumerStates.ShardConsumerState getCurrentState() {
return currentState.getState();
}

View file

@ -0,0 +1,48 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
public class KinesisShardConsumerFactory implements IShardConsumerFactory{
@Override
public IShardConsumer createShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpointTracker,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesUponShardCompletion,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long taskBackoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy,
LeaseCleanupManager leaseCleanupManager) {
return new KinesisShardConsumer(shardInfo,
streamConfig,
checkpointTracker,
recordProcessor,
recordProcessorCheckpointer,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesUponShardCompletion,
executorService,
metricsFactory,
taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config, shardSyncer, shardSyncStrategy,
leaseCleanupManager);
}
}

View file

@ -14,25 +14,24 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
import java.util.Objects;
@ -44,9 +43,9 @@ import java.util.stream.Collectors;
/**
* Task for invoking the RecordProcessor shutdown() callback.
*/
class ShutdownTask implements ITask {
public class KinesisShutdownTask implements ITask {
private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
private static final Log LOG = LogFactory.getLog(KinesisShutdownTask.class);
@VisibleForTesting
static final int RETRY_RANDOM_MAX_RANGE = 50;
@ -72,7 +71,7 @@ class ShutdownTask implements ITask {
* Constructor.
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShutdownTask(ShardInfo shardInfo,
KinesisShutdownTask(ShardInfo shardInfo,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ShutdownReason reason,

View file

@ -60,7 +60,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private PrefetchCounters prefetchCounters;
private boolean started = false;
private final String operation;
private final KinesisDataFetcher dataFetcher;
private final IDataFetcher dataFetcher;
private final String shardId;
/**

View file

@ -41,7 +41,7 @@ import com.amazonaws.services.kinesis.model.Shard;
/**
* Task for fetching data records and invoking processRecords() on the record processor instance.
*/
class ProcessTask implements ITask {
public class ProcessTask implements ITask {
private static final Log LOG = LogFactory.getLog(ProcessTask.class);
@ -55,7 +55,7 @@ class ProcessTask implements ITask {
private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final KinesisDataFetcher dataFetcher;
private final IDataFetcher dataFetcher;
private final TaskType taskType = TaskType.PROCESS;
private final StreamConfig streamConfig;
private final long backoffTimeMillis;
@ -81,7 +81,7 @@ class ProcessTask implements ITask {
* The retrieval strategy for fetching records from kinesis
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
RecordProcessorCheckpointer recordProcessorCheckpointer, IDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
GetRecordsCache getRecordsCache) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
@ -107,7 +107,7 @@ class ProcessTask implements ITask {
* determines how throttling events should be reported in the log.
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
RecordProcessorCheckpointer recordProcessorCheckpointer, IDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) {
super();

View file

@ -37,7 +37,7 @@ import com.amazonaws.services.kinesis.model.Record;
* The Amazon Kinesis Client Library will instantiate an object and provide a reference to the application
* RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment.
*/
class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
private static final Log LOG = LogFactory.getLog(RecordProcessorCheckpointer.class);
@ -62,7 +62,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
* @param checkpoint Used to checkpoint progress of a RecordProcessor
* @param validator Used for validating sequence numbers
*/
RecordProcessorCheckpointer(ShardInfo shardInfo,
public RecordProcessorCheckpointer(ShardInfo shardInfo,
ICheckpoint checkpoint,
SequenceNumberValidator validator,
IMetricsFactory metricsFactory) {
@ -231,7 +231,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
/**
* @return the lastCheckpointValue
*/
ExtendedSequenceNumber getLastCheckpointValue() {
public ExtendedSequenceNumber getLastCheckpointValue() {
return lastCheckpointValue;
}
@ -244,14 +244,14 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
*
* @return the largest permitted checkpoint
*/
synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() {
public synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() {
return largestPermittedCheckpointValue;
}
/**
* @param largestPermittedCheckpointValue the largest permitted checkpoint
*/
synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) {
public synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) {
this.largestPermittedCheckpointValue = largestPermittedCheckpointValue;
}
@ -262,7 +262,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
*
* @param extendedSequenceNumber
*/
synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) {
public synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) {
this.sequenceNumberAtShardEnd = extendedSequenceNumber;
}

View file

@ -51,7 +51,7 @@ public class SequenceNumberValidator {
* @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers
* being validated
*/
SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) {
public SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) {
this.proxy = proxy;
this.shardId = shardId;
this.validateWithGetIterator = validateWithGetIterator;

View file

@ -21,14 +21,14 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotif
/**
* Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint.
*/
class ShutdownNotificationTask implements ITask {
public class ShutdownNotificationTask implements ITask {
private final IRecordProcessor recordProcessor;
private final IRecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownNotification shutdownNotification;
private final ShardInfo shardInfo;
ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) {
public ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) {
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.shutdownNotification = shutdownNotification;

View file

@ -15,8 +15,8 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ShardConsumerState;
/**
@ -72,7 +72,7 @@ public enum ShutdownReason {
return reason.rank > this.rank;
}
ConsumerState getShutdownState() {
public ConsumerState getShutdownState() {
return shutdownState;
}
}

View file

@ -19,7 +19,7 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
/**
* Used to capture stream configuration and pass it along.
*/
class StreamConfig {
public class StreamConfig {
private final IKinesisProxy streamProxy;
private final int maxRecords;
@ -54,7 +54,7 @@ class StreamConfig {
/**
* @return the streamProxy
*/
IKinesisProxy getStreamProxy() {
public IKinesisProxy getStreamProxy() {
return streamProxy;
}
@ -82,14 +82,14 @@ class StreamConfig {
/**
* @return the initialPositionInStream
*/
InitialPositionInStreamExtended getInitialPositionInStream() {
public InitialPositionInStreamExtended getInitialPositionInStream() {
return initialPositionInStream;
}
/**
* @return validateSequenceNumberBeforeCheckpointing
*/
boolean shouldValidateSequenceNumberBeforeCheckpointing() {
public boolean shouldValidateSequenceNumberBeforeCheckpointing() {
return validateSequenceNumberBeforeCheckpointing;
}
}

View file

@ -24,7 +24,7 @@ import lombok.NonNull;
@Data
public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy {
@NonNull
private final KinesisDataFetcher dataFetcher;
private final IDataFetcher dataFetcher;
@Override
public GetRecordsResult getRecords(final int maxRecords) {
@ -44,7 +44,7 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev
}
@Override
public KinesisDataFetcher getDataFetcher() {
public IDataFetcher getDataFetcher() {
return dataFetcher;
}
}

View file

@ -137,7 +137,7 @@ public class Worker implements Runnable {
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();
private ConcurrentMap<ShardInfo, IShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, IShardConsumer>();
private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@ -160,6 +160,9 @@ public class Worker implements Runnable {
private final LeaseCleanupManager leaseCleanupManager;
// Shard Consumer Factory
private IShardConsumerFactory shardConsumerFactory;
/**
* Constructor.
*
@ -539,7 +542,7 @@ public class Worker implements Runnable {
leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator),
leaderDecider, periodicShardSyncManager);
leaderDecider, periodicShardSyncManager, null /*ShardConsumerFactory*/);
}
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
@ -550,7 +553,7 @@ public class Worker implements Runnable {
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool,
WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider,
IPeriodicShardSyncManager periodicShardSyncManager) {
IPeriodicShardSyncManager periodicShardSyncManager, IShardConsumerFactory shardConsumerFactory) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
@ -580,6 +583,7 @@ public class Worker implements Runnable {
Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion,
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords());
this.shardConsumerFactory = shardConsumerFactory;
}
/**
@ -687,7 +691,7 @@ public class Worker implements Runnable {
boolean foundCompletedShard = false;
Set<ShardInfo> assignedShards = new HashSet<>();
for (ShardInfo shardInfo : getShardInfoForAssignments()) {
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
IShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
if (shardConsumer.isShutdown() && shardConsumer.getShutdownReason().equals(ShutdownReason.TERMINATE)) {
foundCompletedShard = true;
} else {
@ -983,9 +987,9 @@ public class Worker implements Runnable {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator,
lease, notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
IShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
if (consumer == null || ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) {
if (consumer == null || KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) {
//
// CASE1: There is a race condition between retrieving the current assignments, and creating the
// notification. If the a lease is lost in between these two points, we explicitly decrement the
@ -1007,7 +1011,7 @@ public class Worker implements Runnable {
return shutdownComplete;
}
ConcurrentMap<ShardInfo, ShardConsumer> getShardInfoShardConsumerMap() {
ConcurrentMap<ShardInfo, IShardConsumer> getShardInfoShardConsumerMap() {
return shardInfoShardConsumerMap;
}
@ -1107,8 +1111,8 @@ public class Worker implements Runnable {
* RecordProcessor factory
* @return ShardConsumer for the shard
*/
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
IShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
IShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
// Instantiate a new consumer if we don't have one, or the one we
// had was from an earlier
// lease instance (and was shutdown). Don't need to create another
@ -1123,7 +1127,7 @@ public class Worker implements Runnable {
return consumer;
}
protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
protected IShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
final IRecordProcessor recordProcessor = processorFactory.createProcessor();
final RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
@ -1134,7 +1138,11 @@ public class Worker implements Runnable {
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
metricsFactory);
return new ShardConsumer(shardInfo,
if(shardConsumerFactory == null){ //Default to KinesisShardConsumerFactory if null
this.shardConsumerFactory = new KinesisShardConsumerFactory();
}
return shardConsumerFactory.createShardConsumer(shardInfo,
streamConfig,
checkpointTracker,
recordProcessor,
@ -1146,7 +1154,6 @@ public class Worker implements Runnable {
metricsFactory,
taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config, shardSyncer, shardSyncStrategy,
@ -1355,6 +1362,8 @@ public class Worker implements Runnable {
@Setter @Accessors(fluent = true)
private IPeriodicShardSyncManager periodicShardSyncManager;
@Setter @Accessors(fluent = true)
private IShardConsumerFactory shardConsumerFactory;
@Setter @Accessors(fluent = true)
private WorkerStateChangeListener workerStateChangeListener;
@Setter @Accessors(fluent = true)
private LeaseCleanupValidator leaseCleanupValidator;
@ -1429,6 +1438,10 @@ public class Worker implements Runnable {
throw new IllegalArgumentException("LeaseManager, ShardSyncer, MetricsFactory, and LeaderDecider must be provided if PeriodicShardSyncManager is provided");
}
}
if(shardConsumerFactory == null){
shardConsumerFactory = new KinesisShardConsumerFactory();
}
if (recordProcessorFactory == null) {
throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker");
}
@ -1520,7 +1533,6 @@ public class Worker implements Runnable {
leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager,
Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
}
return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
@ -1554,7 +1566,8 @@ public class Worker implements Runnable {
workerStateChangeListener,
shardSyncer,
leaderDecider,
periodicShardSyncManager);
periodicShardSyncManager /*PeriodicShardSyncManager*/,
shardConsumerFactory);
}
<R, T extends AwsClientBuilder<T, R>> R createClient(final T builder,

View file

@ -14,8 +14,8 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ShardConsumerState;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@ -50,7 +50,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
public class ConsumerStatesTest {
@Mock
private ShardConsumer consumer;
private KinesisShardConsumer consumer;
@Mock
private StreamConfig streamConfig;
@Mock
@ -251,9 +251,9 @@ public class ConsumerStatesTest {
equalTo((IRecordProcessorCheckpointer) recordProcessorCheckpointer)));
assertThat(task, shutdownReqTask(ShutdownNotification.class, "shutdownNotification", equalTo(shutdownNotification)));
assertThat(state.successTransition(), equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE));
assertThat(state.successTransition(), equalTo(KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE));
equalTo(KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
@ -266,7 +266,7 @@ public class ConsumerStatesTest {
@Test
public void shutdownRequestCompleteStateTest() {
ConsumerState state = ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE;
ConsumerState state = KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE;
assertThat(state.createTask(consumer), nullValue());
@ -345,9 +345,9 @@ public class ConsumerStatesTest {
verify(shutdownNotification, never()).shutdownComplete();
}
static <ValueType> ReflectionPropertyMatcher<ShutdownTask, ValueType> shutdownTask(Class<ValueType> valueTypeClass,
static <ValueType> ReflectionPropertyMatcher<KinesisShutdownTask, ValueType> shutdownTask(Class<ValueType> valueTypeClass,
String propertyName, Matcher<ValueType> matcher) {
return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher);
return taskWith(KinesisShutdownTask.class, valueTypeClass, propertyName, matcher);
}
static <ValueType> ReflectionPropertyMatcher<ShutdownNotificationTask, ValueType> shutdownReqTask(

View file

@ -49,7 +49,7 @@ public class GracefulShutdownCoordinatorTest {
@Mock
private Callable<GracefulShutdownContext> contextCallable;
@Mock
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoConsumerMap;
private ConcurrentMap<ShardInfo, IShardConsumer> shardInfoConsumerMap;
@Test
public void testAllShutdownCompletedAlready() throws Exception {

View file

@ -85,7 +85,7 @@ import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
/**
* Unit tests of {@link ShardConsumer}.
* Unit tests of {@link KinesisShardConsumer}.
*/
@RunWith(MockitoJUnitRunner.class)
public class ShardConsumerTest {
@ -160,8 +160,8 @@ public class ShardConsumerTest {
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer consumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -175,19 +175,19 @@ public class ShardConsumerTest {
config,
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
}
/**
@ -210,8 +210,8 @@ public class ShardConsumerTest {
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer consumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -226,21 +226,21 @@ public class ShardConsumerTest {
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
doThrow(new RejectedExecutionException()).when(spyExecutorService).submit(any(InitializeTask.class));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
}
@Test
@ -258,8 +258,8 @@ public class ShardConsumerTest {
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer consumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -273,19 +273,19 @@ public class ShardConsumerTest {
config,
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
}
@ -300,8 +300,8 @@ public class ShardConsumerTest {
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer consumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -324,10 +324,10 @@ public class ShardConsumerTest {
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // submit BlockOnParentShardTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
verify(processor, times(0)).initialize(any(InitializationInput.class));
// Throw Error when IRecordProcessor.initialize() is invoked.
@ -335,7 +335,7 @@ public class ShardConsumerTest {
consumer.consumeShard(); // submit InitializeTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
@ -347,7 +347,7 @@ public class ShardConsumerTest {
assertThat(e.getCause(), instanceOf(ExecutionException.class));
}
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
@ -355,7 +355,7 @@ public class ShardConsumerTest {
consumer.consumeShard(); // submit InitializeTask again.
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(2)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
verify(processor, times(2)).initialize(any(InitializationInput.class)); // no other calls with different args
@ -363,11 +363,11 @@ public class ShardConsumerTest {
// Checking the status of submitted InitializeTask from above should pass.
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING)));
}
/**
* Test method for {@link ShardConsumer#consumeShard()}
* Test method for {@link KinesisShardConsumer#consumeShard()}
*/
@Test
public final void testConsumeShard() throws Exception {
@ -420,8 +420,8 @@ public class ShardConsumerTest {
any(IMetricsFactory.class), anyInt()))
.thenReturn(getRecordsCache);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer consumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -440,11 +440,11 @@ public class ShardConsumerTest {
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
verify(getRecordsCache).start();
@ -454,7 +454,7 @@ public class ShardConsumerTest {
boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords;
}
@ -469,21 +469,21 @@ public class ShardConsumerTest {
assertThat(processor.getNotifyShutdownLatch().await(1, TimeUnit.SECONDS), is(true));
Thread.sleep(50);
assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED));
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED));
assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED));
verify(shutdownNotification).shutdownNotificationComplete();
assertThat(processor.isShutdownNotificationCalled(), equalTo(true));
consumer.consumeShard();
Thread.sleep(50);
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED));
assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED));
consumer.beginShutdown();
Thread.sleep(50L);
assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.ZOMBIE));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
consumer.beginShutdown();
consumer.consumeShard();
verify(shutdownNotification, atLeastOnce()).shutdownComplete();
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
verify(getRecordsCache).shutdown();
@ -524,8 +524,8 @@ public class ShardConsumerTest {
when(recordProcessorCheckpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(streamConfig.getStreamProxy()).thenReturn(streamProxy);
final ShardConsumer consumer =
new ShardConsumer(shardInfo,
final KinesisShardConsumer consumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -544,21 +544,21 @@ public class ShardConsumerTest {
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
verify(parentLease, times(0)).getCheckpoint();
consumer.consumeShard(); // check on parent shards
Thread.sleep(parentShardPollIntervalMillis * 2);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
verify(parentLease, times(1)).getCheckpoint();
consumer.notifyShutdownRequested(shutdownNotification);
verify(shutdownNotification, times(0)).shutdownComplete();
assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard();
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
Thread.sleep(50L);
consumer.beginShutdown();
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(consumer.isShutdown(), is(true));
verify(shutdownNotification, times(1)).shutdownComplete();
consumer.beginShutdown();
@ -583,7 +583,7 @@ public class ShardConsumerTest {
}
/**
* Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record
* Test method for {@link KinesisShardConsumer#consumeShard()} that ensures a transient error thrown from the record
* processor's shutdown method with reason zombie will be retried.
*/
@Test
@ -646,8 +646,8 @@ public class ShardConsumerTest {
metricsFactory
);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer consumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -667,11 +667,11 @@ public class ShardConsumerTest {
shardSyncStrategy);
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
verify(getRecordsCache).start();
@ -681,7 +681,7 @@ public class ShardConsumerTest {
boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords;
}
@ -709,12 +709,12 @@ public class ShardConsumerTest {
// Wait for shutdown complete now that terminate shutdown is successful
for (int i = 0; i < 100; i++) {
consumer.consumeShard();
if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) {
if (consumer.getCurrentState() == KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) {
break;
}
Thread.sleep(50L);
}
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE)));
@ -732,7 +732,7 @@ public class ShardConsumerTest {
/**
* Test method for {@link ShardConsumer#consumeShard()} that ensures the shardConsumer gets shutdown with shutdown
* Test method for {@link KinesisShardConsumer#consumeShard()} that ensures the shardConsumer gets shutdown with shutdown
* reason TERMINATE when the shard end is reached.
*/
@Test
@ -795,8 +795,8 @@ public class ShardConsumerTest {
metricsFactory
);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer consumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -818,11 +818,11 @@ public class ShardConsumerTest {
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease);
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
verify(getRecordsCache).start();
@ -832,7 +832,7 @@ public class ShardConsumerTest {
boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords;
}
@ -860,12 +860,12 @@ public class ShardConsumerTest {
// Wait for shutdown complete now that terminate shutdown is successful
for (int i = 0; i < 100; i++) {
consumer.consumeShard();
if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) {
if (consumer.getCurrentState() == KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) {
break;
}
Thread.sleep(50L);
}
assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE)));
@ -881,7 +881,7 @@ public class ShardConsumerTest {
}
/**
* Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP.
* Test method for {@link KinesisShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP.
*/
@Test
public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception {
@ -938,8 +938,8 @@ public class ShardConsumerTest {
any(IMetricsFactory.class), anyInt()))
.thenReturn(getRecordsCache);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer consumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -958,11 +958,11 @@ public class ShardConsumerTest {
shardSyncer,
shardSyncStrategy);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards
Thread.sleep(50L);
consumer.consumeShard(); // start initialization
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize
Thread.sleep(50L);
@ -973,7 +973,7 @@ public class ShardConsumerTest {
boolean newTaskSubmitted = consumer.consumeShard();
if (newTaskSubmitted) {
LOG.debug("New processing task was submitted, call # " + i);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING)));
// CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES
i += maxRecords;
}
@ -985,9 +985,9 @@ public class ShardConsumerTest {
assertThat(processor.getShutdownReason(), nullValue());
consumer.beginShutdown();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
consumer.beginShutdown();
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
executorService.shutdown();
@ -1014,8 +1014,8 @@ public class ShardConsumerTest {
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer consumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer consumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -1041,22 +1041,22 @@ public class ShardConsumerTest {
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // submit BlockOnParentShardTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
verify(processor, times(0)).initialize(any(InitializationInput.class));
consumer.consumeShard(); // submit InitializeTask
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING)));
verify(processor, times(1)).initialize(argThat(
initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber)));
verify(processor, times(1)).initialize(any(InitializationInput.class)); // no other calls with different args
consumer.consumeShard();
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING)));
}
@Test
@ -1069,8 +1069,8 @@ public class ShardConsumerTest {
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer shardConsumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer shardConsumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -1101,8 +1101,8 @@ public class ShardConsumerTest {
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer shardConsumer =
new ShardConsumer(shardInfo,
KinesisShardConsumer shardConsumer =
new KinesisShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
@ -1144,7 +1144,7 @@ public class ShardConsumerTest {
skipCheckpointValidationValue,
INITIAL_POSITION_LATEST);
ShardConsumer shardConsumer = new ShardConsumer(
KinesisShardConsumer shardConsumer = new KinesisShardConsumer(
shardInfo,
streamConfig,
checkpoint,

View file

@ -66,7 +66,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.RETRY_RANDOM_MAX_RANGE;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShutdownTask.RETRY_RANDOM_MAX_RANGE;
/**
*
@ -139,7 +139,7 @@ public class ShutdownTaskTest {
}
/**
* Test method for {@link ShutdownTask#call()}.
* Test method for {@link KinesisShutdownTask#call()}.
*/
@Test
public final void testCallWhenApplicationDoesNotCheckpoint() {
@ -148,7 +148,7 @@ public class ShutdownTaskTest {
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
ShutdownTask task = new ShutdownTask(defaultShardInfo,
KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
@ -171,7 +171,7 @@ public class ShutdownTaskTest {
}
/**
* Test method for {@link ShutdownTask#call()}.
* Test method for {@link KinesisShutdownTask#call()}.
*/
@Test
public final void testCallWhenCreatingLeaseThrows() throws Exception {
@ -183,7 +183,7 @@ public class ShutdownTaskTest {
final String exceptionMessage = "InvalidStateException is thrown.";
when(leaseManager.createLeaseIfNotExists(any(KinesisClientLease.class))).thenThrow(new InvalidStateException(exceptionMessage));
ShutdownTask task = new ShutdownTask(defaultShardInfo,
KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
@ -226,7 +226,7 @@ public class ShutdownTaskTest {
// Make first 5 attempts with partial parent info in lease table
for (int i = 0; i < 5; i++) {
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
@ -252,7 +252,7 @@ public class ShutdownTaskTest {
}
// Make next attempt with complete parent info in lease table
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
@ -290,7 +290,7 @@ public class ShutdownTaskTest {
when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null);
for (int i = 0; i < 10; i++) {
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
@ -315,7 +315,7 @@ public class ShutdownTaskTest {
verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class));
}
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
@ -351,7 +351,7 @@ public class ShutdownTaskTest {
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
ShutdownTask task = new ShutdownTask(defaultShardInfo,
KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
@ -385,7 +385,7 @@ public class ShutdownTaskTest {
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
ShutdownTask task = new ShutdownTask(shardInfo,
KinesisShutdownTask task = new KinesisShutdownTask(shardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
@ -415,7 +415,7 @@ public class ShutdownTaskTest {
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;
ShutdownTask task = new ShutdownTask(defaultShardInfo,
KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.ZOMBIE,
@ -438,12 +438,12 @@ public class ShutdownTaskTest {
}
/**
* Test method for {@link ShutdownTask#getTaskType()}.
* Test method for {@link KinesisShutdownTask#getTaskType()}.
*/
@Test
public final void testGetTaskType() {
KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class);
ShutdownTask task = new ShutdownTask(null, null, null, null,
KinesisShutdownTask task = new KinesisShutdownTask(null, null, null, null,
null, null, false,
false, leaseCoordinator, 0,
getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager);

View file

@ -186,7 +186,7 @@ public class WorkerTest {
@Mock
private IRecordProcessor v2RecordProcessor;
@Mock
private ShardConsumer shardConsumer;
private IShardConsumer shardConsumer;
@Mock
private Future<TaskResult> taskFuture;
@Mock
@ -297,13 +297,13 @@ public class WorkerTest {
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory);
IShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory);
Assert.assertNotNull(consumer);
ShardConsumer consumer2 = worker.createOrGetShardConsumer(shardInfo, streamletFactory);
IShardConsumer consumer2 = worker.createOrGetShardConsumer(shardInfo, streamletFactory);
Assert.assertSame(consumer, consumer2);
ShardInfo shardInfoWithSameShardIdButDifferentConcurrencyToken =
new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
ShardConsumer consumer3 =
IShardConsumer consumer3 =
worker.createOrGetShardConsumer(shardInfoWithSameShardIdButDifferentConcurrencyToken, streamletFactory);
Assert.assertNotNull(consumer3);
Assert.assertNotSame(consumer3, consumer);
@ -419,10 +419,10 @@ public class WorkerTest {
new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
ShardInfo shardInfo2 = new ShardInfo(anotherDummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
ShardConsumer consumerOfShardInfo1 = worker.createOrGetShardConsumer(shardInfo1, streamletFactory);
ShardConsumer consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken =
IShardConsumer consumerOfShardInfo1 = worker.createOrGetShardConsumer(shardInfo1, streamletFactory);
IShardConsumer consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken =
worker.createOrGetShardConsumer(duplicateOfShardInfo1ButWithAnotherConcurrencyToken, streamletFactory);
ShardConsumer consumerOfShardInfo2 = worker.createOrGetShardConsumer(shardInfo2, streamletFactory);
IShardConsumer consumerOfShardInfo2 = worker.createOrGetShardConsumer(shardInfo2, streamletFactory);
Set<ShardInfo> assignedShards = new HashSet<ShardInfo>();
assignedShards.add(shardInfo1);
@ -1219,11 +1219,11 @@ public class WorkerTest {
false,
shardPrioritization);
final Map<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = worker.getShardInfoShardConsumerMap();
final Map<ShardInfo, IShardConsumer> shardInfoShardConsumerMap = worker.getShardInfoShardConsumerMap();
final ShardInfo completedShardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(completedLease);
final ShardConsumer completedShardConsumer = mock(ShardConsumer.class);
final KinesisShardConsumer completedShardConsumer = mock(KinesisShardConsumer.class);
shardInfoShardConsumerMap.put(completedShardInfo, completedShardConsumer);
when(completedShardConsumer.getCurrentState()).thenReturn(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE);
when(completedShardConsumer.getCurrentState()).thenReturn(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE);
Callable<GracefulShutdownContext> callable = worker.createWorkerShutdownCallable();
assertThat(worker.hasGracefulShutdownStarted(), equalTo(false));
@ -1338,11 +1338,11 @@ public class WorkerTest {
verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
.withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1)))));
.withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo1)))));
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
.withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2)))));
.withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo2)))));
}
@ -1451,11 +1451,11 @@ public class WorkerTest {
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
.withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1)))));
.withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo1)))));
verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class))
.and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher
.withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2)))));
.withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo2)))));
@ -2013,19 +2013,19 @@ public class WorkerTest {
@Override
protected boolean matchesSafely(MetricsCollectingTaskDecorator item, Description mismatchDescription) {
return Condition.matched(item, mismatchDescription)
.and(new Condition.Step<MetricsCollectingTaskDecorator, ShutdownTask>() {
.and(new Condition.Step<MetricsCollectingTaskDecorator, KinesisShutdownTask>() {
@Override
public Condition<ShutdownTask> apply(MetricsCollectingTaskDecorator value,
public Condition<KinesisShutdownTask> apply(MetricsCollectingTaskDecorator value,
Description mismatch) {
if (!(value.getOther() instanceof ShutdownTask)) {
if (!(value.getOther() instanceof KinesisShutdownTask)) {
mismatch.appendText("Wrapped task isn't a shutdown task");
return Condition.notMatched();
}
return Condition.matched((ShutdownTask) value.getOther(), mismatch);
return Condition.matched((KinesisShutdownTask) value.getOther(), mismatch);
}
}).and(new Condition.Step<ShutdownTask, ShutdownReason>() {
}).and(new Condition.Step<KinesisShutdownTask, ShutdownReason>() {
@Override
public Condition<ShutdownReason> apply(ShutdownTask value, Description mismatch) {
public Condition<ShutdownReason> apply(KinesisShutdownTask value, Description mismatch) {
return Condition.matched(value.getReason(), mismatch);
}
}).matching(matcher);