integrated prefetch with shardconsumer
This commit is contained in:
parent
40aaece7c3
commit
1c07b45166
5 changed files with 337 additions and 25 deletions
|
|
@ -14,8 +14,6 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks,
|
||||
* and state transitions is contained within the {@link ConsumerState} objects.
|
||||
|
|
@ -310,7 +308,7 @@ class ConsumerStates {
|
|||
@Override
|
||||
public ITask createTask(ShardConsumer consumer) {
|
||||
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
|
||||
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
|
||||
consumer.getRecordsFetcherFactory(), consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
|
||||
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
|
||||
consumer.getRetryGetRecordsInSeconds(), consumer.getMaxGetRecordsThreadPool());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -226,6 +226,9 @@ public class KinesisClientLibConfiguration {
|
|||
@Getter
|
||||
private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS;
|
||||
|
||||
@Getter
|
||||
private RecordsFetcherFactory recordsFetcherFactory;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
|
|
@ -444,6 +447,116 @@ public class KinesisClientLibConfiguration {
|
|||
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
|
||||
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
|
||||
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param applicationName Name of the Kinesis application
|
||||
* By default the application name is included in the user agent string used to make AWS requests. This
|
||||
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
|
||||
* @param streamName Name of the Kinesis stream
|
||||
* @param kinesisEndpoint Kinesis endpoint
|
||||
* @param dynamoDBEndpoint DynamoDB endpoint
|
||||
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
|
||||
* records from that location in the stream when an application starts up for the first time and there
|
||||
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
|
||||
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
|
||||
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
|
||||
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
|
||||
* @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
|
||||
* @param workerId Used to distinguish different workers/processes of a Kinesis application
|
||||
* @param maxRecords Max records to read per Kinesis getRecords() call
|
||||
* @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
|
||||
* @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
|
||||
* GetRecords returned an empty record list.
|
||||
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
|
||||
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
|
||||
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
|
||||
* in Kinesis)
|
||||
* @param kinesisClientConfig Client Configuration used by Kinesis client
|
||||
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
|
||||
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
|
||||
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
|
||||
* @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
|
||||
* @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
|
||||
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
|
||||
* with a call to Amazon Kinesis before checkpointing for calls to
|
||||
* {@link RecordProcessorCheckpointer#checkpoint(String)}
|
||||
* @param regionName The region name for the service
|
||||
*/
|
||||
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
|
||||
public KinesisClientLibConfiguration(String applicationName,
|
||||
String streamName,
|
||||
String kinesisEndpoint,
|
||||
String dynamoDBEndpoint,
|
||||
InitialPositionInStream initialPositionInStream,
|
||||
AWSCredentialsProvider kinesisCredentialsProvider,
|
||||
AWSCredentialsProvider dynamoDBCredentialsProvider,
|
||||
AWSCredentialsProvider cloudWatchCredentialsProvider,
|
||||
long failoverTimeMillis,
|
||||
String workerId,
|
||||
int maxRecords,
|
||||
long idleTimeBetweenReadsInMillis,
|
||||
boolean callProcessRecordsEvenForEmptyRecordList,
|
||||
long parentShardPollIntervalMillis,
|
||||
long shardSyncIntervalMillis,
|
||||
boolean cleanupTerminatedShardsBeforeExpiry,
|
||||
ClientConfiguration kinesisClientConfig,
|
||||
ClientConfiguration dynamoDBClientConfig,
|
||||
ClientConfiguration cloudWatchClientConfig,
|
||||
long taskBackoffTimeMillis,
|
||||
long metricsBufferTimeMillis,
|
||||
int metricsMaxQueueSize,
|
||||
boolean validateSequenceNumberBeforeCheckpointing,
|
||||
String regionName,
|
||||
RecordsFetcherFactory recordsFetcherFactory) {
|
||||
// Check following values are greater than zero
|
||||
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
|
||||
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
|
||||
checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis);
|
||||
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis);
|
||||
checkIsValuePositive("MaxRecords", (long) maxRecords);
|
||||
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis);
|
||||
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
|
||||
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
|
||||
checkIsRegionNameValid(regionName);
|
||||
this.applicationName = applicationName;
|
||||
this.tableName = applicationName;
|
||||
this.streamName = streamName;
|
||||
this.kinesisEndpoint = kinesisEndpoint;
|
||||
this.dynamoDBEndpoint = dynamoDBEndpoint;
|
||||
this.initialPositionInStream = initialPositionInStream;
|
||||
this.kinesisCredentialsProvider = kinesisCredentialsProvider;
|
||||
this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider;
|
||||
this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider;
|
||||
this.failoverTimeMillis = failoverTimeMillis;
|
||||
this.maxRecords = maxRecords;
|
||||
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
|
||||
this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList;
|
||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
|
||||
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
|
||||
this.workerIdentifier = workerId;
|
||||
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
|
||||
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
|
||||
this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig);
|
||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||
this.metricsBufferTimeMillis = metricsBufferTimeMillis;
|
||||
this.metricsMaxQueueSize = metricsMaxQueueSize;
|
||||
this.metricsLevel = DEFAULT_METRICS_LEVEL;
|
||||
this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS;
|
||||
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
|
||||
this.regionName = regionName;
|
||||
this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER;
|
||||
this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME;
|
||||
this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
|
||||
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
|
||||
this.initialPositionInStreamExtended =
|
||||
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
|
||||
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
|
||||
this.recordsFetcherFactory = recordsFetcherFactory;
|
||||
}
|
||||
|
||||
// Check if value is positive, otherwise throw an exception
|
||||
|
|
@ -1138,6 +1251,28 @@ public class KinesisClientLibConfiguration {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param maxCacheSize the max number of records stored in the getRecordsCache
|
||||
* @return this configuration object
|
||||
*/
|
||||
public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) {
|
||||
checkIsValuePositive("maxCacheSize", maxCacheSize);
|
||||
recordsFetcherFactory.setMaxSize(maxCacheSize);
|
||||
return this;
|
||||
}
|
||||
|
||||
public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) {
|
||||
checkIsValuePositive("maxCacheByteSize", maxCacheByteSize);
|
||||
recordsFetcherFactory.setMaxByteSize(maxCacheByteSize);
|
||||
return this;
|
||||
}
|
||||
|
||||
public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) {
|
||||
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy));
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Optional;
|
||||
|
|
@ -55,6 +54,7 @@ class ProcessTask implements ITask {
|
|||
|
||||
private final ShardInfo shardInfo;
|
||||
private final IRecordProcessor recordProcessor;
|
||||
private final GetRecordsCache recordsFetcher;
|
||||
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
|
||||
private final KinesisDataFetcher dataFetcher;
|
||||
private final TaskType taskType = TaskType.PROCESS;
|
||||
|
|
@ -63,8 +63,6 @@ class ProcessTask implements ITask {
|
|||
private final Shard shard;
|
||||
private final ThrottlingReporter throttlingReporter;
|
||||
|
||||
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
|
||||
|
||||
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
|
||||
Optional<Integer> retryGetRecordsInSeconds,
|
||||
Optional<Integer> maxGetRecordsThreadPool,
|
||||
|
|
@ -119,8 +117,38 @@ class ProcessTask implements ITask {
|
|||
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
|
||||
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
|
||||
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
|
||||
skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
this(shardInfo, streamConfig, recordProcessor, new SimpleRecordsFetcherFactory(streamConfig.getMaxRecords()),
|
||||
recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
|
||||
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param shardInfo
|
||||
* contains information about the shard
|
||||
* @param streamConfig
|
||||
* Stream configuration
|
||||
* @param recordProcessor
|
||||
* Record processor used to process the data records for the shard
|
||||
* @param recordsFetcherFactory
|
||||
* Record processor factory to create recordFetcher object
|
||||
* @param recordProcessorCheckpointer
|
||||
* Passed to the RecordProcessor so it can checkpoint progress
|
||||
* @param dataFetcher
|
||||
* Kinesis data fetcher (used to fetch records from Kinesis)
|
||||
* @param backoffTimeMillis
|
||||
* backoff time when catching exceptions
|
||||
* @param retryGetRecordsInSeconds
|
||||
* time in seconds to wait before the worker retries to get a record.
|
||||
* @param maxGetRecordsThreadPool
|
||||
* max number of threads in the getRecords thread pool.
|
||||
*/
|
||||
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
|
||||
RecordsFetcherFactory recordsFetcherFactory, RecordProcessorCheckpointer recordProcessorCheckpointer,
|
||||
KinesisDataFetcher dataFetcher, long backoffTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, Optional<Integer> retryGetRecordsInSeconds,
|
||||
Optional<Integer> maxGetRecordsThreadPool) {
|
||||
this(shardInfo, streamConfig, recordProcessor, recordsFetcherFactory, recordProcessorCheckpointer, dataFetcher,
|
||||
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()),
|
||||
makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo));
|
||||
}
|
||||
|
|
@ -142,9 +170,36 @@ class ProcessTask implements ITask {
|
|||
* determines how throttling events should be reported in the log.
|
||||
*/
|
||||
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
|
||||
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
|
||||
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
||||
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
|
||||
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
||||
this(shardInfo, streamConfig, recordProcessor, new SimpleRecordsFetcherFactory(streamConfig.getMaxRecords()),
|
||||
recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
throttlingReporter, getRecordsRetrievalStrategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param shardInfo
|
||||
* contains information about the shard
|
||||
* @param streamConfig
|
||||
* Stream configuration
|
||||
* @param recordProcessor
|
||||
* Record processor used to process the data records for the shard
|
||||
* @param recordsFetcherFactory
|
||||
* RecordFetcher factory used to create recordFetcher object
|
||||
* @param recordProcessorCheckpointer
|
||||
* Passed to the RecordProcessor so it can checkpoint progress
|
||||
* @param dataFetcher
|
||||
* Kinesis data fetcher (used to fetch records from Kinesis)
|
||||
* @param backoffTimeMillis
|
||||
* backoff time when catching exceptions
|
||||
* @param throttlingReporter
|
||||
* determines how throttling events should be reported in the log.
|
||||
*/
|
||||
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
|
||||
RecordsFetcherFactory recordsFetcherFactory, RecordProcessorCheckpointer recordProcessorCheckpointer,
|
||||
KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
|
||||
super();
|
||||
this.shardInfo = shardInfo;
|
||||
this.recordProcessor = recordProcessor;
|
||||
|
|
@ -154,7 +209,7 @@ class ProcessTask implements ITask {
|
|||
this.backoffTimeMillis = backoffTimeMillis;
|
||||
this.throttlingReporter = throttlingReporter;
|
||||
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
|
||||
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
|
||||
this.recordsFetcher = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy);
|
||||
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
|
||||
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
|
||||
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
|
||||
|
|
@ -410,12 +465,7 @@ class ProcessTask implements ITask {
|
|||
* @return list of data records from Kinesis
|
||||
*/
|
||||
private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() {
|
||||
final GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(streamConfig.getMaxRecords());
|
||||
|
||||
if (getRecordsResult == null) {
|
||||
// Stream no longer exists
|
||||
return new GetRecordsResult().withRecords(Collections.<Record>emptyList());
|
||||
}
|
||||
final GetRecordsResult getRecordsResult = recordsFetcher.getNextResult();
|
||||
|
||||
if (getRecordsResult.getMillisBehindLatest() != null) {
|
||||
MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC,
|
||||
|
|
|
|||
|
|
@ -43,6 +43,8 @@ class ShardConsumer {
|
|||
|
||||
private final StreamConfig streamConfig;
|
||||
private final IRecordProcessor recordProcessor;
|
||||
@Getter
|
||||
private final RecordsFetcherFactory recordsFetcherFactory;
|
||||
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
|
||||
private final ExecutorService executorService;
|
||||
private final ShardInfo shardInfo;
|
||||
|
|
@ -133,6 +135,59 @@ class ShardConsumer {
|
|||
Optional<Integer> maxGetRecordsThreadPool) {
|
||||
this.streamConfig = streamConfig;
|
||||
this.recordProcessor = recordProcessor;
|
||||
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(streamConfig.getMaxRecords());
|
||||
this.executorService = executorService;
|
||||
this.shardInfo = shardInfo;
|
||||
this.checkpoint = checkpoint;
|
||||
this.recordProcessorCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo,
|
||||
checkpoint,
|
||||
new SequenceNumberValidator(streamConfig.getStreamProxy(),
|
||||
shardInfo.getShardId(),
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()));
|
||||
this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
this.leaseManager = leaseManager;
|
||||
this.metricsFactory = metricsFactory;
|
||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
|
||||
this.taskBackoffTimeMillis = backoffTimeMillis;
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
|
||||
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param shardInfo Shard information
|
||||
* @param streamConfig Stream configuration to use
|
||||
* @param checkpoint Checkpoint tracker
|
||||
* @param recordProcessor Record processor used to process the data records for the shard
|
||||
* @param recordsFetcherFactory RecordFetcher factory used to instantiate a recordFetcher object
|
||||
* @param leaseManager Used to create leases for new shards
|
||||
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
|
||||
* @param executorService ExecutorService used to execute process tasks for this shard
|
||||
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
|
||||
* @param backoffTimeMillis backoff interval when we encounter exceptions
|
||||
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record.
|
||||
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool.
|
||||
*/
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
||||
ShardConsumer(ShardInfo shardInfo,
|
||||
StreamConfig streamConfig,
|
||||
ICheckpoint checkpoint,
|
||||
IRecordProcessor recordProcessor,
|
||||
RecordsFetcherFactory recordsFetcherFactory,
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
long parentShardPollIntervalMillis,
|
||||
boolean cleanupLeasesOfCompletedShards,
|
||||
ExecutorService executorService,
|
||||
IMetricsFactory metricsFactory,
|
||||
long backoffTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
Optional<Integer> retryGetRecordsInSeconds,
|
||||
Optional<Integer> maxGetRecordsThreadPool) {
|
||||
this.streamConfig = streamConfig;
|
||||
this.recordProcessor = recordProcessor;
|
||||
this.recordsFetcherFactory = recordsFetcherFactory;
|
||||
this.executorService = executorService;
|
||||
this.shardInfo = shardInfo;
|
||||
this.checkpoint = checkpoint;
|
||||
|
|
|
|||
|
|
@ -73,6 +73,7 @@ public class Worker implements Runnable {
|
|||
|
||||
private final String applicationName;
|
||||
private final IRecordProcessorFactory recordProcessorFactory;
|
||||
private final RecordsFetcherFactory recordsFetcherFactory;
|
||||
private final StreamConfig streamConfig;
|
||||
private final InitialPositionInStreamExtended initialPosition;
|
||||
private final ICheckpoint checkpointTracker;
|
||||
|
|
@ -245,6 +246,7 @@ public class Worker implements Runnable {
|
|||
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
|
||||
IMetricsFactory metricsFactory, ExecutorService execService) {
|
||||
this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
|
||||
config.getRecordsFetcherFactory(),
|
||||
new StreamConfig(
|
||||
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
|
||||
.getProxy(config.getStreamName()),
|
||||
|
|
@ -391,6 +393,7 @@ public class Worker implements Runnable {
|
|||
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
|
||||
this.applicationName = applicationName;
|
||||
this.recordProcessorFactory = recordProcessorFactory;
|
||||
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(streamConfig.getMaxRecords());
|
||||
this.streamConfig = streamConfig;
|
||||
this.initialPosition = initialPositionInStream;
|
||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||
|
|
@ -411,6 +414,73 @@ public class Worker implements Runnable {
|
|||
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param applicationName
|
||||
* Name of the Kinesis application
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param recordsFetcherFactory
|
||||
* Used to get record fetcher instances for fetching record from shards
|
||||
* @param streamConfig
|
||||
* Stream configuration
|
||||
* @param initialPositionInStream
|
||||
* One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from
|
||||
* this location in the stream when an application starts up for the first time and there are no
|
||||
* checkpoints. If there are checkpoints, we start from the checkpoint position.
|
||||
* @param parentShardPollIntervalMillis
|
||||
* Wait for this long between polls to check if parent shards are done
|
||||
* @param shardSyncIdleTimeMillis
|
||||
* Time between tasks to sync leases and Kinesis shards
|
||||
* @param cleanupLeasesUponShardCompletion
|
||||
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
|
||||
* @param checkpoint
|
||||
* Used to get/set checkpoints
|
||||
* @param leaseCoordinator
|
||||
* Lease coordinator (coordinates currently owned leases)
|
||||
* @param execService
|
||||
* ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
* @param metricsFactory
|
||||
* Metrics factory used to emit metrics
|
||||
* @param taskBackoffTimeMillis
|
||||
* Backoff period when tasks encounter an exception
|
||||
* @param shardPrioritization
|
||||
* Provides prioritization logic to decide which available shards process first
|
||||
* @param retryGetRecordsInSeconds
|
||||
* Time in seconds to wait before the worker retries to get a record.
|
||||
* @param maxGetRecordsThreadPool
|
||||
* Max number of threads in the getRecords thread pool.
|
||||
*/
|
||||
// NOTE: This has package level access solely for testing
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
||||
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, RecordsFetcherFactory recordsFetcherFactory, StreamConfig streamConfig,
|
||||
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
|
||||
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
||||
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
|
||||
this.applicationName = applicationName;
|
||||
this.recordProcessorFactory = recordProcessorFactory;
|
||||
this.recordsFetcherFactory = recordsFetcherFactory;
|
||||
this.streamConfig = streamConfig;
|
||||
this.initialPosition = initialPositionInStream;
|
||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||
this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator;
|
||||
this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
|
||||
this.executorService = execService;
|
||||
this.leaseCoordinator = leaseCoordinator;
|
||||
this.metricsFactory = metricsFactory;
|
||||
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
||||
initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
|
||||
executorService);
|
||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||
this.failoverTimeMillis = failoverTimeMillis;
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||
this.shardPrioritization = shardPrioritization;
|
||||
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
|
||||
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the applicationName
|
||||
|
|
@ -449,7 +519,7 @@ public class Worker implements Runnable {
|
|||
boolean foundCompletedShard = false;
|
||||
Set<ShardInfo> assignedShards = new HashSet<>();
|
||||
for (ShardInfo shardInfo : getShardInfoForAssignments()) {
|
||||
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
|
||||
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory, recordsFetcherFactory);
|
||||
if (shardConsumer.isShutdown() && shardConsumer.getShutdownReason().equals(ShutdownReason.TERMINATE)) {
|
||||
foundCompletedShard = true;
|
||||
} else {
|
||||
|
|
@ -819,11 +889,13 @@ public class Worker implements Runnable {
|
|||
*
|
||||
* @param shardInfo
|
||||
* Kinesis shard info
|
||||
* @param factory
|
||||
* @param processorFactory
|
||||
* RecordProcessor factory
|
||||
* @param fetcherFactory
|
||||
* RecordFetcher factory
|
||||
* @return ShardConsumer for the shard
|
||||
*/
|
||||
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
|
||||
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory, RecordsFetcherFactory fetcherFactory) {
|
||||
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
|
||||
// Instantiate a new consumer if we don't have one, or the one we
|
||||
// had was from an earlier
|
||||
|
|
@ -832,17 +904,17 @@ public class Worker implements Runnable {
|
|||
// completely processed (shutdown reason terminate).
|
||||
if ((consumer == null)
|
||||
|| (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) {
|
||||
consumer = buildConsumer(shardInfo, factory);
|
||||
consumer = buildConsumer(shardInfo, processorFactory, fetcherFactory);
|
||||
shardInfoShardConsumerMap.put(shardInfo, consumer);
|
||||
wlog.infoForce("Created new shardConsumer for : " + shardInfo);
|
||||
}
|
||||
return consumer;
|
||||
}
|
||||
|
||||
protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
|
||||
IRecordProcessor recordProcessor = factory.createProcessor();
|
||||
protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory, RecordsFetcherFactory fetcherFactory) {
|
||||
IRecordProcessor recordProcessor = processorFactory.createProcessor();
|
||||
|
||||
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
|
||||
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, fetcherFactory,
|
||||
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
|
||||
executorService, metricsFactory, taskBackoffTimeMillis,
|
||||
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool);
|
||||
|
|
@ -1049,6 +1121,7 @@ public class Worker implements Runnable {
|
|||
public static class Builder {
|
||||
|
||||
private IRecordProcessorFactory recordProcessorFactory;
|
||||
private RecordsFetcherFactory recordsFetcherFactory;
|
||||
private KinesisClientLibConfiguration config;
|
||||
private AmazonKinesis kinesisClient;
|
||||
private AmazonDynamoDB dynamoDBClient;
|
||||
|
|
@ -1244,6 +1317,7 @@ public class Worker implements Runnable {
|
|||
|
||||
return new Worker(config.getApplicationName(),
|
||||
recordProcessorFactory,
|
||||
config.getRecordsFetcherFactory(),
|
||||
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
|
||||
kinesisClient).getProxy(config.getStreamName()),
|
||||
config.getMaxRecords(),
|
||||
|
|
|
|||
Loading…
Reference in a new issue