From 2c72207bfb4d0327e02030cee213cd0ba772aeb5 Mon Sep 17 00:00:00 2001 From: Wei Date: Thu, 31 Aug 2017 16:25:44 -0700 Subject: [PATCH] integrated timeout configuration with shardconsumer --- .../lib/worker/ConsumerStates.java | 8 ++ .../worker/KinesisClientLibConfiguration.java | 2 +- .../clientlibrary/lib/worker/ProcessTask.java | 29 +++++++ .../lib/worker/ShardConsumer.java | 57 +++++++++++++ .../clientlibrary/lib/worker/Worker.java | 83 ++++++++++++++++++- .../lib/worker/ConsumerStatesTest.java | 39 ++++++++- 6 files changed, 213 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index d967b2c3..6a135202 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -14,6 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.Optional; + /** * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, * and state transitions is contained within the {@link ConsumerState} objects. @@ -307,6 +309,12 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { + if(consumer.getMaxGetRecordsThreadPool().isPresent() && consumer.getRetryGetRecordsInSeconds().isPresent()) { + return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), + consumer.getRetryGetRecordsInSeconds().get(), consumer.getMaxGetRecordsThreadPool().get()); + } return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist()); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 6da407d2..62d87f30 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -1119,7 +1119,7 @@ public class KinesisClientLibConfiguration { /** - * @param retryGetRecordsInSeconds the time in secods to wait before the worker retries to get a record. + * @param retryGetRecordsInSeconds the time in seconds to wait before the worker retries to get a record. * @return this configuration object. */ public KinesisClientLibConfiguration withRetryGetRecordsInSeconds(final int retryGetRecordsInSeconds) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 42e36afa..af8306ba 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -18,6 +18,7 @@ import java.math.BigInteger; import java.util.Collections; import java.util.List; import java.util.ListIterator; +import java.util.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,6 +87,34 @@ class ProcessTask implements ITask { new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new SynchronousGetRecordsRetrivalStrategy(dataFetcher)); } + /** + * @param shardInfo + * contains information about the shard + * @param streamConfig + * Stream configuration + * @param recordProcessor + * Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer + * Passed to the RecordProcessor so it can checkpoint progress + * @param dataFetcher + * Kinesis data fetcher (used to fetch records from Kinesis) + * @param backoffTimeMillis + * backoff time when catching exceptions + * @param retryGetRecordsInSeconds + * time in seconds to wait before the worker retries to get a record. + * @param maxGetRecordsThreadPool + * max number of threads in the getRecords thread pool. + */ + public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + int retryGetRecordsInSeconds, int maxGetRecordsThreadPool) { + this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), + new AsynchronousGetRecordsRetrivalStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool)); + } + /** * @param shardInfo * contains information about the shard diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 63cce40d..5fc41a3a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -15,10 +15,12 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import lombok.Getter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,6 +55,10 @@ class ShardConsumer { private final boolean cleanupLeasesOfCompletedShards; private final long taskBackoffTimeMillis; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; + @Getter + private final Optional retryGetRecordsInSeconds; + @Getter + private final Optional maxGetRecordsThreadPool; private ITask currentTask; private long currentTaskSubmitTime; @@ -111,6 +117,57 @@ class ShardConsumer { this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.taskBackoffTimeMillis = backoffTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; + this.retryGetRecordsInSeconds = Optional.empty(); + this.maxGetRecordsThreadPool = Optional.empty(); + } + + /** + * @param shardInfo Shard information + * @param streamConfig Stream configuration to use + * @param checkpoint Checkpoint tracker + * @param recordProcessor Record processor used to process the data records for the shard + * @param leaseManager Used to create leases for new shards + * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) + * @param executorService ExecutorService used to execute process tasks for this shard + * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard + * @param backoffTimeMillis backoff interval when we encounter exceptions + * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record. + * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool. + */ + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES + ShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + ILeaseManager leaseManager, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool) { + this.streamConfig = streamConfig; + this.recordProcessor = recordProcessor; + this.executorService = executorService; + this.shardInfo = shardInfo; + this.checkpoint = checkpoint; + this.recordProcessorCheckpointer = + new RecordProcessorCheckpointer(shardInfo, + checkpoint, + new SequenceNumberValidator(streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())); + this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + this.leaseManager = leaseManager; + this.metricsFactory = metricsFactory; + this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; + this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; + this.taskBackoffTimeMillis = backoffTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; + this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; + this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index fd461e31..0b19896d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -85,6 +86,9 @@ public class Worker implements Runnable { private final long taskBackoffTimeMillis; private final long failoverTimeMillis; + private final Optional retryGetRecordsInSeconds; + private final Optional maxGetRecordsThreadPool; + // private final KinesisClientLeaseManager leaseManager; private final KinesisClientLibLeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager controlServer; @@ -266,7 +270,9 @@ public class Worker implements Runnable { config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), - config.getShardPrioritizationStrategy()); + config.getShardPrioritizationStrategy(), + config.getRetryGetRecordsInSeconds(), + config.getMaxGetRecordsThreadPool()); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { @@ -351,8 +357,77 @@ public class Worker implements Runnable { this.failoverTimeMillis = failoverTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.shardPrioritization = shardPrioritization; + this.retryGetRecordsInSeconds = Optional.empty(); + this.maxGetRecordsThreadPool = Optional.empty(); } + + /** + * @param applicationName + * Name of the Kinesis application + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param streamConfig + * Stream configuration + * @param initialPositionInStream + * One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from + * this location in the stream when an application starts up for the first time and there are no + * checkpoints. If there are checkpoints, we start from the checkpoint position. + * @param parentShardPollIntervalMillis + * Wait for this long between polls to check if parent shards are done + * @param shardSyncIdleTimeMillis + * Time between tasks to sync leases and Kinesis shards + * @param cleanupLeasesUponShardCompletion + * Clean up shards we've finished processing (don't wait till they expire in Kinesis) + * @param checkpoint + * Used to get/set checkpoints + * @param leaseCoordinator + * Lease coordinator (coordinates currently owned leases) + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) + * @param metricsFactory + * Metrics factory used to emit metrics + * @param taskBackoffTimeMillis + * Backoff period when tasks encounter an exception + * @param shardPrioritization + * Provides prioritization logic to decide which available shards process first + * @param retryGetRecordsInSeconds + * Time in seconds to wait before the worker retries to get a record. + * @param maxGetRecordsThreadPool + * Max number of threads in the getRecords thread pool. + */ + // NOTE: This has package level access solely for testing + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { + this.applicationName = applicationName; + this.recordProcessorFactory = recordProcessorFactory; + this.streamConfig = streamConfig; + this.initialPosition = initialPositionInStream; + this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; + this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; + this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator; + this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds(); + this.executorService = execService; + this.leaseCoordinator = leaseCoordinator; + this.metricsFactory = metricsFactory; + this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory, + executorService); + this.taskBackoffTimeMillis = taskBackoffTimeMillis; + this.failoverTimeMillis = failoverTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; + this.shardPrioritization = shardPrioritization; + this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; + this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; + } + + /** * @return the applicationName */ @@ -786,7 +861,7 @@ public class Worker implements Runnable { return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, executorService, metricsFactory, taskBackoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist); + skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool); } @@ -1213,7 +1288,9 @@ public class Worker implements Runnable { config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), - shardPrioritization); + shardPrioritization, + config.getRetryGetRecordsInSeconds(), + config.getMaxGetRecordsThreadPool()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 31272379..cac3d35e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.never; @@ -25,6 +26,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.lang.reflect.Field; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -152,7 +154,10 @@ public class ConsumerStatesTest { } @Test - public void processingStateTest() { + public void processingStateTestSynchronous() { + when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.empty()); + when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.empty()); + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); ITask task = state.createTask(consumer); @@ -163,6 +168,38 @@ public class ConsumerStatesTest { assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + assertThat(task, procTask(GetRecordsRetrivalStrategy.class, "getRecordsRetrivalStrategy", instanceOf(SynchronousGetRecordsRetrivalStrategy.class) )); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING)); + assertThat(state.getTaskType(), equalTo(TaskType.PROCESS)); + + } + + @Test + public void processingStateTestAsynchronous() { + when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1)); + when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2)); + + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + assertThat(task, procTask(GetRecordsRetrivalStrategy.class, "getRecordsRetrivalStrategy", instanceOf(AsynchronousGetRecordsRetrivalStrategy.class) )); assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));