From c197d35aabe61cc82144b4b8f71ff19e15f771f8 Mon Sep 17 00:00:00 2001 From: ashwing Date: Thu, 19 Sep 2019 16:53:01 -0700 Subject: [PATCH] Making test cases resilient to delayed thread operations (#612) * Making test cases resilient to delayed thread operations * Setting the initial demand in test cases to be in line with service's coral initial demand. --- .../fanout/FanOutRecordsPublisherTest.java | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 6f5cf4b2..21d751e8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -19,6 +19,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.SdkBytes; @@ -409,6 +410,16 @@ public class FanOutRecordsPublisherTest { @Test public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception { + + CountDownLatch onS2SCallLatch = new CountDownLatch(2); + + doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock invocation) throws Throwable { + onS2SCallLatch.countDown(); + return null; + } + }).when(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), any()); + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); ArgumentCaptor captor = ArgumentCaptor @@ -427,7 +438,7 @@ public class FanOutRecordsPublisherTest { CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); int totalServicePublisherEvents = 1000; - int initialDemand = 10; + int initialDemand = 9; int triggerCompleteAtNthEvent = 200; BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher( servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, @@ -501,6 +512,8 @@ public class FanOutRecordsPublisherTest { assertThat(source.getCurrentSequenceNumber(), equalTo(triggerCompleteAtNthEvent + "")); // In non-shard end cases, upon successful completion, the publisher would re-subscribe to service. + // Let's wait for sometime to allow the publisher to re-subscribe + onS2SCallLatch.await(5000, TimeUnit.MILLISECONDS); verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); } @@ -531,8 +544,9 @@ public class FanOutRecordsPublisherTest { .build()); CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); + CountDownLatch onCompleteLatch = new CountDownLatch(1); int totalServicePublisherEvents = 1000; - int initialDemand = 10; + int initialDemand = 9; int triggerCompleteAtNthEvent = 200; BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher( servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, @@ -578,6 +592,7 @@ public class FanOutRecordsPublisherTest { @Override public void onComplete() { isOnCompleteTriggered[0] = true; + onCompleteLatch.countDown(); } }, source); @@ -610,6 +625,7 @@ public class FanOutRecordsPublisherTest { assertNull(source.getCurrentSequenceNumber()); // With shard end event, onComplete must be propagated to the subscriber. + onCompleteLatch.await(5000, TimeUnit.MILLISECONDS); assertTrue("OnComplete should be triggered", isOnCompleteTriggered[0]); } @@ -633,8 +649,9 @@ public class FanOutRecordsPublisherTest { .build()); CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); + CountDownLatch onErrorReceiveLatch = new CountDownLatch(1); int totalServicePublisherEvents = 1000; - int initialDemand = 10; + int initialDemand = 9; int triggerErrorAtNthEvent = 241; BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher( servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, @@ -675,6 +692,7 @@ public class FanOutRecordsPublisherTest { @Override public void onError(Throwable t) { log.error("Caught throwable in subscriber", t); isOnErrorThrown[0] = true; + onErrorReceiveLatch.countDown(); } @Override public void onComplete() { @@ -709,6 +727,7 @@ public class FanOutRecordsPublisherTest { }); assertThat(source.getCurrentSequenceNumber(), equalTo(triggerErrorAtNthEvent + "")); + onErrorReceiveLatch.await(5000, TimeUnit.MILLISECONDS); assertTrue("OnError should have been thrown", isOnErrorThrown[0]); }