Merge pull request #385 from sahilpalvia/coordinator-config-fix

Cleaning up configuration

Deprecating createGracefulShutdownCoordinator from CoordinatorFactory
Deprecating createWorkerStateChangeListener from CoordinatorFactory
Introduing gracefulShutdownCoordinator and workerStateChangeListener configurations to CoordinatorConfig
Switching to use CoordinatorFactory only if the new configurations in code are set to null
This commit is contained in:
Justin Pfifer 2018-08-27 11:51:06 -07:00 committed by GitHub
commit 7734561e18
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 76 additions and 3 deletions

View file

@ -74,6 +74,20 @@ public class CoordinatorConfig {
*/
private ShardPrioritization shardPrioritization = new NoOpShardPrioritization();
/**
* WorkerStateChangeListener to be used by the Scheduler.
*
* <p>Default value: {@link NoOpWorkerStateChangeListener}</p>
*/
private WorkerStateChangeListener workerStateChangeListener = new NoOpWorkerStateChangeListener();
/**
* GracefulShutdownCoordinator to be used by the Scheduler.
*
* <p>Default value: {@link GracefulShutdownCoordinator}</p>
*/
private GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();
private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory();
}

View file

@ -25,11 +25,47 @@ import software.amazon.kinesis.processor.Checkpointer;
*
*/
public interface CoordinatorFactory {
/**
* Creates the executor service to be used by the Scheduler.
*
* @return ExecutorService
*/
ExecutorService createExecutorService();
/**
* Creates GracefulShutdownCoordinator to be used by the Scheduler.
*
* <p>
* Note: This method has been deprecated, and will be removed in a future release. Use the configuration in
* {@link CoordinatorConfig#gracefulShutdownCoordinator}. Set the
* {@link CoordinatorConfig#gracefulShutdownCoordinator} to null in order to use this method.
* </p>
*
* @return GracefulShutdownCoordinator
*/
@Deprecated
GracefulShutdownCoordinator createGracefulShutdownCoordinator();
/**
* Creates a WorkerStateChangeListener to be used by the Scheduler.
*
* <p>
* Note: This method has been deprecated, and will be removed in a future release. Use the configuration in
* {@link CoordinatorConfig#workerStateChangeListener}. Set the
* {@link CoordinatorConfig#workerStateChangeListener} to null in order to use this method.
* </p>
*
* @return
*/
@Deprecated
WorkerStateChangeListener createWorkerStateChangeListener();
/**
* Creates a RecordProcessorChedckpointer to be used by the Scheduler.
*
* @param shardInfo ShardInfo to be used in order to create the ShardRecordProcessorCheckpointer
* @param checkpoint Checkpointer to be used in order to create Shardthe RecordProcessorCheckpointer
* @return ShardRecordProcessorCheckpointer
*/
ShardRecordProcessorCheckpointer createRecordProcessorCheckpointer(ShardInfo shardInfo, Checkpointer checkpoint);
}

View file

@ -170,9 +170,18 @@ public class Scheduler implements Runnable {
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
this.coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist();
this.gracefulShutdownCoordinator =
this.coordinatorConfig.coordinatorFactory().createGracefulShutdownCoordinator();
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory().createWorkerStateChangeListener();
if (coordinatorConfig.gracefulShutdownCoordinator() != null) {
this.gracefulShutdownCoordinator = coordinatorConfig.gracefulShutdownCoordinator();
} else {
this.gracefulShutdownCoordinator = this.coordinatorConfig.coordinatorFactory()
.createGracefulShutdownCoordinator();
}
if (coordinatorConfig.workerStateChangeListener() != null) {
this.workerStateChangeListener = coordinatorConfig.workerStateChangeListener();
} else {
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory()
.createWorkerStateChangeListener();
}
this.initialPosition = retrievalConfig.initialPositionInStreamExtended();
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();

View file

@ -36,17 +36,28 @@ import software.amazon.kinesis.processor.Checkpointer;
@Data
@KinesisClientInternalApi
public class SchedulerCoordinatorFactory implements CoordinatorFactory {
/**
* {@inheritDoc}
*/
@Override
public ExecutorService createExecutorService() {
return new SchedulerThreadPoolExecutor(
new ThreadFactoryBuilder().setNameFormat("ShardRecordProcessor-%04d").build());
}
/**
* {@inheritDoc}
*/
@Deprecated
@Override
public GracefulShutdownCoordinator createGracefulShutdownCoordinator() {
return new GracefulShutdownCoordinator();
}
/**
* {@inheritDoc}
*/
@Deprecated
@Override
public WorkerStateChangeListener createWorkerStateChangeListener() {
return new NoOpWorkerStateChangeListener();
@ -60,6 +71,9 @@ public class SchedulerCoordinatorFactory implements CoordinatorFactory {
}
}
/**
* {@inheritDoc}
*/
@Override
public ShardRecordProcessorCheckpointer createRecordProcessorCheckpointer(@NonNull final ShardInfo shardInfo,
@NonNull final Checkpointer checkpoint) {