diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index e3feaa97..73e1ed32 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -1167,7 +1167,8 @@ public class Scheduler implements Runnable { lifecycleConfig.logWarningForTaskAfterMillis(), argument, lifecycleConfig.taskExecutionListener(), - lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); + lifecycleConfig.readTimeoutsToIgnoreBeforeWarning(), + leaseManagementConfig.consumerTaskFactory()); } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index a23732f7..6ec33643 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -88,6 +88,9 @@ public class ShardConsumer { private final ConsumerTaskFactory taskFactory; + // + // TODO: Make bufferSize configurable + // public ShardConsumer( RecordsPublisher recordsPublisher, ExecutorService executorService, @@ -95,7 +98,8 @@ public class ShardConsumer { Optional logWarningForTaskAfterMillis, ShardConsumerArgument shardConsumerArgument, TaskExecutionListener taskExecutionListener, - int readTimeoutsToIgnoreBeforeWarning) { + int readTimeoutsToIgnoreBeforeWarning, + ConsumerTaskFactory consumerTaskFactory) { this( recordsPublisher, executorService, @@ -106,33 +110,7 @@ public class ShardConsumer { 8, taskExecutionListener, readTimeoutsToIgnoreBeforeWarning, - new KinesisConsumerTaskFactory()); - } - - // - // TODO: Make bufferSize configurable - // - public ShardConsumer( - RecordsPublisher recordsPublisher, - ExecutorService executorService, - ShardInfo shardInfo, - Optional logWarningForTaskAfterMillis, - ShardConsumerArgument shardConsumerArgument, - ConsumerState initialState, - int bufferSize, - TaskExecutionListener taskExecutionListener, - int readTimeoutsToIgnoreBeforeWarning) { - this( - recordsPublisher, - executorService, - shardInfo, - logWarningForTaskAfterMillis, - shardConsumerArgument, - initialState, - bufferSize, - taskExecutionListener, - readTimeoutsToIgnoreBeforeWarning, - new KinesisConsumerTaskFactory()); + consumerTaskFactory); } public ShardConsumer( diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index cc41c479..c0853eab 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -167,7 +167,8 @@ public class ConsumerStatesTest { logWarningForTaskAfterMillis, argument, taskExecutionListener, - 0)); + 0, + new KinesisConsumerTaskFactory())); when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 6390831f..fefaf539 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -781,7 +781,8 @@ public class ShardConsumerTest { initialState, 1, taskExecutionListener, - 0); + 0, + new KinesisConsumerTaskFactory()); mockSuccessfulInitialize(null); mockSuccessfulProcessing(null); @@ -836,7 +837,8 @@ public class ShardConsumerTest { initialState, 1, taskExecutionListener, - 0); + 0, + new KinesisConsumerTaskFactory()); CyclicBarrier taskArriveBarrier = new CyclicBarrier(2); CyclicBarrier taskDepartBarrier = new CyclicBarrier(2); @@ -943,7 +945,8 @@ public class ShardConsumerTest { mockState, 1, taskExecutionListener, - 0); + 0, + new KinesisConsumerTaskFactory()); when(mockState.state()).thenReturn(ShardConsumerState.WAITING_ON_PARENT_SHARDS); when(mockState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS); @@ -1155,6 +1158,7 @@ public class ShardConsumerTest { state, 1, taskExecutionListener, - 0); + 0, + new KinesisConsumerTaskFactory()); } }