diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index 840a1614..86d5c2e0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -79,6 +79,8 @@ public class ShardConsumerSubscriberTest { private static final String TERMINAL_MARKER = "Terminal"; + private static final long DEFAULT_NOTIFIER_TIMEOUT = 5000; + private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); @Mock @@ -125,7 +127,7 @@ public class ShardConsumerSubscriberTest { setupNotifierAnswer(1); - startSubscriptionsAndWait(subscriber, 5000); + startSubscriptionsAndWait(); verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); } @@ -136,7 +138,7 @@ public class ShardConsumerSubscriberTest { setupNotifierAnswer(recordsPublisher.responses.size()); - startSubscriptionsAndWait(subscriber, 5000); + startSubscriptionsAndWait(); verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); } @@ -165,7 +167,7 @@ public class ShardConsumerSubscriberTest { } }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); - startSubscriptionsAndWait(subscriber, 5000); + startSubscriptionsAndWait(); assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException)); assertThat(subscriber.getAndResetDispatchFailure(), nullValue()); @@ -183,7 +185,7 @@ public class ShardConsumerSubscriberTest { setupNotifierAnswer(10); - startSubscriptionsAndWait(subscriber, 5000); + startSubscriptionsAndWait(); for (int attempts = 0; attempts < 10; attempts++) { if (subscriber.retrievalFailure() != null) { @@ -208,7 +210,7 @@ public class ShardConsumerSubscriberTest { setupNotifierAnswer(10); - startSubscriptionsAndWait(subscriber, 5000); + startSubscriptionsAndWait(); for (int attempts = 0; attempts < 10; attempts++) { if (subscriber.retrievalFailure() != null) { @@ -221,7 +223,7 @@ public class ShardConsumerSubscriberTest { synchronized (processedNotifier) { assertThat(subscriber.healthCheck(100000), equalTo(expected)); - processedNotifier.wait(5000); + processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT); } assertThat(recordsPublisher.restartedFrom, equalTo(edgeRecord)); @@ -252,7 +254,7 @@ public class ShardConsumerSubscriberTest { return null; }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); - startSubscriptionsAndWait(subscriber, 5000); + startSubscriptionsAndWait(); synchronized (processedNotifier) { executorService.execute(() -> { @@ -272,7 +274,7 @@ public class ShardConsumerSubscriberTest { // // Wait for our blocking thread to control the thread in the executor. // - processedNotifier.wait(5000); + processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT); } Stream.iterate(2, i -> i + 1).limit(97).forEach(this::addUniqueItem); @@ -283,7 +285,7 @@ public class ShardConsumerSubscriberTest { assertThat(subscriber.healthCheck(1), nullValue()); barrier.await(500, TimeUnit.MILLISECONDS); - processedNotifier.wait(5000); + processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT); } verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); @@ -319,7 +321,7 @@ public class ShardConsumerSubscriberTest { }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); // First try to start subscriptions. - startSubscriptionsAndWait(subscriber, 100); + startSubscriptionsAndWait(100); // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // subscription has not started correctly. @@ -377,7 +379,7 @@ public class ShardConsumerSubscriberTest { }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); // First try to start subscriptions. - startSubscriptionsAndWait(subscriber, 100); + startSubscriptionsAndWait(100); // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // subscription has not started correctly. @@ -456,7 +458,11 @@ public class ShardConsumerSubscriberTest { }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); } - private void startSubscriptionsAndWait(ShardConsumerSubscriber subscriber, long timeout) throws InterruptedException { + private void startSubscriptionsAndWait() throws InterruptedException { + startSubscriptionsAndWait(DEFAULT_NOTIFIER_TIMEOUT); + } + + private void startSubscriptionsAndWait(long timeout) throws InterruptedException { synchronized (processedNotifier) { subscriber.startSubscriptions(); processedNotifier.wait(timeout); @@ -757,7 +763,7 @@ public class ShardConsumerSubscriberTest { /** * Test to validate the non-timeout warning message from ShardConsumer is not suppressed with the default * configuration of 0 - * + * * @throws Exception */ @Test @@ -786,7 +792,7 @@ public class ShardConsumerSubscriberTest { /** * Test to validate the non-timeout warning message from ShardConsumer is not suppressed with 2 ReadTimeouts to * ignore - * + * * @throws Exception */ @Test