Address review comments

This commit is contained in:
Aravinda Kidambi Srinivasan 2024-05-02 11:39:49 -07:00
parent 78e3b27601
commit 6a8e248a36
2 changed files with 9 additions and 17 deletions

View file

@ -122,13 +122,6 @@
</dependency>
<!-- Test -->
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View file

@ -59,7 +59,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@ -858,15 +857,15 @@ public class ShardConsumerTest {
@Test
public void testEmptyShardProcessingRaceCondition() throws Exception {
RecordsPublisher mockPublisher = mock(RecordsPublisher.class);
ExecutorService mockExecutor = mock(ExecutorService.class);
ConsumerState mockState = mock(ConsumerState.class);
ShardConsumer consumer = new ShardConsumer(mockPublisher, mockExecutor, shardInfo, Optional.of(1L),
final RecordsPublisher mockPublisher = mock(RecordsPublisher.class);
final ExecutorService mockExecutor = mock(ExecutorService.class);
final ConsumerState mockState = mock(ConsumerState.class);
final ShardConsumer consumer = new ShardConsumer(mockPublisher, mockExecutor, shardInfo, Optional.of(1L),
shardConsumerArgument, mockState, Function.identity(), 1, taskExecutionListener, 0);
when(mockState.state()).thenReturn(ShardConsumerState.WAITING_ON_PARENT_SHARDS);
when(mockState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS);
ConsumerTask mockTask = mock(ConsumerTask.class);
final ConsumerTask mockTask = mock(ConsumerTask.class);
when(mockState.createTask(any(), any(), any())).thenReturn(mockTask);
// Simulate successful BlockedOnParent task execution
// and successful Initialize task execution
@ -875,7 +874,7 @@ public class ShardConsumerTest {
log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to initiate async" +
" processing of blocked on parent task");
consumer.executeLifecycle();
ArgumentCaptor<Runnable> taskToExecute = ArgumentCaptor.forClass(Runnable.class);
final ArgumentCaptor<Runnable> taskToExecute = ArgumentCaptor.forClass(Runnable.class);
verify(mockExecutor, timeout(100)).execute(taskToExecute.capture());
taskToExecute.getValue().run();
log.info("RecordProcessor Thread: Simulated successful execution of Blocked on parent task");
@ -907,11 +906,11 @@ public class ShardConsumerTest {
// In order to control the order in which execution occurs, lets first invoke
// handleInput, although this will never happen, since there isn't a way
// to control the precise timing of the thread execution, this is the best way
CountDownLatch processTaskLatch = new CountDownLatch(1);
final CountDownLatch processTaskLatch = new CountDownLatch(1);
new Thread(() -> {
reset(mockState);
when(mockState.taskType()).thenReturn(TaskType.PROCESS);
ConsumerTask mockProcessTask = mock(ConsumerTask.class);
final ConsumerTask mockProcessTask = mock(ConsumerTask.class);
when(mockState.createTask(any(), any(), any())).thenReturn(mockProcessTask);
when(mockProcessTask.call()).then(input -> {
// first we want to wait for subscribe to be called,
@ -927,7 +926,7 @@ public class ShardConsumerTest {
log.info("RecordProcessor Thread: Simulating execution of ProcessTask and returning shard-end result");
return new TaskResult(true);
});
Subscription mockSubscription = mock(Subscription.class);
final Subscription mockSubscription = mock(Subscription.class);
consumer.handleInput(ProcessRecordsInput.builder().isAtShardEnd(true).build(), mockSubscription);
}).start();