From 61be5baeb0656dfc154b88a4a1f57e72a6080ce0 Mon Sep 17 00:00:00 2001 From: Wei Date: Tue, 5 Sep 2017 15:44:24 -0700 Subject: [PATCH] Integrated GetTimeout Config and GetRecordsStrategy inside ShardConsumer --- .../lib/worker/ConsumerStates.java | 9 ++----- .../clientlibrary/lib/worker/ProcessTask.java | 22 ++++++++++++----- .../lib/worker/ShardConsumer.java | 23 +++--------------- .../clientlibrary/lib/worker/Worker.java | 24 ++++--------------- 4 files changed, 25 insertions(+), 53 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index 6a135202..f6d96b4d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -309,15 +309,10 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - if(consumer.getMaxGetRecordsThreadPool().isPresent() && consumer.getRetryGetRecordsInSeconds().isPresent()) { - return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), - consumer.getRetryGetRecordsInSeconds().get(), consumer.getMaxGetRecordsThreadPool().get()); - } return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist()); + consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), + consumer.getRetryGetRecordsInSeconds(), consumer.getMaxGetRecordsThreadPool()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 8c758e69..223236f6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -65,6 +65,17 @@ class ProcessTask implements ITask { private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + ShardInfo shardInfo) { + Optional getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry -> + maxGetRecordsThreadPool.map(max -> + new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId()))); + + return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); + } + /** * @param shardInfo * contains information about the shard @@ -80,11 +91,10 @@ class ProcessTask implements ITask { * backoff time when catching exceptions */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { + RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, - new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); + skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); } /** @@ -108,11 +118,11 @@ class ProcessTask implements ITask { public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - int retryGetRecordsInSeconds, int maxGetRecordsThreadPool) { + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), - new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo.getShardId())); + makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo)); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 5fc41a3a..70a81fbc 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -99,26 +99,9 @@ class ShardConsumer { IMetricsFactory metricsFactory, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { - this.streamConfig = streamConfig; - this.recordProcessor = recordProcessor; - this.executorService = executorService; - this.shardInfo = shardInfo; - this.checkpoint = checkpoint; - this.recordProcessorCheckpointer = - new RecordProcessorCheckpointer(shardInfo, - checkpoint, - new SequenceNumberValidator(streamConfig.getStreamProxy(), - shardInfo.getShardId(), - streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())); - this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); - this.leaseManager = leaseManager; - this.metricsFactory = metricsFactory; - this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; - this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; - this.taskBackoffTimeMillis = backoffTimeMillis; - this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; - this.retryGetRecordsInSeconds = Optional.empty(); - this.maxGetRecordsThreadPool = Optional.empty(); + this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 0b19896d..3cfb9f2f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -339,26 +339,10 @@ public class Worker implements Runnable { KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - this.applicationName = applicationName; - this.recordProcessorFactory = recordProcessorFactory; - this.streamConfig = streamConfig; - this.initialPosition = initialPositionInStream; - this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; - this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; - this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator; - this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds(); - this.executorService = execService; - this.leaseCoordinator = leaseCoordinator; - this.metricsFactory = metricsFactory; - this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), - initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory, - executorService); - this.taskBackoffTimeMillis = taskBackoffTimeMillis; - this.failoverTimeMillis = failoverTimeMillis; - this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; - this.shardPrioritization = shardPrioritization; - this.retryGetRecordsInSeconds = Optional.empty(); - this.maxGetRecordsThreadPool = Optional.empty(); + this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, + shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, + metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, + shardPrioritization, Optional.empty(), Optional.empty()); }