From 6a8e248a36b0d40b57458020bb253b29d7423718 Mon Sep 17 00:00:00 2001 From: Aravinda Kidambi Srinivasan Date: Thu, 2 May 2024 11:39:49 -0700 Subject: [PATCH] Address review comments --- amazon-kinesis-client/pom.xml | 7 ------- .../kinesis/lifecycle/ShardConsumerTest.java | 19 +++++++++---------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 09c0ace3..551fe0a4 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -122,13 +122,6 @@ - - org.awaitility - awaitility - 3.0.0 - test - - junit junit diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index c70608fe..8db3d517 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -59,7 +59,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; -import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -858,15 +857,15 @@ public class ShardConsumerTest { @Test public void testEmptyShardProcessingRaceCondition() throws Exception { - RecordsPublisher mockPublisher = mock(RecordsPublisher.class); - ExecutorService mockExecutor = mock(ExecutorService.class); - ConsumerState mockState = mock(ConsumerState.class); - ShardConsumer consumer = new ShardConsumer(mockPublisher, mockExecutor, shardInfo, Optional.of(1L), + final RecordsPublisher mockPublisher = mock(RecordsPublisher.class); + final ExecutorService mockExecutor = mock(ExecutorService.class); + final ConsumerState mockState = mock(ConsumerState.class); + final ShardConsumer consumer = new ShardConsumer(mockPublisher, mockExecutor, shardInfo, Optional.of(1L), shardConsumerArgument, mockState, Function.identity(), 1, taskExecutionListener, 0); when(mockState.state()).thenReturn(ShardConsumerState.WAITING_ON_PARENT_SHARDS); when(mockState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS); - ConsumerTask mockTask = mock(ConsumerTask.class); + final ConsumerTask mockTask = mock(ConsumerTask.class); when(mockState.createTask(any(), any(), any())).thenReturn(mockTask); // Simulate successful BlockedOnParent task execution // and successful Initialize task execution @@ -875,7 +874,7 @@ public class ShardConsumerTest { log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to initiate async" + " processing of blocked on parent task"); consumer.executeLifecycle(); - ArgumentCaptor taskToExecute = ArgumentCaptor.forClass(Runnable.class); + final ArgumentCaptor taskToExecute = ArgumentCaptor.forClass(Runnable.class); verify(mockExecutor, timeout(100)).execute(taskToExecute.capture()); taskToExecute.getValue().run(); log.info("RecordProcessor Thread: Simulated successful execution of Blocked on parent task"); @@ -907,11 +906,11 @@ public class ShardConsumerTest { // In order to control the order in which execution occurs, lets first invoke // handleInput, although this will never happen, since there isn't a way // to control the precise timing of the thread execution, this is the best way - CountDownLatch processTaskLatch = new CountDownLatch(1); + final CountDownLatch processTaskLatch = new CountDownLatch(1); new Thread(() -> { reset(mockState); when(mockState.taskType()).thenReturn(TaskType.PROCESS); - ConsumerTask mockProcessTask = mock(ConsumerTask.class); + final ConsumerTask mockProcessTask = mock(ConsumerTask.class); when(mockState.createTask(any(), any(), any())).thenReturn(mockProcessTask); when(mockProcessTask.call()).then(input -> { // first we want to wait for subscribe to be called, @@ -927,7 +926,7 @@ public class ShardConsumerTest { log.info("RecordProcessor Thread: Simulating execution of ProcessTask and returning shard-end result"); return new TaskResult(true); }); - Subscription mockSubscription = mock(Subscription.class); + final Subscription mockSubscription = mock(Subscription.class); consumer.handleInput(ProcessRecordsInput.builder().isAtShardEnd(true).build(), mockSubscription); }).start();