Fix flaky restartAfterRequestTimerExpires tests (#1084)

Add wait to allow subscriptions to start.
This eliminates flakiness of tests restartAfterRequestTimerExpiresWhenNotGettingRecordsAfterInitialization() and restartAfterRequestTimerExpiresWhenInitialTaskExecutionIsRejected().
This commit is contained in:
furq-aws 2023-03-24 04:53:44 -07:00 committed by GitHub
parent 10cdf43b9d
commit b8d3390bf3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -79,6 +79,8 @@ public class ShardConsumerSubscriberTest {
private static final String TERMINAL_MARKER = "Terminal";
private static final long DEFAULT_NOTIFIER_TIMEOUT = 5000L;
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
@Mock
@ -125,10 +127,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(1);
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();
verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
}
@ -139,10 +138,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(recordsPublisher.responses.size());
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();
verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
}
@ -171,10 +167,7 @@ public class ShardConsumerSubscriberTest {
}
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();
assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException));
assertThat(subscriber.getAndResetDispatchFailure(), nullValue());
@ -192,10 +185,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(10);
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();
for (int attempts = 0; attempts < 10; attempts++) {
if (subscriber.retrievalFailure() != null) {
@ -220,10 +210,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(10);
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();
for (int attempts = 0; attempts < 10; attempts++) {
if (subscriber.retrievalFailure() != null) {
@ -236,7 +223,7 @@ public class ShardConsumerSubscriberTest {
synchronized (processedNotifier) {
assertThat(subscriber.healthCheck(100000), equalTo(expected));
processedNotifier.wait(5000);
processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT);
}
assertThat(recordsPublisher.restartedFrom, equalTo(edgeRecord));
@ -267,10 +254,7 @@ public class ShardConsumerSubscriberTest {
return null;
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();
synchronized (processedNotifier) {
executorService.execute(() -> {
@ -290,7 +274,7 @@ public class ShardConsumerSubscriberTest {
//
// Wait for our blocking thread to control the thread in the executor.
//
processedNotifier.wait(5000);
processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT);
}
Stream.iterate(2, i -> i + 1).limit(97).forEach(this::addUniqueItem);
@ -301,7 +285,7 @@ public class ShardConsumerSubscriberTest {
assertThat(subscriber.healthCheck(1), nullValue());
barrier.await(500, TimeUnit.MILLISECONDS);
processedNotifier.wait(5000);
processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT);
}
verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
@ -337,9 +321,7 @@ public class ShardConsumerSubscriberTest {
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
// First try to start subscriptions.
synchronized (processedNotifier) {
subscriber.startSubscriptions();
}
startSubscriptionsAndWait(100);
// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
// subscription has not started correctly.
@ -397,9 +379,7 @@ public class ShardConsumerSubscriberTest {
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
// First try to start subscriptions.
synchronized (processedNotifier) {
subscriber.startSubscriptions();
}
startSubscriptionsAndWait(100);
// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
// subscription has not started correctly.
@ -478,6 +458,17 @@ public class ShardConsumerSubscriberTest {
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
}
private void startSubscriptionsAndWait() throws InterruptedException {
startSubscriptionsAndWait(DEFAULT_NOTIFIER_TIMEOUT);
}
private void startSubscriptionsAndWait(long timeout) throws InterruptedException {
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(timeout);
}
}
private class ResponseItem {
private final RecordsRetrieved recordsRetrieved;
private final Throwable throwable;