diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 15076005..c9576b76 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.apache.commons.lang3.StringUtils; @@ -83,6 +84,7 @@ public class Worker implements Runnable { private static final Log LOG = LogFactory.getLog(Worker.class); private static final int MAX_INITIALIZATION_ATTEMPTS = 20; + private static final int MAX_RETRIES = 4; private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private WorkerLog wlog = new WorkerLog(); @@ -114,6 +116,7 @@ public class Worker implements Runnable { private volatile boolean shutdown; private volatile long shutdownStartTimeMillis; private volatile boolean shutdownComplete = false; + private AtomicInteger retries = new AtomicInteger(); // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. @@ -136,13 +139,10 @@ public class Worker implements Runnable { /** * Constructor. * + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration * @deprecated The access to this constructor will be changed in a future release. 
The recommended way to create * a Worker is to use {@link Builder} - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration */ @Deprecated public Worker( @@ -154,15 +154,11 @@ public class Worker implements Runnable { /** * Constructor. * + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration + * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) * @deprecated The access to this constructor will be changed in a future release. The recommended way to create * a Worker is to use {@link Builder} - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) */ @Deprecated public Worker( @@ -178,15 +174,11 @@ public class Worker implements Runnable { } /** + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration + * @param metricsFactory Metrics factory used to emit metrics * @deprecated The access to this constructor will be changed in a future release. 
The recommended way to create * a Worker is to use {@link Builder} - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param metricsFactory - * Metrics factory used to emit metrics */ @Deprecated public Worker( @@ -196,17 +188,12 @@ public class Worker implements Runnable { } /** + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration + * @param metricsFactory Metrics factory used to emit metrics + * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) * @deprecated The access to this constructor will be changed in a future release. The recommended way to create * a Worker is to use {@link Builder} - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param metricsFactory - * Metrics factory used to emit metrics - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) */ @Deprecated public Worker( @@ -220,19 +207,13 @@ public class Worker implements Runnable { } /** + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration + * @param kinesisClient Kinesis Client used for fetching data + * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient CloudWatch Client for publishing metrics * @deprecated The access to this constructor will be changed in a future release. 
The recommended way to create * a Worker is to use {@link Builder} - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param kinesisClient - * Kinesis Client used for fetching data - * @param dynamoDBClient - * DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient - * CloudWatch Client for publishing metrics */ @Deprecated public Worker( @@ -243,21 +224,14 @@ public class Worker implements Runnable { } /** + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration + * @param kinesisClient Kinesis Client used for fetching data + * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) * @deprecated The access to this constructor will be changed in a future release. The recommended way to create * a Worker is to use {@link Builder} - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param kinesisClient - * Kinesis Client used for fetching data - * @param dynamoDBClient - * DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient - * CloudWatch Client for publishing metrics - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) */ @Deprecated public Worker( @@ -269,23 +243,18 @@ public class Worker implements Runnable { } // Backwards compatible constructors + /** * This constructor is for binary compatibility with code compiled against version of the KCL that only have * constructors taking "Client" objects. 
* + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration + * @param kinesisClient Kinesis Client used for fetching data + * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient CloudWatch Client for publishing metrics * @deprecated The access to this constructor will be changed in a future release. The recommended way to create * a Worker is to use {@link Builder} - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param kinesisClient - * Kinesis Client used for fetching data - * @param dynamoDBClient - * DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient - * CloudWatch Client for publishing metrics */ @Deprecated public Worker( @@ -300,21 +269,14 @@ public class Worker implements Runnable { * This constructor is for binary compatibility with code compiled against version of the KCL that only have * constructors taking "Client" objects. * + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration + * @param kinesisClient Kinesis Client used for fetching data + * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient CloudWatch Client for publishing metrics + * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) * @deprecated The access to this constructor will be changed in a future release. 
The recommended way to create * a Worker is to use {@link Builder} - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param kinesisClient - * Kinesis Client used for fetching data - * @param dynamoDBClient - * DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient - * CloudWatch Client for publishing metrics - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) */ @Deprecated public Worker( @@ -329,21 +291,14 @@ public class Worker implements Runnable { * This constructor is for binary compatibility with code compiled against version of the KCL that only have * constructors taking "Client" objects. * + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration + * @param kinesisClient Kinesis Client used for fetching data + * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases + * @param metricsFactory Metrics factory used to emit metrics + * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) * @deprecated The access to this constructor will be changed in a future release. 
The recommended way to create * a Worker is to use {@link Builder} - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param kinesisClient - * Kinesis Client used for fetching data - * @param dynamoDBClient - * DynamoDB client used for checkpoints and tracking leases - * @param metricsFactory - * Metrics factory used to emit metrics - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) */ @Deprecated public Worker( @@ -355,21 +310,14 @@ public class Worker implements Runnable { } /** + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration + * @param kinesisClient Kinesis Client used for fetching data + * @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases + * @param metricsFactory Metrics factory used to emit metrics + * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) * @deprecated The access to this constructor will be changed in a future release. 
The recommended way to create * a Worker is to use {@link Builder} - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param kinesisClient - * Kinesis Client used for fetching data - * @param dynamoDBClient - * DynamoDB client used for checkpoints and tracking leases - * @param metricsFactory - * Metrics factory used to emit metrics - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) */ @Deprecated public Worker( @@ -395,8 +343,8 @@ public class Worker implements Runnable { config.getMaxLeasesToStealAtOneTime(), config.getMaxLeaseRenewalThreads(), metricsFactory) - .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) - .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), + .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity()) + .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()), execService, metricsFactory, config.getTaskBackoffTimeMillis(), @@ -423,45 +371,32 @@ public class Worker implements Runnable { } /** - * @param applicationName - * Name of the Kinesis application - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards + * @param applicationName Name of the Kinesis application + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param streamConfig Stream configuration + * @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from + * this location in the stream when an application starts up for the first time and there are no + * checkpoints. If there are checkpoints, we start from the checkpoint position. 
+ * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in Kinesis) + * @param checkpoint Used to get/set checkpoints + * @param leaseCoordinator Lease coordinator (coordinates currently owned leases) + * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) + * @param metricsFactory Metrics factory used to emit metrics + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param shardPrioritization Provides prioritization logic to decide which available shards process first - * @paran config + * @param config - * Kinesis Library configuration - * @param streamConfig - * Stream configuration - * @param initialPositionInStream - * One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from - * this location in the stream when an application starts up for the first time and there are no - * checkpoints. If there are checkpoints, we start from the checkpoint position. 
- * @param parentShardPollIntervalMillis - * Wait for this long between polls to check if parent shards are done - * @param shardSyncIdleTimeMillis - * Time between tasks to sync leases and Kinesis shards - * @param cleanupLeasesUponShardCompletion - * Clean up shards we've finished processing (don't wait till they expire in Kinesis) - * @param checkpoint - * Used to get/set checkpoints - * @param leaseCoordinator - * Lease coordinator (coordinates currently owned leases) - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) - * @param metricsFactory - * Metrics factory used to emit metrics - * @param taskBackoffTimeMillis - * Backoff period when tasks encounter an exception - * @param shardPrioritization - * Provides prioritization logic to decide which available shards process first + * Kinesis Library configuration */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, - long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, - KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, - IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { + long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, + KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, + IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { 
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, @@ -469,40 +404,24 @@ public class Worker implements Runnable { } /** - * @param applicationName - * Name of the Kinesis application - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Library Configuration - * @param streamConfig - * Stream configuration - * @param initialPositionInStream - * One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from - * this location in the stream when an application starts up for the first time and there are no - * checkpoints. If there are checkpoints, we start from the checkpoint position. - * @param parentShardPollIntervalMillis - * Wait for this long between polls to check if parent shards are done - * @param shardSyncIdleTimeMillis - * Time between tasks to sync leases and Kinesis shards - * @param cleanupLeasesUponShardCompletion - * Clean up shards we've finished processing (don't wait till they expire in Kinesis) - * @param checkpoint - * Used to get/set checkpoints - * @param leaseCoordinator - * Lease coordinator (coordinates currently owned leases) - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) - * @param metricsFactory - * Metrics factory used to emit metrics - * @param taskBackoffTimeMillis - * Backoff period when tasks encounter an exception - * @param shardPrioritization - * Provides prioritization logic to decide which available shards process first - * @param retryGetRecordsInSeconds - * Time in seconds to wait before the worker retries to get a record. 
- * @param maxGetRecordsThreadPool - * Max number of threads in the getRecords thread pool. + * @param applicationName Name of the Kinesis application + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Library Configuration + * @param streamConfig Stream configuration + * @param initialPositionInStream One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. The KinesisClientLibrary will start fetching data from + * this location in the stream when an application starts up for the first time and there are no + * checkpoints. If there are checkpoints, we start from the checkpoint position. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupLeasesUponShardCompletion Clean up shards we've finished processing (don't wait till they expire in Kinesis) + * @param checkpoint Used to get/set checkpoints + * @param leaseCoordinator Lease coordinator (coordinates currently owned leases) + * @param execService ExecutorService to use for processing records (support for multi-threaded consumption) + * @param metricsFactory Metrics factory used to emit metrics + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param shardPrioritization Provides prioritization logic to decide which available shards process first + * @param retryGetRecordsInSeconds Time in seconds to wait before the worker retries to get a record. + * @param maxGetRecordsThreadPool Max number of threads in the getRecords thread pool. 
*/ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @@ -548,7 +467,7 @@ public class Worker implements Runnable { /** * @return the leaseCoordinator */ - KinesisClientLibLeaseCoordinator getLeaseCoordinator(){ + KinesisClientLibLeaseCoordinator getLeaseCoordinator() { return leaseCoordinator; } @@ -601,6 +520,9 @@ public class Worker implements Runnable { wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); } catch (Exception e) { + if (retries.getAndIncrement() > MAX_RETRIES) + throw new RuntimeException("Failing after " + MAX_RETRIES + " attempts", e); + LOG.error(String.format("Worker.run caught exception, sleeping for %s milli seconds!", String.valueOf(idleTimeInMilliseconds)), e); try { @@ -668,7 +590,7 @@ public class Worker implements Runnable { /** * NOTE: This method is internal/private to the Worker class. It has package access solely for testing. - * + *
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. ShardInfo @@ -714,7 +636,7 @@ public class Worker implements Runnable { /** * Starts the requestedShutdown process, and returns a future that can be used to track the process. - * + *
* This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and * indicates the process behavior * @@ -761,7 +683,7 @@ public class Worker implements Runnable { * Requests a graceful shutdown of the worker, notifying record processors, that implement * {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to * checkpoint. - * + *
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
* previous future.
*
@@ -788,8 +710,8 @@ public class Worker implements Runnable {
*
*
* @return a future that will be set once the shutdown has completed. True indicates that the graceful shutdown
- * completed successfully. A false value indicates that a non-exception case caused the shutdown process to
- * terminate early.
+ * completed successfully. A false value indicates that a non-exception case caused the shutdown process to
+ * terminate early.
*/
public Future