Extending ShardConsumer class constructor to have ConsumerTaskFactory as a param (#1463)

This commit is contained in:
Abhi Gupta 2025-04-15 01:27:58 +05:30 committed by GitHub
parent 6990fc513f
commit 897d993782
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 18 additions and 34 deletions

View file

@ -1167,7 +1167,8 @@ public class Scheduler implements Runnable {
lifecycleConfig.logWarningForTaskAfterMillis(), lifecycleConfig.logWarningForTaskAfterMillis(),
argument, argument,
lifecycleConfig.taskExecutionListener(), lifecycleConfig.taskExecutionListener(),
lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); lifecycleConfig.readTimeoutsToIgnoreBeforeWarning(),
leaseManagementConfig.consumerTaskFactory());
} }
/** /**

View file

@ -88,6 +88,9 @@ public class ShardConsumer {
private final ConsumerTaskFactory taskFactory; private final ConsumerTaskFactory taskFactory;
//
// TODO: Make bufferSize configurable
//
public ShardConsumer( public ShardConsumer(
RecordsPublisher recordsPublisher, RecordsPublisher recordsPublisher,
ExecutorService executorService, ExecutorService executorService,
@ -95,7 +98,8 @@ public class ShardConsumer {
Optional<Long> logWarningForTaskAfterMillis, Optional<Long> logWarningForTaskAfterMillis,
ShardConsumerArgument shardConsumerArgument, ShardConsumerArgument shardConsumerArgument,
TaskExecutionListener taskExecutionListener, TaskExecutionListener taskExecutionListener,
int readTimeoutsToIgnoreBeforeWarning) { int readTimeoutsToIgnoreBeforeWarning,
ConsumerTaskFactory consumerTaskFactory) {
this( this(
recordsPublisher, recordsPublisher,
executorService, executorService,
@ -106,33 +110,7 @@ public class ShardConsumer {
8, 8,
taskExecutionListener, taskExecutionListener,
readTimeoutsToIgnoreBeforeWarning, readTimeoutsToIgnoreBeforeWarning,
new KinesisConsumerTaskFactory()); consumerTaskFactory);
}
//
// TODO: Make bufferSize configurable
//
public ShardConsumer(
RecordsPublisher recordsPublisher,
ExecutorService executorService,
ShardInfo shardInfo,
Optional<Long> logWarningForTaskAfterMillis,
ShardConsumerArgument shardConsumerArgument,
ConsumerState initialState,
int bufferSize,
TaskExecutionListener taskExecutionListener,
int readTimeoutsToIgnoreBeforeWarning) {
this(
recordsPublisher,
executorService,
shardInfo,
logWarningForTaskAfterMillis,
shardConsumerArgument,
initialState,
bufferSize,
taskExecutionListener,
readTimeoutsToIgnoreBeforeWarning,
new KinesisConsumerTaskFactory());
} }
public ShardConsumer( public ShardConsumer(

View file

@ -167,7 +167,8 @@ public class ConsumerStatesTest {
logWarningForTaskAfterMillis, logWarningForTaskAfterMillis,
argument, argument,
taskExecutionListener, taskExecutionListener,
0)); 0,
new KinesisConsumerTaskFactory()));
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer); when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
} }

View file

@ -781,7 +781,8 @@ public class ShardConsumerTest {
initialState, initialState,
1, 1,
taskExecutionListener, taskExecutionListener,
0); 0,
new KinesisConsumerTaskFactory());
mockSuccessfulInitialize(null); mockSuccessfulInitialize(null);
mockSuccessfulProcessing(null); mockSuccessfulProcessing(null);
@ -836,7 +837,8 @@ public class ShardConsumerTest {
initialState, initialState,
1, 1,
taskExecutionListener, taskExecutionListener,
0); 0,
new KinesisConsumerTaskFactory());
CyclicBarrier taskArriveBarrier = new CyclicBarrier(2); CyclicBarrier taskArriveBarrier = new CyclicBarrier(2);
CyclicBarrier taskDepartBarrier = new CyclicBarrier(2); CyclicBarrier taskDepartBarrier = new CyclicBarrier(2);
@ -943,7 +945,8 @@ public class ShardConsumerTest {
mockState, mockState,
1, 1,
taskExecutionListener, taskExecutionListener,
0); 0,
new KinesisConsumerTaskFactory());
when(mockState.state()).thenReturn(ShardConsumerState.WAITING_ON_PARENT_SHARDS); when(mockState.state()).thenReturn(ShardConsumerState.WAITING_ON_PARENT_SHARDS);
when(mockState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS); when(mockState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS);
@ -1155,6 +1158,7 @@ public class ShardConsumerTest {
state, state,
1, 1,
taskExecutionListener, taskExecutionListener,
0); 0,
new KinesisConsumerTaskFactory());
} }
} }