From ee1f1c50c7eceebfa376700a76feae96ddcc11c6 Mon Sep 17 00:00:00 2001 From: furq-aws Date: Thu, 23 Mar 2023 15:40:49 -0700 Subject: [PATCH] Refactor duplicate subscribe and wait code --- .../ShardConsumerSubscriberTest.java | 47 ++++++------------- 1 file changed, 15 insertions(+), 32 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index b628abe6..840a1614 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -125,10 +125,7 @@ public class ShardConsumerSubscriberTest { setupNotifierAnswer(1); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(subscriber, 5000); verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); } @@ -139,10 +136,7 @@ public class ShardConsumerSubscriberTest { setupNotifierAnswer(recordsPublisher.responses.size()); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(subscriber, 5000); verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); } @@ -171,10 +165,7 @@ public class ShardConsumerSubscriberTest { } }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(subscriber, 5000); assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException)); assertThat(subscriber.getAndResetDispatchFailure(), nullValue()); @@ -192,10 +183,7 @@ public class ShardConsumerSubscriberTest { setupNotifierAnswer(10); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(subscriber, 5000); for (int attempts = 0; attempts < 10; attempts++) { if (subscriber.retrievalFailure() != null) { @@ -220,10 +208,7 @@ public class ShardConsumerSubscriberTest { setupNotifierAnswer(10); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(subscriber, 5000); for (int attempts = 0; attempts < 10; attempts++) { if (subscriber.retrievalFailure() != null) { @@ -267,10 +252,7 @@ public class ShardConsumerSubscriberTest { return null; }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(subscriber, 5000); synchronized (processedNotifier) { executorService.execute(() -> { @@ -337,10 +319,7 @@ public class ShardConsumerSubscriberTest { }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); // First try to start subscriptions. - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(100); - } + startSubscriptionsAndWait(subscriber, 100); // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // subscription has not started correctly. @@ -398,10 +377,7 @@ public class ShardConsumerSubscriberTest { }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); // First try to start subscriptions. - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(100); - } + startSubscriptionsAndWait(subscriber, 100); // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // subscription has not started correctly. @@ -480,6 +456,13 @@ public class ShardConsumerSubscriberTest { }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); } + private void startSubscriptionsAndWait(ShardConsumerSubscriber subscriber, long timeout) throws InterruptedException { + synchronized (processedNotifier) { + subscriber.startSubscriptions(); + processedNotifier.wait(timeout); + } + } + private class ResponseItem { private final RecordsRetrieved recordsRetrieved; private final Throwable throwable;