Making test cases resilient to delayed thread operations (#612)

* Making test cases resilient to delayed thread operations

* Setting the initial demand in test cases to be in line with service's coral initial demand.
This commit is contained in:
ashwing 2019-09-19 16:53:01 -07:00 committed by Micah Jaffe
parent 3fead19df7
commit c197d35aab

View file

@ -19,6 +19,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.SdkBytes;
@ -409,6 +410,16 @@ public class FanOutRecordsPublisherTest {
@Test @Test
public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception { public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception {
CountDownLatch onS2SCallLatch = new CountDownLatch(2);
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
onS2SCallLatch.countDown();
return null;
}
}).when(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), any());
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
@ -427,7 +438,7 @@ public class FanOutRecordsPublisherTest {
CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
int totalServicePublisherEvents = 1000; int totalServicePublisherEvents = 1000;
int initialDemand = 10; int initialDemand = 9;
int triggerCompleteAtNthEvent = 200; int triggerCompleteAtNthEvent = 200;
BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher( BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher(
servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch,
@ -501,6 +512,8 @@ public class FanOutRecordsPublisherTest {
assertThat(source.getCurrentSequenceNumber(), equalTo(triggerCompleteAtNthEvent + "")); assertThat(source.getCurrentSequenceNumber(), equalTo(triggerCompleteAtNthEvent + ""));
// In non-shard end cases, upon successful completion, the publisher would re-subscribe to service. // In non-shard end cases, upon successful completion, the publisher would re-subscribe to service.
// Let's wait for sometime to allow the publisher to re-subscribe
onS2SCallLatch.await(5000, TimeUnit.MILLISECONDS);
verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
} }
@ -531,8 +544,9 @@ public class FanOutRecordsPublisherTest {
.build()); .build());
CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
CountDownLatch onCompleteLatch = new CountDownLatch(1);
int totalServicePublisherEvents = 1000; int totalServicePublisherEvents = 1000;
int initialDemand = 10; int initialDemand = 9;
int triggerCompleteAtNthEvent = 200; int triggerCompleteAtNthEvent = 200;
BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher( BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher(
servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch,
@ -578,6 +592,7 @@ public class FanOutRecordsPublisherTest {
@Override public void onComplete() { @Override public void onComplete() {
isOnCompleteTriggered[0] = true; isOnCompleteTriggered[0] = true;
onCompleteLatch.countDown();
} }
}, source); }, source);
@ -610,6 +625,7 @@ public class FanOutRecordsPublisherTest {
assertNull(source.getCurrentSequenceNumber()); assertNull(source.getCurrentSequenceNumber());
// With shard end event, onComplete must be propagated to the subscriber. // With shard end event, onComplete must be propagated to the subscriber.
onCompleteLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue("OnComplete should be triggered", isOnCompleteTriggered[0]); assertTrue("OnComplete should be triggered", isOnCompleteTriggered[0]);
} }
@ -633,8 +649,9 @@ public class FanOutRecordsPublisherTest {
.build()); .build());
CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
CountDownLatch onErrorReceiveLatch = new CountDownLatch(1);
int totalServicePublisherEvents = 1000; int totalServicePublisherEvents = 1000;
int initialDemand = 10; int initialDemand = 9;
int triggerErrorAtNthEvent = 241; int triggerErrorAtNthEvent = 241;
BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher( BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher(
servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch,
@ -675,6 +692,7 @@ public class FanOutRecordsPublisherTest {
@Override public void onError(Throwable t) { @Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t); log.error("Caught throwable in subscriber", t);
isOnErrorThrown[0] = true; isOnErrorThrown[0] = true;
onErrorReceiveLatch.countDown();
} }
@Override public void onComplete() { @Override public void onComplete() {
@ -709,6 +727,7 @@ public class FanOutRecordsPublisherTest {
}); });
assertThat(source.getCurrentSequenceNumber(), equalTo(triggerErrorAtNthEvent + "")); assertThat(source.getCurrentSequenceNumber(), equalTo(triggerErrorAtNthEvent + ""));
onErrorReceiveLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue("OnError should have been thrown", isOnErrorThrown[0]); assertTrue("OnError should have been thrown", isOnErrorThrown[0]);
} }