[BugFix] Extending ShardConsumer class constructor to have ConsumerTaskFactory as a param

This commit is contained in:
gguptp 2025-04-08 01:03:25 +05:30
parent 6990fc513f
commit 92654aaebc
4 changed files with 18 additions and 34 deletions

View file

@ -1167,7 +1167,8 @@ public class Scheduler implements Runnable {
lifecycleConfig.logWarningForTaskAfterMillis(),
argument,
lifecycleConfig.taskExecutionListener(),
lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
lifecycleConfig.readTimeoutsToIgnoreBeforeWarning(),
leaseManagementConfig.consumerTaskFactory());
}
/**

View file

@ -88,6 +88,9 @@ public class ShardConsumer {
private final ConsumerTaskFactory taskFactory;
//
// TODO: Make bufferSize configurable
//
public ShardConsumer(
RecordsPublisher recordsPublisher,
ExecutorService executorService,
@ -95,7 +98,8 @@ public class ShardConsumer {
Optional<Long> logWarningForTaskAfterMillis,
ShardConsumerArgument shardConsumerArgument,
TaskExecutionListener taskExecutionListener,
int readTimeoutsToIgnoreBeforeWarning) {
int readTimeoutsToIgnoreBeforeWarning,
ConsumerTaskFactory consumerTaskFactory) {
this(
recordsPublisher,
executorService,
@ -106,33 +110,7 @@ public class ShardConsumer {
8,
taskExecutionListener,
readTimeoutsToIgnoreBeforeWarning,
new KinesisConsumerTaskFactory());
}
//
// TODO: Make bufferSize configurable
//
public ShardConsumer(
RecordsPublisher recordsPublisher,
ExecutorService executorService,
ShardInfo shardInfo,
Optional<Long> logWarningForTaskAfterMillis,
ShardConsumerArgument shardConsumerArgument,
ConsumerState initialState,
int bufferSize,
TaskExecutionListener taskExecutionListener,
int readTimeoutsToIgnoreBeforeWarning) {
this(
recordsPublisher,
executorService,
shardInfo,
logWarningForTaskAfterMillis,
shardConsumerArgument,
initialState,
bufferSize,
taskExecutionListener,
readTimeoutsToIgnoreBeforeWarning,
new KinesisConsumerTaskFactory());
consumerTaskFactory);
}
public ShardConsumer(

View file

@ -167,7 +167,8 @@ public class ConsumerStatesTest {
logWarningForTaskAfterMillis,
argument,
taskExecutionListener,
0));
0,
new KinesisConsumerTaskFactory()));
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
}

View file

@ -781,7 +781,8 @@ public class ShardConsumerTest {
initialState,
1,
taskExecutionListener,
0);
0,
new KinesisConsumerTaskFactory());
mockSuccessfulInitialize(null);
mockSuccessfulProcessing(null);
@ -836,7 +837,8 @@ public class ShardConsumerTest {
initialState,
1,
taskExecutionListener,
0);
0,
new KinesisConsumerTaskFactory());
CyclicBarrier taskArriveBarrier = new CyclicBarrier(2);
CyclicBarrier taskDepartBarrier = new CyclicBarrier(2);
@ -943,7 +945,8 @@ public class ShardConsumerTest {
mockState,
1,
taskExecutionListener,
0);
0,
new KinesisConsumerTaskFactory());
when(mockState.state()).thenReturn(ShardConsumerState.WAITING_ON_PARENT_SHARDS);
when(mockState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS);
@ -1155,6 +1158,7 @@ public class ShardConsumerTest {
state,
1,
taskExecutionListener,
0);
0,
new KinesisConsumerTaskFactory());
}
}