Cleaning up configuration

* Deprecating createGracefulShutdownCoordinator from CoordinatorFactory
* Deprecating createWorkerStateChangeListener from CoordinatorFactory
* Introduing gracefulShutdownCoordinator and workerStateChangeListener configurations to CoordinatorConfig
* Switching to use CoordinatorFactory only if the new configurations in code are set to null
This commit is contained in:
Sahil Palvia 2018-08-27 11:26:04 -07:00
parent 99667e8f50
commit 7fa9e10991
4 changed files with 76 additions and 3 deletions

View file

@ -74,6 +74,20 @@ public class CoordinatorConfig {
*/ */
private ShardPrioritization shardPrioritization = new NoOpShardPrioritization(); private ShardPrioritization shardPrioritization = new NoOpShardPrioritization();
/**
* WorkerStateChangeListener to be used by the Scheduler.
*
* <p>Default value: {@link NoOpWorkerStateChangeListener}</p>
*/
private WorkerStateChangeListener workerStateChangeListener = new NoOpWorkerStateChangeListener();
/**
* GracefulShutdownCoordinator to be used by the Scheduler.
*
* <p>Default value: {@link GracefulShutdownCoordinator}</p>
*/
private GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory(); private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory();
} }

View file

@ -25,11 +25,47 @@ import software.amazon.kinesis.processor.Checkpointer;
* *
*/ */
public interface CoordinatorFactory { public interface CoordinatorFactory {
/**
* Creates the executor service to be used by the Scheduler.
*
* @return ExecutorService
*/
ExecutorService createExecutorService(); ExecutorService createExecutorService();
/**
* Creates GracefulShutdownCoordinator to be used by the Scheduler.
*
* <p>
* Note: This method has been deprecated, and will be removed in a future release. Use the configuration in
* {@link CoordinatorConfig#gracefulShutdownCoordinator}. Set the
* {@link CoordinatorConfig#gracefulShutdownCoordinator} to null in order to use this method.
* </p>
*
* @return GracefulShutdownCoordinator
*/
@Deprecated
GracefulShutdownCoordinator createGracefulShutdownCoordinator(); GracefulShutdownCoordinator createGracefulShutdownCoordinator();
/**
* Creates a WorkerStateChangeListener to be used by the Scheduler.
*
* <p>
* Note: This method has been deprecated, and will be removed in a future release. Use the configuration in
* {@link CoordinatorConfig#workerStateChangeListener}. Set the
* {@link CoordinatorConfig#workerStateChangeListener} to null in order to use this method.
* </p>
*
* @return
*/
@Deprecated
WorkerStateChangeListener createWorkerStateChangeListener(); WorkerStateChangeListener createWorkerStateChangeListener();
/**
* Creates a RecordProcessorChedckpointer to be used by the Scheduler.
*
* @param shardInfo ShardInfo to be used in order to create the ShardRecordProcessorCheckpointer
* @param checkpoint Checkpointer to be used in order to create Shardthe RecordProcessorCheckpointer
* @return ShardRecordProcessorCheckpointer
*/
ShardRecordProcessorCheckpointer createRecordProcessorCheckpointer(ShardInfo shardInfo, Checkpointer checkpoint); ShardRecordProcessorCheckpointer createRecordProcessorCheckpointer(ShardInfo shardInfo, Checkpointer checkpoint);
} }

View file

@ -170,9 +170,18 @@ public class Scheduler implements Runnable {
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist = this.skipShardSyncAtWorkerInitializationIfLeasesExist =
this.coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(); this.coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist();
this.gracefulShutdownCoordinator = if (coordinatorConfig.gracefulShutdownCoordinator() != null) {
this.coordinatorConfig.coordinatorFactory().createGracefulShutdownCoordinator(); this.gracefulShutdownCoordinator = coordinatorConfig.gracefulShutdownCoordinator();
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory().createWorkerStateChangeListener(); } else {
this.gracefulShutdownCoordinator = this.coordinatorConfig.coordinatorFactory()
.createGracefulShutdownCoordinator();
}
if (coordinatorConfig.workerStateChangeListener() != null) {
this.workerStateChangeListener = coordinatorConfig.workerStateChangeListener();
} else {
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory()
.createWorkerStateChangeListener();
}
this.initialPosition = retrievalConfig.initialPositionInStreamExtended(); this.initialPosition = retrievalConfig.initialPositionInStreamExtended();
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();

View file

@ -36,17 +36,28 @@ import software.amazon.kinesis.processor.Checkpointer;
@Data @Data
@KinesisClientInternalApi @KinesisClientInternalApi
public class SchedulerCoordinatorFactory implements CoordinatorFactory { public class SchedulerCoordinatorFactory implements CoordinatorFactory {
/**
* {@inheritDoc}
*/
@Override @Override
public ExecutorService createExecutorService() { public ExecutorService createExecutorService() {
return new SchedulerThreadPoolExecutor( return new SchedulerThreadPoolExecutor(
new ThreadFactoryBuilder().setNameFormat("ShardRecordProcessor-%04d").build()); new ThreadFactoryBuilder().setNameFormat("ShardRecordProcessor-%04d").build());
} }
/**
* {@inheritDoc}
*/
@Deprecated
@Override @Override
public GracefulShutdownCoordinator createGracefulShutdownCoordinator() { public GracefulShutdownCoordinator createGracefulShutdownCoordinator() {
return new GracefulShutdownCoordinator(); return new GracefulShutdownCoordinator();
} }
/**
* {@inheritDoc}
*/
@Deprecated
@Override @Override
public WorkerStateChangeListener createWorkerStateChangeListener() { public WorkerStateChangeListener createWorkerStateChangeListener() {
return new NoOpWorkerStateChangeListener(); return new NoOpWorkerStateChangeListener();
@ -60,6 +71,9 @@ public class SchedulerCoordinatorFactory implements CoordinatorFactory {
} }
} }
/**
* {@inheritDoc}
*/
@Override @Override
public ShardRecordProcessorCheckpointer createRecordProcessorCheckpointer(@NonNull final ShardInfo shardInfo, public ShardRecordProcessorCheckpointer createRecordProcessorCheckpointer(@NonNull final ShardInfo shardInfo,
@NonNull final Checkpointer checkpoint) { @NonNull final Checkpointer checkpoint) {