Refactor startSubscriptionsAndWait

Remove startSubscriptionsAndWait ShardConsumerSubscriber parameter to use class variable instead.
Create constant for default notifier timeout value.
This commit is contained in:
furq-aws 2023-03-23 17:26:31 -07:00
parent ee1f1c50c7
commit b7dd458962

View file

@ -79,6 +79,8 @@ public class ShardConsumerSubscriberTest {
private static final String TERMINAL_MARKER = "Terminal"; private static final String TERMINAL_MARKER = "Terminal";
private static final long DEFAULT_NOTIFIER_TIMEOUT = 5000;
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
@Mock @Mock
@ -125,7 +127,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(1); setupNotifierAnswer(1);
startSubscriptionsAndWait(subscriber, 5000); startSubscriptionsAndWait();
verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
} }
@ -136,7 +138,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(recordsPublisher.responses.size()); setupNotifierAnswer(recordsPublisher.responses.size());
startSubscriptionsAndWait(subscriber, 5000); startSubscriptionsAndWait();
verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
} }
@ -165,7 +167,7 @@ public class ShardConsumerSubscriberTest {
} }
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
startSubscriptionsAndWait(subscriber, 5000); startSubscriptionsAndWait();
assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException)); assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException));
assertThat(subscriber.getAndResetDispatchFailure(), nullValue()); assertThat(subscriber.getAndResetDispatchFailure(), nullValue());
@ -183,7 +185,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(10); setupNotifierAnswer(10);
startSubscriptionsAndWait(subscriber, 5000); startSubscriptionsAndWait();
for (int attempts = 0; attempts < 10; attempts++) { for (int attempts = 0; attempts < 10; attempts++) {
if (subscriber.retrievalFailure() != null) { if (subscriber.retrievalFailure() != null) {
@ -208,7 +210,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(10); setupNotifierAnswer(10);
startSubscriptionsAndWait(subscriber, 5000); startSubscriptionsAndWait();
for (int attempts = 0; attempts < 10; attempts++) { for (int attempts = 0; attempts < 10; attempts++) {
if (subscriber.retrievalFailure() != null) { if (subscriber.retrievalFailure() != null) {
@ -221,7 +223,7 @@ public class ShardConsumerSubscriberTest {
synchronized (processedNotifier) { synchronized (processedNotifier) {
assertThat(subscriber.healthCheck(100000), equalTo(expected)); assertThat(subscriber.healthCheck(100000), equalTo(expected));
processedNotifier.wait(5000); processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT);
} }
assertThat(recordsPublisher.restartedFrom, equalTo(edgeRecord)); assertThat(recordsPublisher.restartedFrom, equalTo(edgeRecord));
@ -252,7 +254,7 @@ public class ShardConsumerSubscriberTest {
return null; return null;
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
startSubscriptionsAndWait(subscriber, 5000); startSubscriptionsAndWait();
synchronized (processedNotifier) { synchronized (processedNotifier) {
executorService.execute(() -> { executorService.execute(() -> {
@ -272,7 +274,7 @@ public class ShardConsumerSubscriberTest {
// //
// Wait for our blocking thread to control the thread in the executor. // Wait for our blocking thread to control the thread in the executor.
// //
processedNotifier.wait(5000); processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT);
} }
Stream.iterate(2, i -> i + 1).limit(97).forEach(this::addUniqueItem); Stream.iterate(2, i -> i + 1).limit(97).forEach(this::addUniqueItem);
@ -283,7 +285,7 @@ public class ShardConsumerSubscriberTest {
assertThat(subscriber.healthCheck(1), nullValue()); assertThat(subscriber.healthCheck(1), nullValue());
barrier.await(500, TimeUnit.MILLISECONDS); barrier.await(500, TimeUnit.MILLISECONDS);
processedNotifier.wait(5000); processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT);
} }
verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
@ -319,7 +321,7 @@ public class ShardConsumerSubscriberTest {
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
// First try to start subscriptions. // First try to start subscriptions.
startSubscriptionsAndWait(subscriber, 100); startSubscriptionsAndWait(100);
// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
// subscription has not started correctly. // subscription has not started correctly.
@ -377,7 +379,7 @@ public class ShardConsumerSubscriberTest {
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
// First try to start subscriptions. // First try to start subscriptions.
startSubscriptionsAndWait(subscriber, 100); startSubscriptionsAndWait(100);
// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
// subscription has not started correctly. // subscription has not started correctly.
@ -456,7 +458,11 @@ public class ShardConsumerSubscriberTest {
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
} }
private void startSubscriptionsAndWait(ShardConsumerSubscriber subscriber, long timeout) throws InterruptedException { private void startSubscriptionsAndWait() throws InterruptedException {
startSubscriptionsAndWait(DEFAULT_NOTIFIER_TIMEOUT);
}
private void startSubscriptionsAndWait(long timeout) throws InterruptedException {
synchronized (processedNotifier) { synchronized (processedNotifier) {
subscriber.startSubscriptions(); subscriber.startSubscriptions();
processedNotifier.wait(timeout); processedNotifier.wait(timeout);
@ -757,7 +763,7 @@ public class ShardConsumerSubscriberTest {
/** /**
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with the default * Test to validate the non-timeout warning message from ShardConsumer is not suppressed with the default
* configuration of 0 * configuration of 0
* *
* @throws Exception * @throws Exception
*/ */
@Test @Test
@ -786,7 +792,7 @@ public class ShardConsumerSubscriberTest {
/** /**
* Test to validate the non-timeout warning message from ShardConsumer is not suppressed with 2 ReadTimeouts to * Test to validate the non-timeout warning message from ShardConsumer is not suppressed with 2 ReadTimeouts to
* ignore * ignore
* *
* @throws Exception * @throws Exception
*/ */
@Test @Test