This commit is contained in:
BtXin 2017-09-21 21:05:38 +00:00 committed by GitHub
commit e3be69082d
12 changed files with 468 additions and 69 deletions

View file

@ -14,8 +14,6 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Optional;
/**
* Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks,
* and state transitions is contained within the {@link ConsumerState} objects.
@ -310,9 +308,10 @@ class ConsumerStates {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.getRetryGetRecordsInSeconds(), consumer.getMaxGetRecordsThreadPool());
consumer.getConfig().getRecordsFetcherFactory(), consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(), consumer.getTaskBackoffTimeMillis(),
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), consumer.getRetryGetRecordsInSeconds(),
consumer.getMaxGetRecordsThreadPool());
}
@Override

View file

@ -232,6 +232,9 @@ public class KinesisClientLibConfiguration {
@Getter
private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS;
@Getter
private RecordsFetcherFactory recordsFetcherFactory;
/**
* Constructor.
*
@ -455,6 +458,117 @@ public class KinesisClientLibConfiguration {
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords);
}
/**
* @param applicationName Name of the Kinesis application
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamName Name of the Kinesis stream
* @param kinesisEndpoint Kinesis endpoint
* @param dynamoDBEndpoint DynamoDB endpoint
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
* records from that location in the stream when an application starts up for the first time and there
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
* @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
* @param workerId Used to distinguish different workers/processes of a Kinesis application
* @param maxRecords Max records to read per Kinesis getRecords() call
* @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
* @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
* GetRecords returned an empty record list.
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
* @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
* @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
public KinesisClientLibConfiguration(String applicationName,
String streamName,
String kinesisEndpoint,
String dynamoDBEndpoint,
InitialPositionInStream initialPositionInStream,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
long failoverTimeMillis,
String workerId,
int maxRecords,
long idleTimeBetweenReadsInMillis,
boolean callProcessRecordsEvenForEmptyRecordList,
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
long taskBackoffTimeMillis,
long metricsBufferTimeMillis,
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName,
RecordsFetcherFactory recordsFetcherFactory) {
// Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis);
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis);
checkIsValuePositive("MaxRecords", (long) maxRecords);
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis);
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsRegionNameValid(regionName);
this.applicationName = applicationName;
this.tableName = applicationName;
this.streamName = streamName;
this.kinesisEndpoint = kinesisEndpoint;
this.dynamoDBEndpoint = dynamoDBEndpoint;
this.initialPositionInStream = initialPositionInStream;
this.kinesisCredentialsProvider = kinesisCredentialsProvider;
this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider;
this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider;
this.failoverTimeMillis = failoverTimeMillis;
this.maxRecords = maxRecords;
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
this.workerIdentifier = workerId;
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.metricsBufferTimeMillis = metricsBufferTimeMillis;
this.metricsMaxQueueSize = metricsMaxQueueSize;
this.metricsLevel = DEFAULT_METRICS_LEVEL;
this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS;
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
this.regionName = regionName;
this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER;
this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME;
this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = recordsFetcherFactory;
this.shutdownGraceMillis = shutdownGraceMillis;
this.shutdownGraceMillis = shutdownGraceMillis;
}
@ -1158,6 +1272,34 @@ public class KinesisClientLibConfiguration {
return this;
}
/**
*
* @param maxCacheSize the max number of records stored in the getRecordsCache
* @return this configuration object
*/
public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) {
checkIsValuePositive("maxCacheSize", maxCacheSize);
recordsFetcherFactory.setMaxSize(maxCacheSize);
return this;
}
public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) {
checkIsValuePositive("maxCacheByteSize", maxCacheByteSize);
recordsFetcherFactory.setMaxByteSize(maxCacheByteSize);
return this;
}
public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) {
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy));
return this;
}
public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) {
checkIsValuePositive("maxRecordsCount", maxRecordsCount);
recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount);
return this;
}
/**
* @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for
*/

View file

@ -15,11 +15,11 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.math.BigInteger;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import com.sun.org.apache.regexp.internal.RE;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -55,6 +55,7 @@ class ProcessTask implements ITask {
private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor;
private final GetRecordsCache recordsFetcher;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final KinesisDataFetcher dataFetcher;
private final TaskType taskType = TaskType.PROCESS;
@ -62,7 +63,6 @@ class ProcessTask implements ITask {
private final long backoffTimeMillis;
private final Shard shard;
private final ThrottlingReporter throttlingReporter;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
@ -83,6 +83,8 @@ class ProcessTask implements ITask {
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordsFetcherFactory
* Record processor factory to create recordFetcher object
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
@ -91,10 +93,11 @@ class ProcessTask implements ITask {
* backoff time when catching exceptions
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordsFetcherFactory recordsFetcherFactory,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
this(shardInfo, streamConfig, recordProcessor, recordsFetcherFactory, recordProcessorCheckpointer, dataFetcher,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
}
/**
@ -104,6 +107,8 @@ class ProcessTask implements ITask {
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordsFetcherFactory
* Record processor factory to create recordFetcher object
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
@ -116,11 +121,12 @@ class ProcessTask implements ITask {
* max number of threads in the getRecords thread pool.
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
RecordsFetcherFactory recordsFetcherFactory, RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool) {
this(shardInfo, streamConfig, recordProcessor, recordsFetcherFactory, recordProcessorCheckpointer, dataFetcher,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()),
makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo));
}
@ -132,6 +138,8 @@ class ProcessTask implements ITask {
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordsFetcherFactory
* RecordFetcher factory used to create recordFetcher object
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
@ -142,9 +150,9 @@ class ProcessTask implements ITask {
* determines how throttling events should be reported in the log.
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
RecordsFetcherFactory recordsFetcherFactory, RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
super();
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
@ -155,6 +163,7 @@ class ProcessTask implements ITask {
this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.recordsFetcher = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy);
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
@ -410,12 +419,7 @@ class ProcessTask implements ITask {
* @return list of data records from Kinesis
*/
private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() {
final GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(streamConfig.getMaxRecords());
if (getRecordsResult == null) {
// Stream no longer exists
return new GetRecordsResult().withRecords(Collections.<Record>emptyList());
}
final GetRecordsResult getRecordsResult = recordsFetcher.getNextResult();
if (getRecordsResult.getMillisBehindLatest() != null) {
MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC,

View file

@ -0,0 +1,39 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard.
* Clients may choose to create separate instantiations, or re-use instantiations.
*/
public interface RecordsFetcherFactory {
/**
* Returns a records fetcher processor to be used for processing data records for a (assigned) shard.
*
* @return Returns a record fetcher object
*/
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy);
void setMaxSize(int maxSize);
void setMaxByteSize(int maxByteSize);
void setMaxRecordsCount(int maxRecordsCount);
void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy);
}

View file

@ -43,6 +43,8 @@ class ShardConsumer {
private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor;
@Getter
private final KinesisClientLibConfiguration config;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final ExecutorService executorService;
private final ShardInfo shardInfo;
@ -81,6 +83,7 @@ class ShardConsumer {
* @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param config Kinesis library configuration
* @param leaseManager Used to create leases for new shards
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
@ -92,6 +95,7 @@ class ShardConsumer {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
KinesisClientLibConfiguration config,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
@ -99,9 +103,9 @@ class ShardConsumer {
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
this(shardInfo, streamConfig, checkpoint,recordProcessor, config, leaseManager,
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
}
/**
@ -109,6 +113,7 @@ class ShardConsumer {
* @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param config Kinesis library configuration
* @param leaseManager Used to create leases for new shards
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard
@ -122,6 +127,7 @@ class ShardConsumer {
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
KinesisClientLibConfiguration config,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
@ -133,6 +139,7 @@ class ShardConsumer {
Optional<Integer> maxGetRecordsThreadPool) {
this.streamConfig = streamConfig;
this.recordProcessor = recordProcessor;
this.config = config;
this.executorService = executorService;
this.shardInfo = shardInfo;
this.checkpoint = checkpoint;

View file

@ -0,0 +1,69 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import lombok.extern.apachecommons.CommonsLog;
import java.util.concurrent.Executors;
@CommonsLog
public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private final int maxRecords;
private int maxSize = 10;
private int maxByteSize = 15 * 1024 * 1024;
private int maxRecordsCount = 30000;
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
public SimpleRecordsFetcherFactory(int maxRecords) {
this.maxRecords = maxRecords;
}
public SimpleRecordsFetcherFactory(int maxRecords, int maxSize, int maxByteSize, int maxRecordsCount) {
this.maxRecords = maxRecords;
this.maxSize = maxSize;
this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount;
}
@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
} else {
return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, dataFetchingStrategy,
getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1));
}
}
@Override
public void setMaxSize(int maxSize){
this.maxSize = maxSize;
}
@Override
public void setMaxByteSize(int maxByteSize){
this.maxByteSize = maxByteSize;
}
@Override
public void setMaxRecordsCount(int maxRecordsCount) {
this.maxRecordsCount = maxRecordsCount;
}
@Override
public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){
this.dataFetchingStrategy = dataFetchingStrategy;
}
}

View file

@ -73,6 +73,7 @@ public class Worker implements Runnable {
private final String applicationName;
private final IRecordProcessorFactory recordProcessorFactory;
private final KinesisClientLibConfiguration config;
private final StreamConfig streamConfig;
private final InitialPositionInStreamExtended initialPosition;
private final ICheckpoint checkpointTracker;
@ -245,6 +246,7 @@ public class Worker implements Runnable {
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
IMetricsFactory metricsFactory, ExecutorService execService) {
this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
config,
new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName()),
@ -306,6 +308,8 @@ public class Worker implements Runnable {
* Name of the Kinesis application
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @paran config
* Kinesis Library configuration
* @param streamConfig
* Stream configuration
* @param initialPositionInStream
@ -333,24 +337,25 @@ public class Worker implements Runnable {
*/
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty());
}
/**
* @param applicationName
* Name of the Kinesis application
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Library Configuration
* @param streamConfig
* Stream configuration
* @param initialPositionInStream
@ -382,7 +387,7 @@ public class Worker implements Runnable {
*/
// NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
@ -391,6 +396,7 @@ public class Worker implements Runnable {
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
this.streamConfig = streamConfig;
this.initialPosition = initialPositionInStream;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
@ -411,7 +417,6 @@ public class Worker implements Runnable {
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
}
/**
* @return the applicationName
*/
@ -819,11 +824,11 @@ public class Worker implements Runnable {
*
* @param shardInfo
* Kinesis shard info
* @param factory
* @param processorFactory
* RecordProcessor factory
* @return ShardConsumer for the shard
*/
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
// Instantiate a new consumer if we don't have one, or the one we
// had was from an earlier
@ -832,17 +837,17 @@ public class Worker implements Runnable {
// completely processed (shutdown reason terminate).
if ((consumer == null)
|| (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) {
consumer = buildConsumer(shardInfo, factory);
consumer = buildConsumer(shardInfo, processorFactory);
shardInfoShardConsumerMap.put(shardInfo, consumer);
wlog.infoForce("Created new shardConsumer for : " + shardInfo);
}
return consumer;
}
protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
IRecordProcessor recordProcessor = factory.createProcessor();
protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
IRecordProcessor recordProcessor = processorFactory.createProcessor();
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, config,
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
executorService, metricsFactory, taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool);
@ -1049,6 +1054,7 @@ public class Worker implements Runnable {
public static class Builder {
private IRecordProcessorFactory recordProcessorFactory;
private RecordsFetcherFactory recordsFetcherFactory;
private KinesisClientLibConfiguration config;
private AmazonKinesis kinesisClient;
private AmazonDynamoDB dynamoDBClient;
@ -1244,6 +1250,7 @@ public class Worker implements Runnable {
return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
kinesisClient).getProxy(config.getStreamName()),
config.getMaxRecords(),

View file

@ -20,6 +20,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -57,6 +58,8 @@ public class ConsumerStatesTest {
@Mock
private IRecordProcessor recordProcessor;
@Mock
private KinesisClientLibConfiguration config;
@Mock
private RecordProcessorCheckpointer recordProcessorCheckpointer;
@Mock
private ExecutorService executorService;
@ -76,6 +79,10 @@ public class ConsumerStatesTest {
private IKinesisProxy kinesisProxy;
@Mock
private InitialPositionInStreamExtended initialPositionInStream;
@Mock
private RecordsFetcherFactory recordsFetcherFactory;
@Mock
private GetRecordsCache recordsFetcher;
private long parentShardPollIntervalMillis = 0xCAFE;
private boolean cleanupLeasesOfCompletedShards = true;
@ -98,7 +105,8 @@ public class ConsumerStatesTest {
when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards);
when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis);
when(consumer.getShutdownReason()).thenReturn(reason);
when(consumer.getConfig()).thenReturn(config);
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
}
private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class;
@ -215,6 +223,37 @@ public class ConsumerStatesTest {
}
@Test
public void processingStateRecordsFetcher() {
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1));
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2));
when(recordsFetcherFactory.createRecordsFetcher((any()))).thenReturn(recordsFetcher);
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsCache.class, "recordsFetcher", equalTo(recordsFetcher)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING));
assertThat(state.getTaskType(), equalTo(TaskType.PROCESS));
}
@Test
public void shutdownRequestState() {
ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();
@ -321,7 +360,7 @@ public class ConsumerStatesTest {
}
static <ValueType> ReflectionPropertyMatcher<ShutdownTask, ValueType> shutdownTask(Class<ValueType> valueTypeClass,
String propertyName, Matcher<ValueType> matcher) {
String propertyName, Matcher<ValueType> matcher) {
return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher);
}
@ -331,17 +370,17 @@ public class ConsumerStatesTest {
}
static <ValueType> ReflectionPropertyMatcher<ProcessTask, ValueType> procTask(Class<ValueType> valueTypeClass,
String propertyName, Matcher<ValueType> matcher) {
String propertyName, Matcher<ValueType> matcher) {
return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher);
}
static <ValueType> ReflectionPropertyMatcher<InitializeTask, ValueType> initTask(Class<ValueType> valueTypeClass,
String propertyName, Matcher<ValueType> matcher) {
String propertyName, Matcher<ValueType> matcher) {
return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher);
}
static <TaskType, ValueType> ReflectionPropertyMatcher<TaskType, ValueType> taskWith(Class<TaskType> taskTypeClass,
Class<ValueType> valueTypeClass, String propertyName, Matcher<ValueType> matcher) {
Class<ValueType> valueTypeClass, String propertyName, Matcher<ValueType> matcher) {
return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName);
}
@ -354,7 +393,7 @@ public class ConsumerStatesTest {
private final Field matchingField;
private ReflectionPropertyMatcher(Class<TaskType> taskTypeClass, Class<ValueType> valueTypeClass,
Matcher<ValueType> matcher, String propertyName) {
Matcher<ValueType> matcher, String propertyName) {
this.taskTypeClass = taskTypeClass;
this.valueTypeClazz = valueTypeClass;
this.matcher = matcher;

View file

@ -78,6 +78,10 @@ public class ProcessTaskTest {
private ThrottlingReporter throttlingReporter;
@Mock
private GetRecordsRetrievalStrategy mockGetRecordsRetrievalStrategy;
@Mock
private RecordsFetcherFactory mockRecordsFetcherFactory;
@Mock
private GetRecordsCache mockRecordsFetcher;
private List<Record> processedRecords;
private ExtendedSequenceNumber newLargestPermittedCheckpointValue;
@ -94,8 +98,9 @@ public class ProcessTaskTest {
skipCheckpointValidationValue,
INITIAL_POSITION_LATEST);
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
when(mockRecordsFetcherFactory.createRecordsFetcher(mockGetRecordsRetrievalStrategy)).thenReturn(mockRecordsFetcher);
processTask = new ProcessTask(
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis,
shardInfo, config, mockRecordProcessor, mockRecordsFetcherFactory, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrievalStrategy);
}
@ -103,13 +108,13 @@ public class ProcessTaskTest {
public void testProcessTaskWithProvisionedThroughputExceededException() {
// Set data fetcher to throw exception
doReturn(false).when(mockDataFetcher).isShardEndReached();
doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsRetrievalStrategy)
.getRecords(maxRecords);
doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockRecordsFetcher)
.getNextResult();
TaskResult result = processTask.call();
verify(throttlingReporter).throttled();
verify(throttlingReporter, never()).success();
verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords));
verify(mockRecordsFetcher).getNextResult();
assertTrue("Result should contain ProvisionedThroughputExceededException",
result.getException() instanceof ProvisionedThroughputExceededException);
}
@ -117,10 +122,10 @@ public class ProcessTaskTest {
@Test
public void testProcessTaskWithNonExistentStream() {
// Data fetcher returns a null Result when the stream does not exist
doReturn(null).when(mockGetRecordsRetrievalStrategy).getRecords(maxRecords);
doReturn(new GetRecordsResult().withRecords(Collections.emptyList())).when(mockRecordsFetcher).getNextResult();
TaskResult result = processTask.call();
verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords));
verify(mockRecordsFetcher).getNextResult();
assertNull("Task should not throw an exception", result.getException());
}
@ -304,14 +309,14 @@ public class ProcessTaskTest {
private void testWithRecords(List<Record> records,
ExtendedSequenceNumber lastCheckpointValue,
ExtendedSequenceNumber largestPermittedCheckpointValue) {
when(mockGetRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(
when(mockRecordsFetcher.getNextResult()).thenReturn(
new GetRecordsResult().withRecords(records));
when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
processTask.call();
verify(throttlingReporter).success();
verify(throttlingReporter, never()).throttled();
verify(mockGetRecordsRetrievalStrategy).getRecords(anyInt());
verify(mockRecordsFetcher).getNextResult();
ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
verify(mockRecordProcessor).processRecords(priCaptor.capture());
processedRecords = priCaptor.getValue().getRecords();

View file

@ -0,0 +1,41 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
public class RecordsFetcherFactoryTest {
private RecordsFetcherFactory recordsFetcherFactory;
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
recordsFetcherFactory = new SimpleRecordsFetcherFactory(1);
}
@Test
public void createDefaultRecordsFetcherTest() {
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy);
assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class));
}
@Test
public void createPrefetchRecordsFetcherTest() {
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy);
assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class));
}
}

View file

@ -35,6 +35,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.ListIterator;
@ -45,6 +46,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hamcrest.Description;
@ -97,6 +99,10 @@ public class ShardConsumerTest {
@Mock
private IRecordProcessor processor;
@Mock
private KinesisClientLibConfiguration config;
@Mock
private RecordsFetcherFactory recordsFetcherFactory;
@Mock
private IKinesisProxy streamProxy;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@ -104,7 +110,6 @@ public class ShardConsumerTest {
private ICheckpoint checkpoint;
@Mock
private ShutdownNotification shutdownNotification;
/**
* Test method to verify consumer stays in INITIALIZING state when InitializationTask fails.
*/
@ -129,6 +134,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -177,6 +183,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -205,6 +212,7 @@ public class ShardConsumerTest {
@SuppressWarnings("unchecked")
@Test
public final void testRecordProcessorThrowable() throws Exception {
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
@ -218,6 +226,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -297,7 +306,7 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(maxRecords));
TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig =
@ -313,6 +322,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -399,7 +409,7 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2));
TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig =
@ -416,6 +426,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -478,6 +489,7 @@ public class ShardConsumerTest {
streamConfig,
checkpoint,
processor,
config,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
@ -489,6 +501,7 @@ public class ShardConsumerTest {
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999");
when(leaseManager.getLease(anyString())).thenReturn(null);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2));
when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));

View file

@ -133,6 +133,8 @@ public class WorkerTest {
@Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator;
@Mock
private KinesisClientLibConfiguration config;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory v1RecordProcessorFactory;
@ -210,6 +212,8 @@ public class WorkerTest {
public final void testCreateOrGetShardConsumer() {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
@ -228,7 +232,9 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
streamletFactory,
clientConfig,
streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@ -257,6 +263,8 @@ public class WorkerTest {
public void testWorkerLoopWithCheckpoint() {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
@ -275,7 +283,7 @@ public class WorkerTest {
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint)
.thenReturn(secondCheckpoint);
Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization);
@ -314,6 +322,8 @@ public class WorkerTest {
public final void testCleanupShardConsumers() {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
@ -332,7 +342,9 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
streamletFactory,
clientConfig,
streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
@ -371,6 +383,8 @@ public class WorkerTest {
public final void testInitializationFailureWithRetries() {
String stageName = "testInitializationWorker";
IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
int count = 0;
when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++)));
int maxRecords = 2;
@ -386,6 +400,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
recordProcessorFactory,
clientConfig,
streamConfig, INITIAL_POSITION_TRIM_HORIZON,
shardPollInterval,
shardSyncIntervalMillis,
@ -709,6 +724,8 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -742,7 +759,7 @@ public class WorkerTest {
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -785,6 +802,8 @@ public class WorkerTest {
public void testShutdownCallableNotAllowedTwice() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -816,7 +835,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@ -850,6 +869,8 @@ public class WorkerTest {
public void testGracefulShutdownSingleFuture() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -888,7 +909,7 @@ public class WorkerTest {
when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@ -926,6 +947,8 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -950,7 +973,7 @@ public class WorkerTest {
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -988,6 +1011,8 @@ public class WorkerTest {
public void testRequestShutdownWithLostLease() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1020,7 +1045,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1089,6 +1114,8 @@ public class WorkerTest {
public void testRequestShutdownWithAllLeasesLost() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1121,7 +1148,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1195,6 +1222,8 @@ public class WorkerTest {
public void testLeaseCancelledAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1226,7 +1255,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1267,6 +1296,8 @@ public class WorkerTest {
public void testEndOfShardAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1298,7 +1329,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1336,13 +1367,14 @@ public class WorkerTest {
private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,
KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis,
boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream,
super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion,
checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis,
failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization);
@ -1649,10 +1681,12 @@ public class WorkerTest {
idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
Worker worker =
new Worker(stageName,
recordProcessorFactory,
clientConfig,
streamConfig, INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,