Refactoring the Worker to make it manageable (#300)

* Reformatting the constructors to be together in the Worker class.

* Using lombok annotations to replace setters in the Worker.Builder.

* Deprecating all the public constructors for the Worker class. Recommending to use the Worker.Builder to create the Worker object.

* Including annotation usage for leaseManager in  the Worker.Builder

* Adding capability to support Immutable clients by the Worker and Worker.Builder

* Adding annotation support for WorkerStateChangeListener.
This commit is contained in:
Sahil Palvia 2018-03-05 10:25:41 -08:00 committed by Justin Pfifer
parent 523cc0e2cc
commit a53473d536

View file

@ -32,9 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -50,10 +47,12 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
@ -61,6 +60,9 @@ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Setter;
import lombok.experimental.Accessors;
/**
* Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees
* different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from
@ -124,11 +126,15 @@ public class Worker implements Runnable {
/**
* Constructor.
*
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
*/
@Deprecated
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config) {
@ -138,6 +144,9 @@ public class Worker implements Runnable {
/**
* Constructor.
*
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
@ -145,6 +154,7 @@ public class Worker implements Runnable {
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/
@Deprecated
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, ExecutorService execService) {
@ -158,6 +168,9 @@ public class Worker implements Runnable {
}
/**
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
@ -165,6 +178,7 @@ public class Worker implements Runnable {
* @param metricsFactory
* Metrics factory used to emit metrics
*/
@Deprecated
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, IMetricsFactory metricsFactory) {
@ -172,6 +186,9 @@ public class Worker implements Runnable {
}
/**
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
@ -181,6 +198,7 @@ public class Worker implements Runnable {
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/
@Deprecated
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, IMetricsFactory metricsFactory, ExecutorService execService) {
@ -192,6 +210,9 @@ public class Worker implements Runnable {
}
/**
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
@ -203,6 +224,7 @@ public class Worker implements Runnable {
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
*/
@Deprecated
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
@ -211,6 +233,9 @@ public class Worker implements Runnable {
}
/**
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
@ -224,6 +249,7 @@ public class Worker implements Runnable {
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/
@Deprecated
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
@ -232,7 +258,70 @@ public class Worker implements Runnable {
execService);
}
// Backwards compatible constructors
/**
* This constructor is for binary compatibility with code compiled against version of the KCL that only have
* constructors taking "Client" objects.
*
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
*/
@Deprecated
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) {
this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
(AmazonCloudWatch) cloudWatchClient);
}
/**
* This constructor is for binary compatibility with code compiled against version of the KCL that only have
* constructors taking "Client" objects.
*
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/
@Deprecated
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) {
this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
(AmazonCloudWatch) cloudWatchClient, execService);
}
/**
* This constructor is for binary compatibility with code compiled against version of the KCL that only have
* constructors taking "Client" objects.
*
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
@ -246,6 +335,33 @@ public class Worker implements Runnable {
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/
@Deprecated
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) {
this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
metricsFactory, execService);
}
/**
* @deprecated The access to this constructor will be changed in a future release. The recommended way to create
* a Worker is to use {@link Builder}
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/
@Deprecated
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
@ -943,80 +1059,6 @@ public class Worker implements Runnable {
}
}
// Backwards compatible constructors
/**
* This constructor is for binary compatibility with code compiled against version of the KCL that only have
* constructors taking "Client" objects.
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
*/
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) {
this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
(AmazonCloudWatch) cloudWatchClient);
}
/**
* This constructor is for binary compatibility with code compiled against version of the KCL that only have
* constructors taking "Client" objects.
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) {
this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
(AmazonCloudWatch) cloudWatchClient, execService);
}
/**
* This constructor is for binary compatibility with code compiled against version of the KCL that only have
* constructors taking "Client" objects.
*
* @param recordProcessorFactory
* Used to get record processor instances for processing data from shards
* @param config
* Kinesis Client Library configuration
* @param kinesisClient
* Kinesis Client used for fetching data
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @param metricsFactory
* Metrics factory used to emit metrics
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
*/
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, AmazonKinesisClient kinesisClient,
AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) {
this(recordProcessorFactory, config, (AmazonKinesis) kinesisClient, (AmazonDynamoDB) dynamoDBClient,
metricsFactory, execService);
}
@VisibleForTesting
StreamConfig getStreamConfig() {
return streamConfig;
@ -1091,24 +1133,27 @@ public class Worker implements Runnable {
public static class Builder {
private IRecordProcessorFactory recordProcessorFactory;
private RecordsFetcherFactory recordsFetcherFactory;
@Setter @Accessors(fluent = true)
private KinesisClientLibConfiguration config;
@Setter @Accessors(fluent = true)
private AmazonKinesis kinesisClient;
@Setter @Accessors(fluent = true)
private AmazonDynamoDB dynamoDBClient;
@Setter @Accessors(fluent = true)
private AmazonCloudWatch cloudWatchClient;
@Setter @Accessors(fluent = true)
private IMetricsFactory metricsFactory;
@Setter @Accessors(fluent = true)
private ILeaseManager<KinesisClientLease> leaseManager;
@Setter @Accessors(fluent = true)
private ExecutorService execService;
@Setter @Accessors(fluent = true)
private ShardPrioritization shardPrioritization;
@Setter @Accessors(fluent = true)
private IKinesisProxy kinesisProxy;
@Setter @Accessors(fluent = true)
private WorkerStateChangeListener workerStateChangeListener;
/**
* Default constructor.
*/
public Builder() {
}
/**
* Provide a V1 {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
* IRecordProcessor}.
@ -1136,127 +1181,6 @@ public class Worker implements Runnable {
return this;
}
/**
* Set the Worker config.
*
* @param config
* Kinesis Client Library configuration
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder config(KinesisClientLibConfiguration config) {
this.config = config;
return this;
}
/**
* Set the Kinesis client.
*
* @param kinesisClient
* Kinesis Client used for fetching data
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder kinesisClient(AmazonKinesis kinesisClient) {
this.kinesisClient = kinesisClient;
return this;
}
/**
* Set the DynamoDB client.
*
* @param dynamoDBClient
* DynamoDB client used for checkpoints and tracking leases
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder dynamoDBClient(AmazonDynamoDB dynamoDBClient) {
this.dynamoDBClient = dynamoDBClient;
return this;
}
/**
* Set the Cloudwatch client.
*
* @param cloudWatchClient
* CloudWatch Client for publishing metrics
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder cloudWatchClient(AmazonCloudWatch cloudWatchClient) {
this.cloudWatchClient = cloudWatchClient;
return this;
}
/**
* Set the metrics factory.
*
* @param metricsFactory
* Metrics factory used to emit metrics
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder metricsFactory(IMetricsFactory metricsFactory) {
this.metricsFactory = metricsFactory;
return this;
}
/**
* Set the lease manager.
*
* @param leaseManager
* Lease manager used to manage shard leases
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder leaseManager(ILeaseManager<KinesisClientLease> leaseManager) {
this.leaseManager = leaseManager;
return this;
}
/**
* Set the executor service for processing records.
*
* @param execService
* ExecutorService to use for processing records (support for multi-threaded consumption)
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder execService(ExecutorService execService) {
this.execService = execService;
return this;
}
/**
* Provides logic how to prioritize shard processing.
*
* @param shardPrioritization
* shardPrioritization is responsible to order shards before processing
*
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
this.shardPrioritization = shardPrioritization;
return this;
}
/**
* Set KinesisProxy for the worker.
*
* @param kinesisProxy
* Sets an implementation of IKinesisProxy.
*
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder kinesisProxy(IKinesisProxy kinesisProxy) {
this.kinesisProxy = kinesisProxy;
return this;
}
/**
* Set WorkerStateChangeListener for the worker
* @param workerStateChangeListener
* Sets the WorkerStateChangeListener
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder workerStateChangeListener(WorkerStateChangeListener workerStateChangeListener) {
this.workerStateChangeListener = workerStateChangeListener;
return this;
}
/**
* Build the Worker instance.
*
@ -1367,5 +1291,6 @@ public class Worker implements Runnable {
config.getMaxGetRecordsThreadPool(),
workerStateChangeListener);
}
}
}