added tests and fixed broken tests

This commit is contained in:
Wei 2017-09-21 11:02:14 -07:00
parent 248605ed91
commit 5c57dfe0db
8 changed files with 179 additions and 210 deletions

View file

@ -308,9 +308,10 @@ class ConsumerStates {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
consumer.getRecordsFetcherFactory(), consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.getRetryGetRecordsInSeconds(), consumer.getMaxGetRecordsThreadPool());
consumer.getConfig().getRecordsFetcherFactory(), consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(), consumer.getTaskBackoffTimeMillis(),
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), consumer.getRetryGetRecordsInSeconds(),
consumer.getMaxGetRecordsThreadPool());
}
@Override

View file

@ -19,6 +19,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import com.sun.org.apache.regexp.internal.RE;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -62,6 +63,7 @@ class ProcessTask implements ITask {
private final long backoffTimeMillis;
private final Shard shard;
private final ThrottlingReporter throttlingReporter;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
@ -81,6 +83,8 @@ class ProcessTask implements ITask {
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordsFetcherFactory
* Record processor factory to create recordFetcher object
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
@ -89,37 +93,11 @@ class ProcessTask implements ITask {
* backoff time when catching exceptions
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordsFetcherFactory recordsFetcherFactory,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
}
/**
* @param shardInfo
* contains information about the shard
* @param streamConfig
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
* @param retryGetRecordsInSeconds
* time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool
* max number of threads in the getRecords thread pool.
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
this(shardInfo, streamConfig, recordProcessor, new SimpleRecordsFetcherFactory(streamConfig.getMaxRecords()),
recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool);
this(shardInfo, streamConfig, recordProcessor, recordsFetcherFactory, recordProcessorCheckpointer, dataFetcher,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
}
/**
@ -153,31 +131,6 @@ class ProcessTask implements ITask {
makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo));
}
/**
* @param shardInfo
* contains information about the shard
* @param streamConfig
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
* @param throttlingReporter
* determines how throttling events should be reported in the log.
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
this(shardInfo, streamConfig, recordProcessor, new SimpleRecordsFetcherFactory(streamConfig.getMaxRecords()),
recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
throttlingReporter, getRecordsRetrievalStrategy);
}
/**
* @param shardInfo
* contains information about the shard
@ -209,6 +162,7 @@ class ProcessTask implements ITask {
this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.recordsFetcher = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy);
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will

View file

@ -44,7 +44,7 @@ class ShardConsumer {
private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor;
@Getter
private final RecordsFetcherFactory recordsFetcherFactory;
private final KinesisClientLibConfiguration config;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final ExecutorService executorService;
private final ShardInfo shardInfo;
@ -83,6 +83,7 @@ class ShardConsumer {
* @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param config Kinesis library configuration
* @param leaseManager Used to create leases for new shards
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
@ -94,6 +95,7 @@ class ShardConsumer {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
KinesisClientLibConfiguration config,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
@ -101,9 +103,9 @@ class ShardConsumer {
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
this(shardInfo, streamConfig, checkpoint,recordProcessor, config, leaseManager,
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
}
/**
@ -111,6 +113,7 @@ class ShardConsumer {
* @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param config Kinesis library configuration
* @param leaseManager Used to create leases for new shards
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
@ -124,6 +127,7 @@ class ShardConsumer {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
KinesisClientLibConfiguration config,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
@ -135,59 +139,7 @@ class ShardConsumer {
Optional<Integer> maxGetRecordsThreadPool) {
this.streamConfig = streamConfig;
this.recordProcessor = recordProcessor;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(streamConfig.getMaxRecords());
this.executorService = executorService;
this.shardInfo = shardInfo;
this.checkpoint = checkpoint;
this.recordProcessorCheckpointer =
new RecordProcessorCheckpointer(shardInfo,
checkpoint,
new SequenceNumberValidator(streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()));
this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
}
/**
* @param shardInfo Shard information
* @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param recordsFetcherFactory RecordFetcher factory used to instantiate a recordFetcher object
* @param leaseManager Used to create leases for new shards
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool.
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
RecordsFetcherFactory recordsFetcherFactory,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool) {
this.streamConfig = streamConfig;
this.recordProcessor = recordProcessor;
this.recordsFetcherFactory = recordsFetcherFactory;
this.config = config;
this.executorService = executorService;
this.shardInfo = shardInfo;
this.checkpoint = checkpoint;

View file

@ -73,7 +73,7 @@ public class Worker implements Runnable {
private final String applicationName;
private final IRecordProcessorFactory recordProcessorFactory;
private final RecordsFetcherFactory recordsFetcherFactory;
private final KinesisClientLibConfiguration config;
private final StreamConfig streamConfig;
private final InitialPositionInStreamExtended initialPosition;
private final ICheckpoint checkpointTracker;
@ -246,7 +246,7 @@ public class Worker implements Runnable {
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
IMetricsFactory metricsFactory, ExecutorService execService) {
this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
config.getRecordsFetcherFactory(),
config,
new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName()),
@ -308,6 +308,8 @@ public class Worker implements Runnable {
* Name of the Kinesis application
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @paran config
* Kinesis Library configuration
* @param streamConfig
* Stream configuration
* @param initialPositionInStream
@ -335,24 +337,25 @@ public class Worker implements Runnable {
*/
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty());
}
/**
* @param applicationName
* Name of the Kinesis application
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Library Configuration
* @param streamConfig
* Stream configuration
* @param initialPositionInStream
@ -384,7 +387,7 @@ public class Worker implements Runnable {
*/
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
@ -393,75 +396,7 @@ public class Worker implements Runnable {
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(streamConfig.getMaxRecords());
this.streamConfig = streamConfig;
this.initialPosition = initialPositionInStream;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator;
this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
this.executorService = execService;
this.leaseCoordinator = leaseCoordinator;
this.metricsFactory = metricsFactory;
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
executorService);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.shardPrioritization = shardPrioritization;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
}
/**
* @param applicationName
* Name of the Kinesis application
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param recordsFetcherFactory
* Used to get record fetcher instances for fetching record from shards
* @param streamConfig
* Stream configuration
* @param initialPositionInStream
* One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from
* this location in the stream when an application starts up for the first time and there are no
* checkpoints. If there are checkpoints, we start from the checkpoint position.
* @param parentShardPollIntervalMillis
* Wait for this long between polls to check if parent shards are done
* @param shardSyncIdleTimeMillis
* Time between tasks to sync leases and Kinesis shards
* @param cleanupLeasesUponShardCompletion
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
* @param checkpoint
* Used to get/set checkpoints
* @param leaseCoordinator
* Lease coordinator (coordinates currently owned leases)
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
* @param metricsFactory
* Metrics factory used to emit metrics
* @param taskBackoffTimeMillis
* Backoff period when tasks encounter an exception
* @param shardPrioritization
* Provides prioritization logic to decide which available shards process first
* @param retryGetRecordsInSeconds
* Time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool
* Max number of threads in the getRecords thread pool.
*/
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, RecordsFetcherFactory recordsFetcherFactory, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.recordsFetcherFactory = recordsFetcherFactory;
this.config = config;
this.streamConfig = streamConfig;
this.initialPosition = initialPositionInStream;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
@ -912,7 +847,7 @@ public class Worker implements Runnable {
protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
IRecordProcessor recordProcessor = processorFactory.createProcessor();
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, recordsFetcherFactory,
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, config,
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
executorService, metricsFactory, taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool);
@ -1315,7 +1250,7 @@ public class Worker implements Runnable {
return new Worker(config.getApplicationName(),
recordProcessorFactory,
config.getRecordsFetcherFactory(),
config,
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
kinesisClient).getProxy(config.getStreamName()),
config.getMaxRecords(),

View file

@ -20,6 +20,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -57,6 +58,8 @@ public class ConsumerStatesTest {
@Mock
private IRecordProcessor recordProcessor;
@Mock
private KinesisClientLibConfiguration config;
@Mock
private RecordProcessorCheckpointer recordProcessorCheckpointer;
@Mock
private ExecutorService executorService;
@ -76,6 +79,10 @@ public class ConsumerStatesTest {
private IKinesisProxy kinesisProxy;
@Mock
private InitialPositionInStreamExtended initialPositionInStream;
@Mock
private RecordsFetcherFactory recordsFetcherFactory;
@Mock
private GetRecordsCache recordsFetcher;
private long parentShardPollIntervalMillis = 0xCAFE;
private boolean cleanupLeasesOfCompletedShards = true;
@ -98,7 +105,8 @@ public class ConsumerStatesTest {
when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards);
when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis);
when(consumer.getShutdownReason()).thenReturn(reason);
when(consumer.getConfig()).thenReturn(config);
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
}
private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class;
@ -215,6 +223,37 @@ public class ConsumerStatesTest {
}
@Test
public void processingStateRecordsFetcher() {
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1));
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2));
when(recordsFetcherFactory.createRecordsFetcher((any()))).thenReturn(recordsFetcher);
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsCache.class, "recordsFetcher", equalTo(recordsFetcher)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING));
assertThat(state.getTaskType(), equalTo(TaskType.PROCESS));
}
@Test
public void shutdownRequestState() {
ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();

View file

@ -0,0 +1,41 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
public class RecordsFetcherFactoryTest {
private RecordsFetcherFactory recordsFetcherFactory;
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
recordsFetcherFactory = new SimpleRecordsFetcherFactory(1);
}
@Test
public void createDefaultRecordsFetcherTest() {
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy);
assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class));
}
@Test
public void createPrefetchRecordsFetcherTest() {
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy);
assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class));
}
}

View file

@ -35,6 +35,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.ListIterator;
@ -45,6 +46,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hamcrest.Description;
@ -97,6 +99,10 @@ public class ShardConsumerTest {
@Mock
private IRecordProcessor processor;
@Mock
private KinesisClientLibConfiguration config;
@Mock
private RecordsFetcherFactory recordsFetcherFactory;
@Mock
private IKinesisProxy streamProxy;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@ -104,7 +110,6 @@ public class ShardConsumerTest {
private ICheckpoint checkpoint;
@Mock
private ShutdownNotification shutdownNotification;
/**
* Test method to verify consumer stays in INITIALIZING state when InitializationTask fails.
*/
@ -129,6 +134,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -177,6 +183,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -205,6 +212,7 @@ public class ShardConsumerTest {
@SuppressWarnings("unchecked")
@Test
public final void testRecordProcessorThrowable() throws Exception {
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
@ -218,6 +226,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -297,7 +306,7 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(maxRecords));
TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig =
@ -313,6 +322,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -399,7 +409,7 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2));
TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig =
@ -416,6 +426,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -478,6 +489,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -489,6 +501,7 @@ public class ShardConsumerTest {
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999");
when(leaseManager.getLease(anyString())).thenReturn(null);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2));
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));

View file

@ -133,6 +133,8 @@ public class WorkerTest {
@Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator;
@Mock
private KinesisClientLibConfiguration config;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory v1RecordProcessorFactory;
@ -210,6 +212,8 @@ public class WorkerTest {
public final void testCreateOrGetShardConsumer() {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
@ -228,7 +232,9 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
streamletFactory,
clientConfig,
streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@ -257,6 +263,8 @@ public class WorkerTest {
public void testWorkerLoopWithCheckpoint() {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
@ -275,7 +283,7 @@ public class WorkerTest {
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint)
.thenReturn(secondCheckpoint);
Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization);
@ -314,6 +322,8 @@ public class WorkerTest {
public final void testCleanupShardConsumers() {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
@ -332,7 +342,9 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
streamletFactory,
clientConfig,
streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@ -371,6 +383,8 @@ public class WorkerTest {
public final void testInitializationFailureWithRetries() {
String stageName = "testInitializationWorker";
IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
int count = 0;
when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++)));
int maxRecords = 2;
@ -386,6 +400,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
recordProcessorFactory,
clientConfig,
streamConfig, INITIAL_POSITION_TRIM_HORIZON,
shardPollInterval,
shardSyncIntervalMillis,
@ -709,6 +724,8 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -742,7 +759,7 @@ public class WorkerTest {
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -785,6 +802,8 @@ public class WorkerTest {
public void testShutdownCallableNotAllowedTwice() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -816,7 +835,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@ -850,6 +869,8 @@ public class WorkerTest {
public void testGracefulShutdownSingleFuture() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -888,7 +909,7 @@ public class WorkerTest {
when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@ -926,6 +947,8 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -950,7 +973,7 @@ public class WorkerTest {
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -988,6 +1011,8 @@ public class WorkerTest {
public void testRequestShutdownWithLostLease() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1020,7 +1045,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1089,6 +1114,8 @@ public class WorkerTest {
public void testRequestShutdownWithAllLeasesLost() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1121,7 +1148,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1195,6 +1222,8 @@ public class WorkerTest {
public void testLeaseCancelledAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1226,7 +1255,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1267,6 +1296,8 @@ public class WorkerTest {
public void testEndOfShardAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1298,7 +1329,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1336,13 +1367,14 @@ public class WorkerTest {
private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis,
boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream,
super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion,
checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis,
failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization);
@ -1649,10 +1681,12 @@ public class WorkerTest {
idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
Worker worker =
new Worker(stageName,
recordProcessorFactory,
clientConfig,
streamConfig, INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,