Integrated GetTimeout Config and GetRecordsStrategy inside ShardConsumer

This commit is contained in:
Wei 2017-09-05 15:44:24 -07:00
parent 9ba7f219c2
commit 61be5baeb0
4 changed files with 25 additions and 53 deletions

View file

@ -309,15 +309,10 @@ class ConsumerStates {
@Override
public ITask createTask(ShardConsumer consumer) {
if(consumer.getMaxGetRecordsThreadPool().isPresent() && consumer.getRetryGetRecordsInSeconds().isPresent()) {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.getRetryGetRecordsInSeconds().get(), consumer.getMaxGetRecordsThreadPool().get());
}
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist());
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.getRetryGetRecordsInSeconds(), consumer.getMaxGetRecordsThreadPool());
}
@Override

View file

@ -65,6 +65,17 @@ class ProcessTask implements ITask {
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
ShardInfo shardInfo) {
Optional<GetRecordsRetrievalStrategy> getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry ->
maxGetRecordsThreadPool.map(max ->
new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId())));
return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
}
/**
* @param shardInfo
* contains information about the shard
@ -80,11 +91,10 @@ class ProcessTask implements ITask {
* backoff time when catching exceptions
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
}
/**
@ -108,11 +118,11 @@ class ProcessTask implements ITask {
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
int retryGetRecordsInSeconds, int maxGetRecordsThreadPool) {
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()),
new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo.getShardId()));
makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo));
}
/**

View file

@ -99,26 +99,9 @@ class ShardConsumer {
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this.streamConfig = streamConfig;
this.recordProcessor = recordProcessor;
this.executorService = executorService;
this.shardInfo = shardInfo;
this.checkpoint = checkpoint;
this.recordProcessorCheckpointer =
new RecordProcessorCheckpointer(shardInfo,
checkpoint,
new SequenceNumberValidator(streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()));
this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.retryGetRecordsInSeconds = Optional.empty();
this.maxGetRecordsThreadPool = Optional.empty();
this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
}
/**

View file

@ -339,26 +339,10 @@ public class Worker implements Runnable {
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.streamConfig = streamConfig;
this.initialPosition = initialPositionInStream;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator;
this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
this.executorService = execService;
this.leaseCoordinator = leaseCoordinator;
this.metricsFactory = metricsFactory;
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
executorService);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.shardPrioritization = shardPrioritization;
this.retryGetRecordsInSeconds = Optional.empty();
this.maxGetRecordsThreadPool = Optional.empty();
this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty());
}