diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 2382b4e1..e3feaa97 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -88,7 +88,6 @@ import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.ConsumerTaskFactory; -import software.amazon.kinesis.lifecycle.KinesisConsumerTaskFactory; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; import software.amazon.kinesis.lifecycle.ShardConsumerArgument; @@ -267,33 +266,6 @@ public class Scheduler implements Runnable { @NonNull final ProcessorConfig processorConfig, @NonNull final RetrievalConfig retrievalConfig, @NonNull final DiagnosticEventFactory diagnosticEventFactory) { - this( - checkpointConfig, - coordinatorConfig, - leaseManagementConfig, - lifecycleConfig, - metricsConfig, - processorConfig, - retrievalConfig, - diagnosticEventFactory, - new KinesisConsumerTaskFactory()); - } - - /** - * Customers do not currently have the ability to customize the DiagnosticEventFactory, but this visibility - * is desired for testing. This constructor is only used for testing to provide a mock DiagnosticEventFactory. - */ - @VisibleForTesting - protected Scheduler( - @NonNull final CheckpointConfig checkpointConfig, - @NonNull final CoordinatorConfig coordinatorConfig, - @NonNull final LeaseManagementConfig leaseManagementConfig, - @NonNull final LifecycleConfig lifecycleConfig, - @NonNull final MetricsConfig metricsConfig, - @NonNull final ProcessorConfig processorConfig, - @NonNull final RetrievalConfig retrievalConfig, - @NonNull final DiagnosticEventFactory diagnosticEventFactory, - @NonNull final ConsumerTaskFactory taskFactory) { this.checkpointConfig = checkpointConfig; this.coordinatorConfig = coordinatorConfig; this.leaseManagementConfig = leaseManagementConfig; @@ -401,7 +373,7 @@ public class Scheduler implements Runnable { this.schemaRegistryDecoder = this.retrievalConfig.glueSchemaRegistryDeserializer() == null ? null : new SchemaRegistryDecoder(this.retrievalConfig.glueSchemaRegistryDeserializer()); - this.taskFactory = taskFactory; + this.taskFactory = leaseManagementConfig().consumerTaskFactory(); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index ef750f46..1839b494 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -45,6 +45,8 @@ import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; +import software.amazon.kinesis.lifecycle.ConsumerTaskFactory; +import software.amazon.kinesis.lifecycle.KinesisConsumerTaskFactory; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.worker.metric.WorkerMetric; @@ -215,6 +217,8 @@ public class LeaseManagementConfig { private BillingMode billingMode = BillingMode.PAY_PER_REQUEST; + private ConsumerTaskFactory consumerTaskFactory = new KinesisConsumerTaskFactory(); + private WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig = new WorkerUtilizationAwareAssignmentConfig();