Refactor duplicate subscribe and wait code

This commit is contained in:
furq-aws 2023-03-23 15:40:49 -07:00
parent c0895e5647
commit ee1f1c50c7

View file

@ -125,10 +125,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(1);
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait(subscriber, 5000);
verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
}
@ -139,10 +136,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(recordsPublisher.responses.size());
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait(subscriber, 5000);
verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
}
@ -171,10 +165,7 @@ public class ShardConsumerSubscriberTest {
}
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait(subscriber, 5000);
assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException));
assertThat(subscriber.getAndResetDispatchFailure(), nullValue());
@ -192,10 +183,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(10);
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait(subscriber, 5000);
for (int attempts = 0; attempts < 10; attempts++) {
if (subscriber.retrievalFailure() != null) {
@ -220,10 +208,7 @@ public class ShardConsumerSubscriberTest {
setupNotifierAnswer(10);
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait(subscriber, 5000);
for (int attempts = 0; attempts < 10; attempts++) {
if (subscriber.retrievalFailure() != null) {
@ -267,10 +252,7 @@ public class ShardConsumerSubscriberTest {
return null;
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait(subscriber, 5000);
synchronized (processedNotifier) {
executorService.execute(() -> {
@ -337,10 +319,7 @@ public class ShardConsumerSubscriberTest {
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
// First try to start subscriptions.
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(100);
}
startSubscriptionsAndWait(subscriber, 100);
// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
// subscription has not started correctly.
@ -398,10 +377,7 @@ public class ShardConsumerSubscriberTest {
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
// First try to start subscriptions.
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(100);
}
startSubscriptionsAndWait(subscriber, 100);
// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
// subscription has not started correctly.
@ -480,6 +456,13 @@ public class ShardConsumerSubscriberTest {
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
}
private void startSubscriptionsAndWait(ShardConsumerSubscriber subscriber, long timeout) throws InterruptedException {
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(timeout);
}
}
private class ResponseItem {
private final RecordsRetrieved recordsRetrieved;
private final Throwable throwable;