diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index e1ccc0e8..6098b2fa 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -74,6 +74,20 @@ public class CoordinatorConfig { */ private ShardPrioritization shardPrioritization = new NoOpShardPrioritization(); + /** + * WorkerStateChangeListener to be used by the Scheduler. + * + *
Default value: {@link NoOpWorkerStateChangeListener}
+ */ + private WorkerStateChangeListener workerStateChangeListener = new NoOpWorkerStateChangeListener(); + + /** + * GracefulShutdownCoordinator to be used by the Scheduler. + * + *Default value: {@link GracefulShutdownCoordinator}
+ */ + private GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator(); + private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorFactory.java index 9e127a4f..09d51705 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorFactory.java @@ -25,11 +25,47 @@ import software.amazon.kinesis.processor.Checkpointer; * */ public interface CoordinatorFactory { + /** + * Creates the executor service to be used by the Scheduler. + * + * @return ExecutorService + */ ExecutorService createExecutorService(); + /** + * Creates GracefulShutdownCoordinator to be used by the Scheduler. + * + *+ * Note: This method has been deprecated, and will be removed in a future release. Use the configuration in + * {@link CoordinatorConfig#gracefulShutdownCoordinator}. Set the + * {@link CoordinatorConfig#gracefulShutdownCoordinator} to null in order to use this method. + *
+ * + * @return GracefulShutdownCoordinator + */ + @Deprecated GracefulShutdownCoordinator createGracefulShutdownCoordinator(); + /** + * Creates a WorkerStateChangeListener to be used by the Scheduler. + * + *+ * Note: This method has been deprecated, and will be removed in a future release. Use the configuration in + * {@link CoordinatorConfig#workerStateChangeListener}. Set the + * {@link CoordinatorConfig#workerStateChangeListener} to null in order to use this method. + *
+ * + * @return + */ + @Deprecated WorkerStateChangeListener createWorkerStateChangeListener(); + /** + * Creates a RecordProcessorChedckpointer to be used by the Scheduler. + * + * @param shardInfo ShardInfo to be used in order to create the ShardRecordProcessorCheckpointer + * @param checkpoint Checkpointer to be used in order to create Shardthe RecordProcessorCheckpointer + * @return ShardRecordProcessorCheckpointer + */ ShardRecordProcessorCheckpointer createRecordProcessorCheckpointer(ShardInfo shardInfo, Checkpointer checkpoint); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index c16a80e5..19c9076c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -170,9 +170,18 @@ public class Scheduler implements Runnable { this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = this.coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(); - this.gracefulShutdownCoordinator = - this.coordinatorConfig.coordinatorFactory().createGracefulShutdownCoordinator(); - this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory().createWorkerStateChangeListener(); + if (coordinatorConfig.gracefulShutdownCoordinator() != null) { + this.gracefulShutdownCoordinator = coordinatorConfig.gracefulShutdownCoordinator(); + } else { + this.gracefulShutdownCoordinator = this.coordinatorConfig.coordinatorFactory() + .createGracefulShutdownCoordinator(); + } + if (coordinatorConfig.workerStateChangeListener() != null) { + this.workerStateChangeListener = coordinatorConfig.workerStateChangeListener(); + } else { + this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory() + .createWorkerStateChangeListener(); + } this.initialPosition = retrievalConfig.initialPositionInStreamExtended(); this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis(); this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/SchedulerCoordinatorFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/SchedulerCoordinatorFactory.java index f3a2b14b..b3aa5e4f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/SchedulerCoordinatorFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/SchedulerCoordinatorFactory.java @@ -36,17 +36,28 @@ import software.amazon.kinesis.processor.Checkpointer; @Data @KinesisClientInternalApi public class SchedulerCoordinatorFactory implements CoordinatorFactory { + /** + * {@inheritDoc} + */ @Override public ExecutorService createExecutorService() { return new SchedulerThreadPoolExecutor( new ThreadFactoryBuilder().setNameFormat("ShardRecordProcessor-%04d").build()); } + /** + * {@inheritDoc} + */ + @Deprecated @Override public GracefulShutdownCoordinator createGracefulShutdownCoordinator() { return new GracefulShutdownCoordinator(); } + /** + * {@inheritDoc} + */ + @Deprecated @Override public WorkerStateChangeListener createWorkerStateChangeListener() { return new NoOpWorkerStateChangeListener(); @@ -60,6 +71,9 @@ public class SchedulerCoordinatorFactory implements CoordinatorFactory { } } + /** + * {@inheritDoc} + */ @Override public ShardRecordProcessorCheckpointer createRecordProcessorCheckpointer(@NonNull final ShardInfo shardInfo, @NonNull final Checkpointer checkpoint) {