refactor(Worker): throws exception after MAX_RETRIES on runProcessLoop

This commit is contained in:
glarwood 2018-12-12 14:27:38 -08:00
parent 43bbeb5fe6
commit 3a91015bba

View file

@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -83,6 +84,7 @@ public class Worker implements Runnable {
private static final Log LOG = LogFactory.getLog(Worker.class); private static final Log LOG = LogFactory.getLog(Worker.class);
private static final int MAX_INITIALIZATION_ATTEMPTS = 20; private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final int MAX_RETRIES = 4;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private WorkerLog wlog = new WorkerLog(); private WorkerLog wlog = new WorkerLog();
@ -114,6 +116,7 @@ public class Worker implements Runnable {
private volatile boolean shutdown; private volatile boolean shutdown;
private volatile long shutdownStartTimeMillis; private volatile long shutdownStartTimeMillis;
private volatile boolean shutdownComplete = false; private volatile boolean shutdownComplete = false;
private AtomicInteger retries = new AtomicInteger();
// Holds consumers for shards the worker is currently tracking. Key is shard // Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer. // info, value is ShardConsumer.
@ -136,13 +139,10 @@ public class Worker implements Runnable {
/** /**
* Constructor. * Constructor.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create * @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder} * a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
*/ */
@Deprecated @Deprecated
public Worker( public Worker(
@ -154,15 +154,11 @@ public class Worker implements Runnable {
/** /**
* Constructor. * Constructor.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create * @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder} * a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
@Deprecated @Deprecated
public Worker( public Worker(
@ -178,15 +174,11 @@ public class Worker implements Runnable {
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param metricsFactory Metrics factory used to emit metrics
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create * @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder} * a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param metricsFactory
* Metrics factory used to emit metrics
*/ */
@Deprecated @Deprecated
public Worker( public Worker(
@ -196,17 +188,12 @@ public class Worker implements Runnable {
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param metricsFactory Metrics factory used to emit metrics
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create * @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder} * a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
@Deprecated @Deprecated
public Worker( public Worker(
@ -220,19 +207,13 @@ public class Worker implements Runnable {
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param kinesisClient Kinesis Client used for fetching data
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient CloudWatch Client for publishing metrics
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create * @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder} * a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
*/ */
@Deprecated @Deprecated
public Worker( public Worker(
@ -243,21 +224,14 @@ public class Worker implements Runnable {
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param kinesisClient Kinesis Client used for fetching data
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient CloudWatch Client for publishing metrics
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create * @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder} * a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
@Deprecated @Deprecated
public Worker( public Worker(
@ -269,23 +243,18 @@ public class Worker implements Runnable {
} }
// Backwards compatible constructors // Backwards compatible constructors
/** /**
* This constructor is for binary compatibility with code compiled against version of the KCL that only have * This constructor is for binary compatibility with code compiled against version of the KCL that only have
* constructors taking "Client" objects. * constructors taking "Client" objects.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param kinesisClient Kinesis Client used for fetching data
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient CloudWatch Client for publishing metrics
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create * @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder} * a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
*/ */
@Deprecated @Deprecated
public Worker( public Worker(
@ -300,21 +269,14 @@ public class Worker implements Runnable {
* This constructor is for binary compatibility with code compiled against version of the KCL that only have * This constructor is for binary compatibility with code compiled against version of the KCL that only have
* constructors taking "Client" objects. * constructors taking "Client" objects.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param kinesisClient Kinesis Client used for fetching data
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient CloudWatch Client for publishing metrics
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create * @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder} * a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
@Deprecated @Deprecated
public Worker( public Worker(
@ -329,21 +291,14 @@ public class Worker implements Runnable {
* This constructor is for binary compatibility with code compiled against version of the KCL that only have * This constructor is for binary compatibility with code compiled against version of the KCL that only have
* constructors taking "Client" objects. * constructors taking "Client" objects.
* *
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param kinesisClient Kinesis Client used for fetching data
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
* @param metricsFactory Metrics factory used to emit metrics
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create * @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder} * a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
@Deprecated @Deprecated
public Worker( public Worker(
@ -355,21 +310,14 @@ public class Worker implements Runnable {
} }
/** /**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param kinesisClient Kinesis Client used for fetching data
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
* @param metricsFactory Metrics factory used to emit metrics
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create * @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder} * a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/ */
@Deprecated @Deprecated
public Worker( public Worker(
@ -395,8 +343,8 @@ public class Worker implements Runnable {
config.getMaxLeasesToStealAtOneTime(), config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(), config.getMaxLeaseRenewalThreads(),
metricsFactory) metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService, execService,
metricsFactory, metricsFactory,
config.getTaskBackoffTimeMillis(), config.getTaskBackoffTimeMillis(),
@ -423,45 +371,32 @@ public class Worker implements Runnable {
} }
/** /**
* @param applicationName * @param applicationName Name of the Kinesis application
* Name of the Kinesis application * @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param recordProcessorFactory * @param streamConfig Stream configuration
* Used to get record processor instances for processing data from shards * @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from
* this location in the stream when an application starts up for the first time and there are no
* checkpoints. If there are checkpoints, we start from the checkpoint position.
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in Kinesis)
* @param checkpoint Used to get/set checkpoints
* @param leaseCoordinator Lease coordinator (coordinates currently owned leases)
* @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
* @param metricsFactory Metrics factory used to emit metrics
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
* @param shardPrioritization Provides prioritization logic to decide which available shards process first
* @paran config * @paran config
* Kinesis Library configuration * Kinesis Library configuration
* @param streamConfig
* Stream configuration
* @param initialPositionInStream
* One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from
* this location in the stream when an application starts up for the first time and there are no
* checkpoints. If there are checkpoints, we start from the checkpoint position.
* @param parentShardPollIntervalMillis
* Wait for this long between polls to check if parent shards are done
* @param shardSyncIdleTimeMillis
* Time between tasks to sync leases and Kinesis shards
* @param cleanupLeasesUponShardCompletion
* Clean up shards we've finished processing (don't wait till they expire in Kinesis)
* @param checkpoint
* Used to get/set checkpoints
* @param leaseCoordinator
* Lease coordinator (coordinates currently owned leases)
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
* @param metricsFactory
* Metrics factory used to emit metrics
* @param taskBackoffTimeMillis
* Backoff period when tasks encounter an exception
* @param shardPrioritization
* Provides prioritization logic to decide which available shards process first
*/ */
// NOTE: This has package level access solely for testing // NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
@ -469,40 +404,24 @@ public class Worker implements Runnable {
} }
/** /**
* @param applicationName * @param applicationName Name of the Kinesis application
* Name of the Kinesis application * @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param recordProcessorFactory * @param config Kinesis Library Configuration
* Used to get record processor instances for processing data from shards * @param streamConfig Stream configuration
* @param config * @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from
* Kinesis Library Configuration * this location in the stream when an application starts up for the first time and there are no
* @param streamConfig * checkpoints. If there are checkpoints, we start from the checkpoint position.
* Stream configuration * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param initialPositionInStream * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from * @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in Kinesis)
* this location in the stream when an application starts up for the first time and there are no * @param checkpoint Used to get/set checkpoints
* checkpoints. If there are checkpoints, we start from the checkpoint position. * @param leaseCoordinator Lease coordinator (coordinates currently owned leases)
* @param parentShardPollIntervalMillis * @param execService ExecutorService to use for processing records (support for multi-threaded consumption)
* Wait for this long between polls to check if parent shards are done * @param metricsFactory Metrics factory used to emit metrics
* @param shardSyncIdleTimeMillis * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
* Time between tasks to sync leases and Kinesis shards * @param shardPrioritization Provides prioritization logic to decide which available shards process first
* @param cleanupLeasesUponShardCompletion * @param retryGetRecordsInSeconds Time in seconds to wait before the worker retries to get a record.
* Clean up shards we've finished processing (don't wait till they expire in Kinesis) * @param maxGetRecordsThreadPool Max number of threads in the getRecords thread pool.
* @param checkpoint
* Used to get/set checkpoints
* @param leaseCoordinator
* Lease coordinator (coordinates currently owned leases)
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
* @param metricsFactory
* Metrics factory used to emit metrics
* @param taskBackoffTimeMillis
* Backoff period when tasks encounter an exception
* @param shardPrioritization
* Provides prioritization logic to decide which available shards process first
* @param retryGetRecordsInSeconds
* Time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool
* Max number of threads in the getRecords thread pool.
*/ */
// NOTE: This has package level access solely for testing // NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@ -548,7 +467,7 @@ public class Worker implements Runnable {
/** /**
* @return the leaseCoordinator * @return the leaseCoordinator
*/ */
KinesisClientLibLeaseCoordinator getLeaseCoordinator(){ KinesisClientLibLeaseCoordinator getLeaseCoordinator() {
return leaseCoordinator; return leaseCoordinator;
} }
@ -601,6 +520,9 @@ public class Worker implements Runnable {
wlog.info("Sleeping ..."); wlog.info("Sleeping ...");
Thread.sleep(idleTimeInMilliseconds); Thread.sleep(idleTimeInMilliseconds);
} catch (Exception e) { } catch (Exception e) {
if (retries.getAndIncrement() > MAX_RETRIES)
throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e);
LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!", LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!",
String.valueOf(idleTimeInMilliseconds)), e); String.valueOf(idleTimeInMilliseconds)), e);
try { try {
@ -668,7 +590,7 @@ public class Worker implements Runnable {
/** /**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* * <p>
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo
@ -714,7 +636,7 @@ public class Worker implements Runnable {
/** /**
* Starts the requestedShutdown process, and returns a future that can be used to track the process. * Starts the requestedShutdown process, and returns a future that can be used to track the process.
* * <p>
* This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and * This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and
* indicates the process behavior * indicates the process behavior
* *
@ -761,7 +683,7 @@ public class Worker implements Runnable {
* Requests a graceful shutdown of the worker, notifying record processors, that implement * Requests a graceful shutdown of the worker, notifying record processors, that implement
* {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to * {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
* checkpoint. * checkpoint.
* * <p>
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
* previous future. * previous future.
* *
@ -788,8 +710,8 @@ public class Worker implements Runnable {
* </ol> * </ol>
* *
* @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown * @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown
* completed successfully. A false value indicates that a non-exception case caused the shutdown process to * completed successfully. A false value indicates that a non-exception case caused the shutdown process to
* terminate early. * terminate early.
*/ */
public Future<Boolean> startGracefulShutdown() { public Future<Boolean> startGracefulShutdown() {
synchronized (this) { synchronized (this) {
@ -806,9 +728,8 @@ public class Worker implements Runnable {
* shutdowns in your own executor, or execute the shutdown synchronously. * shutdowns in your own executor, or execute the shutdown synchronously.
* *
* @return a callable that run the graceful shutdown process. This may return a callable that return true if the * @return a callable that run the graceful shutdown process. This may return a callable that return true if the
* graceful shutdown has already been completed. * graceful shutdown has already been completed.
* @throws IllegalStateException * @throws IllegalStateException thrown by the callable if another callable has already started the shutdown process.
* thrown by the callable if another callable has already started the shutdown process.
*/ */
public Callable<Boolean> createGracefulShutdownCallable() { public Callable<Boolean> createGracefulShutdownCallable() {
if (isShutdownComplete()) { if (isShutdownComplete()) {
@ -958,10 +879,8 @@ public class Worker implements Runnable {
/** /**
* NOTE: This method is internal/private to the Worker class. It has package access solely for testing. * NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
* *
* @param shardInfo * @param shardInfo Kinesis shard info
* Kinesis shard info * @param processorFactory RecordProcessor factory
* @param processorFactory
* RecordProcessor factory
* @return ShardConsumer for the shard * @return ShardConsumer for the shard
*/ */
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
@ -1065,14 +984,12 @@ public class Worker implements Runnable {
/** /**
* Given configuration, returns appropriate metrics factory. * Given configuration, returns appropriate metrics factory.
* *
* @param cloudWatchClient * @param cloudWatchClient Amazon CloudWatch client
* Amazon CloudWatch client * @param config KinesisClientLibConfiguration
* @param config
* KinesisClientLibConfiguration
* @return Returns metrics factory based on the config. * @return Returns metrics factory based on the config.
*/ */
private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient,
KinesisClientLibConfiguration config) { KinesisClientLibConfiguration config) {
IMetricsFactory metricsFactory; IMetricsFactory metricsFactory;
if (config.getMetricsLevel() == MetricsLevel.NONE) { if (config.getMetricsLevel() == MetricsLevel.NONE) {
metricsFactory = new NullMetricsFactory(); metricsFactory = new NullMetricsFactory();
@ -1113,7 +1030,7 @@ public class Worker implements Runnable {
static class WorkerCWMetricsFactory extends CWMetricsFactory { static class WorkerCWMetricsFactory extends CWMetricsFactory {
WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis, WorkerCWMetricsFactory(AmazonCloudWatch cloudWatchClient, String namespace, long bufferTimeMillis,
int maxQueueSize, MetricsLevel metricsLevel, Set<String> metricsEnabledDimensions) { int maxQueueSize, MetricsLevel metricsLevel, Set<String> metricsEnabledDimensions) {
super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions); super(cloudWatchClient, namespace, bufferTimeMillis, maxQueueSize, metricsLevel, metricsEnabledDimensions);
} }
} }
@ -1138,25 +1055,35 @@ public class Worker implements Runnable {
public static class Builder { public static class Builder {
private IRecordProcessorFactory recordProcessorFactory; private IRecordProcessorFactory recordProcessorFactory;
@Setter @Accessors(fluent = true) @Setter
@Accessors(fluent = true)
private KinesisClientLibConfiguration config; private KinesisClientLibConfiguration config;
@Setter @Accessors(fluent = true) @Setter
@Accessors(fluent = true)
private AmazonKinesis kinesisClient; private AmazonKinesis kinesisClient;
@Setter @Accessors(fluent = true) @Setter
@Accessors(fluent = true)
private AmazonDynamoDB dynamoDBClient; private AmazonDynamoDB dynamoDBClient;
@Setter @Accessors(fluent = true) @Setter
@Accessors(fluent = true)
private AmazonCloudWatch cloudWatchClient; private AmazonCloudWatch cloudWatchClient;
@Setter @Accessors(fluent = true) @Setter
@Accessors(fluent = true)
private IMetricsFactory metricsFactory; private IMetricsFactory metricsFactory;
@Setter @Accessors(fluent = true) @Setter
@Accessors(fluent = true)
private ILeaseManager<KinesisClientLease> leaseManager; private ILeaseManager<KinesisClientLease> leaseManager;
@Setter @Accessors(fluent = true) @Setter
@Accessors(fluent = true)
private ExecutorService execService; private ExecutorService execService;
@Setter @Accessors(fluent = true) @Setter
@Accessors(fluent = true)
private ShardPrioritization shardPrioritization; private ShardPrioritization shardPrioritization;
@Setter @Accessors(fluent = true) @Setter
@Accessors(fluent = true)
private IKinesisProxy kinesisProxy; private IKinesisProxy kinesisProxy;
@Setter @Accessors(fluent = true) @Setter
@Accessors(fluent = true)
private WorkerStateChangeListener workerStateChangeListener; private WorkerStateChangeListener workerStateChangeListener;
@VisibleForTesting @VisibleForTesting
@ -1178,8 +1105,7 @@ public class Worker implements Runnable {
* Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor * Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
* IRecordProcessor}. * IRecordProcessor}.
* *
* @param recordProcessorFactory * @param recordProcessorFactory Used to get record processor instances for processing data from shards
* Used to get record processor instances for processing data from shards
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder recordProcessorFactory( public Builder recordProcessorFactory(
@ -1192,8 +1118,7 @@ public class Worker implements Runnable {
* Provide a V2 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor * Provide a V2 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
* IRecordProcessor}. * IRecordProcessor}.
* *
* @param recordProcessorFactory * @param recordProcessorFactory Used to get record processor instances for processing data from shards
* Used to get record processor instances for processing data from shards
* @return A reference to this updated object so that method calls can be chained together. * @return A reference to this updated object so that method calls can be chained together.
*/ */
public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) { public Builder recordProcessorFactory(IRecordProcessorFactory recordProcessorFactory) {
@ -1294,8 +1219,8 @@ public class Worker implements Runnable {
config.getMaxLeasesToStealAtOneTime(), config.getMaxLeasesToStealAtOneTime(),
config.getMaxLeaseRenewalThreads(), config.getMaxLeaseRenewalThreads(),
metricsFactory) metricsFactory)
.withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
.withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
execService, execService,
metricsFactory, metricsFactory,
config.getTaskBackoffTimeMillis(), config.getTaskBackoffTimeMillis(),