integrated timeout configuration with shardconsumer
This commit is contained in:
parent
150c69169f
commit
2c72207bfb
6 changed files with 213 additions and 5 deletions
|
|
@ -14,6 +14,8 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks,
|
||||
* and state transitions is contained within the {@link ConsumerState} objects.
|
||||
|
|
@ -307,6 +309,12 @@ class ConsumerStates {
|
|||
|
||||
@Override
|
||||
public ITask createTask(ShardConsumer consumer) {
|
||||
if(consumer.getMaxGetRecordsThreadPool().isPresent() && consumer.getRetryGetRecordsInSeconds().isPresent()) {
|
||||
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
|
||||
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
|
||||
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
|
||||
consumer.getRetryGetRecordsInSeconds().get(), consumer.getMaxGetRecordsThreadPool().get());
|
||||
}
|
||||
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
|
||||
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
|
||||
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist());
|
||||
|
|
|
|||
|
|
@ -1119,7 +1119,7 @@ public class KinesisClientLibConfiguration {
|
|||
|
||||
|
||||
/**
|
||||
* @param retryGetRecordsInSeconds the time in secods to wait before the worker retries to get a record.
|
||||
* @param retryGetRecordsInSeconds the time in seconds to wait before the worker retries to get a record.
|
||||
* @return this configuration object.
|
||||
*/
|
||||
public KinesisClientLibConfiguration withRetryGetRecordsInSeconds(final int retryGetRecordsInSeconds) {
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import java.math.BigInteger;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
|
@ -86,6 +87,34 @@ class ProcessTask implements ITask {
|
|||
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new SynchronousGetRecordsRetrivalStrategy(dataFetcher));
|
||||
}
|
||||
|
||||
/**
* Creates a ProcessTask that retrieves records through an {@link AsynchronousGetRecordsRetrivalStrategy}
* built from the given data fetcher, retry interval and thread pool size, then delegates to the
* constructor that accepts a ThrottlingReporter and a retrieval strategy.
*
|
||||
* @param shardInfo
|
||||
* contains information about the shard
|
||||
* @param streamConfig
|
||||
* Stream configuration
|
||||
* @param recordProcessor
|
||||
* Record processor used to process the data records for the shard
|
||||
* @param recordProcessorCheckpointer
|
||||
* Passed to the RecordProcessor so it can checkpoint progress
|
||||
* @param dataFetcher
|
||||
* Kinesis data fetcher (used to fetch records from Kinesis)
|
||||
* @param backoffTimeMillis
|
||||
* backoff time when catching exceptions
* @param skipShardSyncAtWorkerInitializationIfLeasesExist
* forwarded unchanged to the delegated constructor. NOTE(review): its effect is not visible
* here; presumably skips the shard sync at worker initialization when leases exist - confirm.
|
||||
* @param retryGetRecordsInSeconds
|
||||
* time in seconds to wait before the worker retries to get a record.
|
||||
* @param maxGetRecordsThreadPool
|
||||
* max number of threads in the getRecords thread pool.
|
||||
*/
|
||||
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
|
||||
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
|
||||
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
int retryGetRecordsInSeconds, int maxGetRecordsThreadPool) {
|
||||
// Same delegation as the synchronous overload, except the retrieval strategy is the asynchronous one
// configured with the retry interval and thread pool size.
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
|
||||
skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()),
|
||||
new AsynchronousGetRecordsRetrivalStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param shardInfo
|
||||
* contains information about the shard
|
||||
|
|
|
|||
|
|
@ -15,10 +15,12 @@
|
|||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
|
@ -53,6 +55,10 @@ class ShardConsumer {
|
|||
private final boolean cleanupLeasesOfCompletedShards;
|
||||
private final long taskBackoffTimeMillis;
|
||||
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||
@Getter
|
||||
private final Optional<Integer> retryGetRecordsInSeconds;
|
||||
@Getter
|
||||
private final Optional<Integer> maxGetRecordsThreadPool;
|
||||
|
||||
private ITask currentTask;
|
||||
private long currentTaskSubmitTime;
|
||||
|
|
@ -111,6 +117,57 @@ class ShardConsumer {
|
|||
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
|
||||
this.taskBackoffTimeMillis = backoffTimeMillis;
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||
this.retryGetRecordsInSeconds = Optional.empty();
|
||||
this.maxGetRecordsThreadPool = Optional.empty();
|
||||
}
|
||||
|
||||
/**
* Creates a ShardConsumer that additionally carries the optional getRecords retry interval and thread pool
* size. Besides storing its arguments, it builds its own RecordProcessorCheckpointer and KinesisDataFetcher
* for the shard.
*
|
||||
* @param shardInfo Shard information
|
||||
* @param streamConfig Stream configuration to use
|
||||
* @param checkpoint Checkpoint tracker
|
||||
* @param recordProcessor Record processor used to process the data records for the shard
|
||||
* @param leaseManager Used to create leases for new shards
|
||||
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param cleanupLeasesOfCompletedShards stored on the consumer. NOTE(review): the Worker passes its
*        cleanupLeasesUponShardCompletion value here ("clean up shards we've finished processing") - confirm.
|
||||
* @param executorService ExecutorService used to execute process tasks for this shard
|
||||
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
|
||||
* @param backoffTimeMillis backoff interval when we encounter exceptions
* @param skipShardSyncAtWorkerInitializationIfLeasesExist stored on the consumer. NOTE(review): its effect is
*        not visible here; presumably skips the shard sync at worker initialization when leases exist - confirm.
|
||||
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record.
|
||||
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool.
|
||||
*/
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
||||
ShardConsumer(ShardInfo shardInfo,
|
||||
StreamConfig streamConfig,
|
||||
ICheckpoint checkpoint,
|
||||
IRecordProcessor recordProcessor,
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
long parentShardPollIntervalMillis,
|
||||
boolean cleanupLeasesOfCompletedShards,
|
||||
ExecutorService executorService,
|
||||
IMetricsFactory metricsFactory,
|
||||
long backoffTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
Optional<Integer> retryGetRecordsInSeconds,
|
||||
Optional<Integer> maxGetRecordsThreadPool) {
|
||||
this.streamConfig = streamConfig;
|
||||
this.recordProcessor = recordProcessor;
|
||||
this.executorService = executorService;
|
||||
this.shardInfo = shardInfo;
|
||||
this.checkpoint = checkpoint;
|
||||
// The checkpointer gets a SequenceNumberValidator built from the stream proxy, this shard's id and the
// stream config's validate-before-checkpoint flag.
this.recordProcessorCheckpointer =
|
||||
new RecordProcessorCheckpointer(shardInfo,
|
||||
checkpoint,
|
||||
new SequenceNumberValidator(streamConfig.getStreamProxy(),
|
||||
shardInfo.getShardId(),
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()));
|
||||
// A new data fetcher is created for this consumer's shard.
this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
this.leaseManager = leaseManager;
|
||||
this.metricsFactory = metricsFactory;
|
||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
|
||||
this.taskBackoffTimeMillis = backoffTimeMillis;
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||
// Kept as Optionals: ConsumerStates' createTask picks the asynchronous ProcessTask constructor only when
// both values are present, and the synchronous one otherwise.
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
|
||||
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
|
@ -85,6 +86,9 @@ public class Worker implements Runnable {
|
|||
private final long taskBackoffTimeMillis;
|
||||
private final long failoverTimeMillis;
|
||||
|
||||
private final Optional<Integer> retryGetRecordsInSeconds;
|
||||
private final Optional<Integer> maxGetRecordsThreadPool;
|
||||
|
||||
// private final KinesisClientLeaseManager leaseManager;
|
||||
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
|
||||
private final ShardSyncTaskManager controlServer;
|
||||
|
|
@ -266,7 +270,9 @@ public class Worker implements Runnable {
|
|||
config.getTaskBackoffTimeMillis(),
|
||||
config.getFailoverTimeMillis(),
|
||||
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
|
||||
config.getShardPrioritizationStrategy());
|
||||
config.getShardPrioritizationStrategy(),
|
||||
config.getRetryGetRecordsInSeconds(),
|
||||
config.getMaxGetRecordsThreadPool());
|
||||
|
||||
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
|
||||
if (config.getRegionName() != null) {
|
||||
|
|
@ -351,8 +357,77 @@ public class Worker implements Runnable {
|
|||
this.failoverTimeMillis = failoverTimeMillis;
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||
this.shardPrioritization = shardPrioritization;
|
||||
this.retryGetRecordsInSeconds = Optional.empty();
|
||||
this.maxGetRecordsThreadPool = Optional.empty();
|
||||
}
|
||||
|
||||
|
||||
/**
* Creates a Worker from explicit collaborators and settings, including the optional getRecords retry
* interval and thread pool size. Besides storing its arguments, it reads the idle time from the stream
* configuration and builds the ShardSyncTaskManager used as the control server.
*
|
||||
* @param applicationName
|
||||
* Name of the Kinesis application
|
||||
* @param recordProcessorFactory
|
||||
* Used to get record processor instances for processing data from shards
|
||||
* @param streamConfig
|
||||
* Stream configuration
|
||||
* @param initialPositionInStream
|
||||
* One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from
|
||||
* this location in the stream when an application starts up for the first time and there are no
|
||||
* checkpoints. If there are checkpoints, we start from the checkpoint position.
|
||||
* @param parentShardPollIntervalMillis
|
||||
* Wait for this long between polls to check if parent shards are done
|
||||
* @param shardSyncIdleTimeMillis
|
||||
* Time between tasks to sync leases and Kinesis shards
|
||||
* @param cleanupLeasesUponShardCompletion
|
||||
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
|
||||
* @param checkpoint
|
||||
* Used to get/set checkpoints
|
||||
* @param leaseCoordinator
|
||||
* Lease coordinator (coordinates currently owned leases)
|
||||
* @param execService
|
||||
* ExecutorService to use for processing records (support for multi-threaded consumption)
|
||||
* @param metricsFactory
|
||||
* Metrics factory used to emit metrics
|
||||
* @param taskBackoffTimeMillis
|
||||
* Backoff period when tasks encounter an exception
* @param failoverTimeMillis
* Stored on the worker. NOTE(review): semantics are not visible here; callers pass
* config.getFailoverTimeMillis() - confirm the meaning against KinesisClientLibConfiguration.
* @param skipShardSyncAtWorkerInitializationIfLeasesExist
* Stored on the worker and forwarded to each ShardConsumer it builds. NOTE(review): presumably
* skips the shard sync at worker initialization when leases exist - confirm.
|
||||
* @param shardPrioritization
|
||||
* Provides prioritization logic to decide which available shards process first
|
||||
* @param retryGetRecordsInSeconds
|
||||
* Time in seconds to wait before the worker retries to get a record.
|
||||
* @param maxGetRecordsThreadPool
|
||||
* Max number of threads in the getRecords thread pool.
|
||||
*/
|
||||
// NOTE: This has package level access solely for testing
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
||||
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig,
|
||||
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
|
||||
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
|
||||
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
|
||||
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
|
||||
this.applicationName = applicationName;
|
||||
this.recordProcessorFactory = recordProcessorFactory;
|
||||
this.streamConfig = streamConfig;
|
||||
this.initialPosition = initialPositionInStream;
|
||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
|
||||
// Fall back to the lease coordinator as the checkpoint tracker when no checkpoint is supplied.
this.checkpointTracker = checkpoint != null ? checkpoint : leaseCoordinator;
|
||||
this.idleTimeInMilliseconds = streamConfig.getIdleTimeInMilliseconds();
|
||||
this.executorService = execService;
|
||||
this.leaseCoordinator = leaseCoordinator;
|
||||
this.metricsFactory = metricsFactory;
|
||||
// Uses the executorService field assigned above, so this must stay after that assignment.
this.controlServer = new ShardSyncTaskManager(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
||||
initialPositionInStream, cleanupLeasesUponShardCompletion, shardSyncIdleTimeMillis, metricsFactory,
|
||||
executorService);
|
||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||
this.failoverTimeMillis = failoverTimeMillis;
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
|
||||
this.shardPrioritization = shardPrioritization;
|
||||
// Forwarded as-is to the ShardConsumer constructor when this worker builds a consumer.
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
|
||||
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the applicationName
|
||||
*/
|
||||
|
|
@ -786,7 +861,7 @@ public class Worker implements Runnable {
|
|||
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
|
||||
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
|
||||
executorService, metricsFactory, taskBackoffTimeMillis,
|
||||
skipShardSyncAtWorkerInitializationIfLeasesExist);
|
||||
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool);
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -1213,7 +1288,9 @@ public class Worker implements Runnable {
|
|||
config.getTaskBackoffTimeMillis(),
|
||||
config.getFailoverTimeMillis(),
|
||||
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
|
||||
shardPrioritization);
|
||||
shardPrioritization,
|
||||
config.getRetryGetRecordsInSeconds(),
|
||||
config.getMaxGetRecordsThreadPool());
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
|
||||
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.mockito.Mockito.never;
|
||||
|
|
@ -25,6 +26,7 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
|
|
@ -152,7 +154,10 @@ public class ConsumerStatesTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void processingStateTest() {
|
||||
public void processingStateTestSynchronous() {
|
||||
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.empty());
|
||||
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.empty());
|
||||
|
||||
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
|
||||
ITask task = state.createTask(consumer);
|
||||
|
||||
|
|
@ -163,6 +168,38 @@ public class ConsumerStatesTest {
|
|||
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
|
||||
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
|
||||
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
|
||||
assertThat(task, procTask(GetRecordsRetrivalStrategy.class, "getRecordsRetrivalStrategy", instanceOf(SynchronousGetRecordsRetrivalStrategy.class) ));
|
||||
|
||||
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
|
||||
|
||||
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
|
||||
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
|
||||
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
|
||||
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
|
||||
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
|
||||
equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
|
||||
|
||||
assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING));
|
||||
assertThat(state.getTaskType(), equalTo(TaskType.PROCESS));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void processingStateTestAsynchronous() {
|
||||
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1));
|
||||
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2));
|
||||
|
||||
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
|
||||
ITask task = state.createTask(consumer);
|
||||
|
||||
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
|
||||
assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
|
||||
assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
|
||||
equalTo(recordProcessorCheckpointer)));
|
||||
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
|
||||
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
|
||||
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
|
||||
assertThat(task, procTask(GetRecordsRetrivalStrategy.class, "getRecordsRetrivalStrategy", instanceOf(AsynchronousGetRecordsRetrivalStrategy.class) ));
|
||||
|
||||
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue