From a53473d5365e8169fecc1a5d72b5954d4fa88640 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Mon, 5 Mar 2018 10:25:41 -0800 Subject: [PATCH] Refactoring the Worker to make it manageable (#300) * Reformatting the constructors to be together in the Worker class. * Using lombok annotations to replace setters in the Worker.Builder. * Deprecating all the public constructors for the Worker class. Recommending to use the Worker.Builder to create the Worker object. * Including annotation usage for leaseManager in the Worker.Builder * Adding capability to support Immutable clients by the Worker and Worker.Builder * Adding annotation support for WorkerStateChangeListener. --- .../clientlibrary/lib/worker/Worker.java | 335 +++++++----------- 1 file changed, 130 insertions(+), 205 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 644f4225..4a03b449 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -32,9 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,10 +47,12 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; @@ -61,6 +60,9 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.Setter; +import lombok.experimental.Accessors; + /** * Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees * different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from @@ -124,11 +126,15 @@ public class Worker implements Runnable { /** * Constructor. * + * @deprecated The access to this constructor will be changed in a future release. The recommended way to create + * a Worker is to use {@link Builder} + * * @param recordProcessorFactory * Used to get record processor instances for processing data from shards * @param config * Kinesis Client Library configuration */ + @Deprecated public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config) { @@ -138,6 +144,9 @@ public class Worker implements Runnable { /** * Constructor. * + * @deprecated The access to this constructor will be changed in a future release. The recommended way to create + * a Worker is to use {@link Builder} + * * @param recordProcessorFactory * Used to get record processor instances for processing data from shards * @param config @@ -145,6 +154,7 @@ public class Worker implements Runnable { * @param execService * ExecutorService to use for processing records (support for multi-threaded consumption) */ + @Deprecated public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, ExecutorService execService) { @@ -158,6 +168,9 @@ public class Worker implements Runnable { } /** + * @deprecated The access to this constructor will be changed in a future release. The recommended way to create + * a Worker is to use {@link Builder} + * * @param recordProcessorFactory * Used to get record processor instances for processing data from shards * @param config @@ -165,6 +178,7 @@ public class Worker implements Runnable { * @param metricsFactory * Metrics factory used to emit metrics */ + @Deprecated public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, IMetricsFactory metricsFactory) { @@ -172,6 +186,9 @@ public class Worker implements Runnable { } /** + * @deprecated The access to this constructor will be changed in a future release. The recommended way to create + * a Worker is to use {@link Builder} + * * @param recordProcessorFactory * Used to get record processor instances for processing data from shards * @param config @@ -181,6 +198,7 @@ public class Worker implements Runnable { * @param execService * ExecutorService to use for processing records (support for multi-threaded consumption) */ + @Deprecated public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, IMetricsFactory metricsFactory, ExecutorService execService) { @@ -192,6 +210,9 @@ public class Worker implements Runnable { } /** + * @deprecated The access to this constructor will be changed in a future release. The recommended way to create + * a Worker is to use {@link Builder} + * * @param recordProcessorFactory * Used to get record processor instances for processing data from shards * @param config @@ -203,6 +224,7 @@ public class Worker implements Runnable { * @param cloudWatchClient * CloudWatch Client for publishing metrics */ + @Deprecated public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, @@ -211,6 +233,9 @@ public class Worker implements Runnable { } /** + * @deprecated The access to this constructor will be changed in a future release. The recommended way to create + * a Worker is to use {@link Builder} + * * @param recordProcessorFactory * Used to get record processor instances for processing data from shards * @param config @@ -224,6 +249,7 @@ public class Worker implements Runnable { * @param execService * ExecutorService to use for processing records (support for multi-threaded consumption) */ + @Deprecated public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, @@ -232,7 +258,70 @@ public class Worker implements Runnable { execService); } + // Backwards compatible constructors /** + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. + * + * @deprecated The access to this constructor will be changed in a future release. The recommended way to create + * a Worker is to use {@link Builder} + * + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics + */ + @Deprecated + public Worker( + com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, + (AmazonCloudWatch) cloudWatchClient); + } + + /** + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. + * + * @deprecated The access to this constructor will be changed in a future release. The recommended way to create + * a Worker is to use {@link Builder} + * + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param cloudWatchClient + * CloudWatch Client for publishing metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) + */ + @Deprecated + public Worker( + com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, + (AmazonCloudWatch) cloudWatchClient, execService); + } + + /** + * This constructor is for binary compatibility with code compiled against version of the KCL that only have + * constructors taking "Client" objects. + * + * @deprecated The access to this constructor will be changed in a future release. The recommended way to create + * a Worker is to use {@link Builder} + * * @param recordProcessorFactory * Used to get record processor instances for processing data from shards * @param config @@ -246,6 +335,33 @@ public class Worker implements Runnable { * @param execService * ExecutorService to use for processing records (support for multi-threaded consumption) */ + @Deprecated + public Worker( + com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, + KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, + AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { + this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, + metricsFactory, execService); + } + + /** + * @deprecated The access to this constructor will be changed in a future release. The recommended way to create + * a Worker is to use {@link Builder} + * + * @param recordProcessorFactory + * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Client Library configuration + * @param kinesisClient + * Kinesis Client used for fetching data + * @param dynamoDBClient + * DynamoDB client used for checkpoints and tracking leases + * @param metricsFactory + * Metrics factory used to emit metrics + * @param execService + * ExecutorService to use for processing records (support for multi-threaded consumption) + */ + @Deprecated public Worker( com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, @@ -943,80 +1059,6 @@ public class Worker implements Runnable { } } - // Backwards compatible constructors - /** - * This constructor is for binary compatibility with code compiled against version of the KCL that only have - * constructors taking "Client" objects. - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param kinesisClient - * Kinesis Client used for fetching data - * @param dynamoDBClient - * DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient - * CloudWatch Client for publishing metrics - */ - public Worker( - com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) { - this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, - (AmazonCloudWatch) cloudWatchClient); - } - - /** - * This constructor is for binary compatibility with code compiled against version of the KCL that only have - * constructors taking "Client" objects. - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param kinesisClient - * Kinesis Client used for fetching data - * @param dynamoDBClient - * DynamoDB client used for checkpoints and tracking leases - * @param cloudWatchClient - * CloudWatch Client for publishing metrics - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) - */ - public Worker( - com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) { - this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, - (AmazonCloudWatch) cloudWatchClient, execService); - } - - /** - * This constructor is for binary compatibility with code compiled against version of the KCL that only have - * constructors taking "Client" objects. - * - * @param recordProcessorFactory - * Used to get record processor instances for processing data from shards - * @param config - * Kinesis Client Library configuration - * @param kinesisClient - * Kinesis Client used for fetching data - * @param dynamoDBClient - * DynamoDB client used for checkpoints and tracking leases - * @param metricsFactory - * Metrics factory used to emit metrics - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) - */ - public Worker( - com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, - KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient, - AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { - this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient, - metricsFactory, execService); - } - @VisibleForTesting StreamConfig getStreamConfig() { return streamConfig; @@ -1091,24 +1133,27 @@ public class Worker implements Runnable { public static class Builder { private IRecordProcessorFactory recordProcessorFactory; - private RecordsFetcherFactory recordsFetcherFactory; + @Setter @Accessors(fluent = true) private KinesisClientLibConfiguration config; + @Setter @Accessors(fluent = true) private AmazonKinesis kinesisClient; + @Setter @Accessors(fluent = true) private AmazonDynamoDB dynamoDBClient; + @Setter @Accessors(fluent = true) private AmazonCloudWatch cloudWatchClient; + @Setter @Accessors(fluent = true) private IMetricsFactory metricsFactory; + @Setter @Accessors(fluent = true) private ILeaseManager leaseManager; + @Setter @Accessors(fluent = true) private ExecutorService execService; + @Setter @Accessors(fluent = true) private ShardPrioritization shardPrioritization; + @Setter @Accessors(fluent = true) private IKinesisProxy kinesisProxy; + @Setter @Accessors(fluent = true) private WorkerStateChangeListener workerStateChangeListener; - /** - * Default constructor. - */ - public Builder() { - } - /** * Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor * IRecordProcessor}. @@ -1136,127 +1181,6 @@ public class Worker implements Runnable { return this; } - /** - * Set the Worker config. - * - * @param config - * Kinesis Client Library configuration - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder config(KinesisClientLibConfiguration config) { - this.config = config; - return this; - } - - /** - * Set the Kinesis client. - * - * @param kinesisClient - * Kinesis Client used for fetching data - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder kinesisClient(AmazonKinesis kinesisClient) { - this.kinesisClient = kinesisClient; - return this; - } - - /** - * Set the DynamoDB client. - * - * @param dynamoDBClient - * DynamoDB client used for checkpoints and tracking leases - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) { - this.dynamoDBClient = dynamoDBClient; - return this; - } - - /** - * Set the Cloudwatch client. - * - * @param cloudWatchClient - * CloudWatch Client for publishing metrics - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) { - this.cloudWatchClient = cloudWatchClient; - return this; - } - - /** - * Set the metrics factory. - * - * @param metricsFactory - * Metrics factory used to emit metrics - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder metricsFactory(IMetricsFactory metricsFactory) { - this.metricsFactory = metricsFactory; - return this; - } - - /** - * Set the lease manager. - * - * @param leaseManager - * Lease manager used to manage shard leases - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder leaseManager(ILeaseManager leaseManager) { - this.leaseManager = leaseManager; - return this; - } - - /** - * Set the executor service for processing records. - * - * @param execService - * ExecutorService to use for processing records (support for multi-threaded consumption) - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder execService(ExecutorService execService) { - this.execService = execService; - return this; - } - - /** - * Provides logic how to prioritize shard processing. - * - * @param shardPrioritization - * shardPrioritization is responsible to order shards before processing - * - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder shardPrioritization(ShardPrioritization shardPrioritization) { - this.shardPrioritization = shardPrioritization; - return this; - } - - /** - * Set KinesisProxy for the worker. - * - * @param kinesisProxy - * Sets an implementation of IKinesisProxy. - * - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder kinesisProxy(IKinesisProxy kinesisProxy) { - this.kinesisProxy = kinesisProxy; - return this; - } - - /** - * Set WorkerStateChangeListener for the worker - * @param workerStateChangeListener - * Sets the WorkerStateChangeListener - * @return A reference to this updated object so that method calls can be chained together. - */ - public Builder workerStateChangeListener(WorkerStateChangeListener workerStateChangeListener) { - this.workerStateChangeListener = workerStateChangeListener; - return this; - } - /** * Build the Worker instance. * @@ -1367,5 +1291,6 @@ public class Worker implements Runnable { config.getMaxGetRecordsThreadPool(), workerStateChangeListener); } + } }