Fixing unit tests
This commit is contained in:
parent
d940f8e2aa
commit
5221e1cd54
2 changed files with 11 additions and 29 deletions
|
|
@ -80,7 +80,6 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
|
||||||
lastRequestTime = Instant.now();
|
lastRequestTime = Instant.now();
|
||||||
if (lastAccepted != null) {
|
if (lastAccepted != null) {
|
||||||
recordsPublisher.restartFrom(lastAccepted);
|
recordsPublisher.restartFrom(lastAccepted);
|
||||||
log.warn("Last record was accepted! Record Publisher restarted from the last accepted record.");
|
|
||||||
}
|
}
|
||||||
Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize)
|
Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize)
|
||||||
.subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher));
|
.subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher));
|
||||||
|
|
@ -91,7 +90,6 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
|
||||||
Throwable result = restartIfFailed();
|
Throwable result = restartIfFailed();
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
restartIfRequestTimerExpired(maxTimeBetweenRequests);
|
restartIfRequestTimerExpired(maxTimeBetweenRequests);
|
||||||
log.warn("healthCheck result is null, called restartIfRequestTimerExpired");
|
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
@ -111,7 +109,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
|
||||||
String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests",
|
String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests",
|
||||||
shardConsumer.shardInfo().shardId());
|
shardConsumer.shardInfo().shardId());
|
||||||
if (retrievalFailure instanceof RetryableRetrievalException) {
|
if (retrievalFailure instanceof RetryableRetrievalException) {
|
||||||
log.info(logMessage, retrievalFailure.getCause());
|
log.debug(logMessage, retrievalFailure.getCause());
|
||||||
} else {
|
} else {
|
||||||
log.warn(logMessage, retrievalFailure);
|
log.warn(logMessage, retrievalFailure);
|
||||||
}
|
}
|
||||||
|
|
@ -137,7 +135,6 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
|
||||||
|
|
||||||
// Start the subscription again which will update the lastRequestTime as well.
|
// Start the subscription again which will update the lastRequestTime as well.
|
||||||
startSubscriptions();
|
startSubscriptions();
|
||||||
log.info("No responses. Called previous subscription and called startSubscriptions.");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ import static org.hamcrest.CoreMatchers.nullValue;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.argThat;
|
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
|
|
@ -126,7 +125,7 @@ public class ShardConsumerSubscriberTest {
|
||||||
processedNotifier.wait(5000);
|
processedNotifier.wait(5000);
|
||||||
}
|
}
|
||||||
|
|
||||||
verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
|
verify(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -140,8 +139,7 @@ public class ShardConsumerSubscriberTest {
|
||||||
processedNotifier.wait(5000);
|
processedNotifier.wait(5000);
|
||||||
}
|
}
|
||||||
|
|
||||||
verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
|
verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
|
||||||
any(Subscription.class));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -176,8 +174,7 @@ public class ShardConsumerSubscriberTest {
|
||||||
assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException));
|
assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException));
|
||||||
assertThat(subscriber.getAndResetDispatchFailure(), nullValue());
|
assertThat(subscriber.getAndResetDispatchFailure(), nullValue());
|
||||||
|
|
||||||
verify(shardConsumer, times(20)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
|
verify(shardConsumer, times(20)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
|
||||||
any(Subscription.class));
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -202,8 +199,7 @@ public class ShardConsumerSubscriberTest {
|
||||||
Thread.sleep(10);
|
Thread.sleep(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
verify(shardConsumer, times(10)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
|
verify(shardConsumer, times(10)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
|
||||||
any(Subscription.class));
|
|
||||||
assertThat(subscriber.retrievalFailure(), equalTo(expected));
|
assertThat(subscriber.retrievalFailure(), equalTo(expected));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -239,8 +235,7 @@ public class ShardConsumerSubscriberTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat(recordsPublisher.restartedFrom, equalTo(edgeRecord));
|
assertThat(recordsPublisher.restartedFrom, equalTo(edgeRecord));
|
||||||
verify(shardConsumer, times(20)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
|
verify(shardConsumer, times(20)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
|
||||||
any(Subscription.class));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -304,8 +299,7 @@ public class ShardConsumerSubscriberTest {
|
||||||
processedNotifier.wait(5000);
|
processedNotifier.wait(5000);
|
||||||
}
|
}
|
||||||
|
|
||||||
verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
|
verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
|
||||||
any(Subscription.class));
|
|
||||||
|
|
||||||
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
||||||
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
|
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
|
||||||
|
|
@ -325,11 +319,9 @@ public class ShardConsumerSubscriberTest {
|
||||||
subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0);
|
subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0);
|
||||||
addUniqueItem(1);
|
addUniqueItem(1);
|
||||||
|
|
||||||
log.info("recordsPublisher and subscriber is created successfully.");
|
|
||||||
List<ProcessRecordsInput> received = new ArrayList<>();
|
List<ProcessRecordsInput> received = new ArrayList<>();
|
||||||
doAnswer(a -> {
|
doAnswer(a -> {
|
||||||
ProcessRecordsInput input = a.getArgumentAt(0, ProcessRecordsInput.class);
|
ProcessRecordsInput input = a.getArgumentAt(0, ProcessRecordsInput.class);
|
||||||
log.info("Handling input for: {}", input.toString());
|
|
||||||
received.add(input);
|
received.add(input);
|
||||||
if (input.records().stream().anyMatch(r -> StringUtils.startsWith(r.partitionKey(), TERMINAL_MARKER))) {
|
if (input.records().stream().anyMatch(r -> StringUtils.startsWith(r.partitionKey(), TERMINAL_MARKER))) {
|
||||||
synchronized (processedNotifier) {
|
synchronized (processedNotifier) {
|
||||||
|
|
@ -346,9 +338,7 @@ public class ShardConsumerSubscriberTest {
|
||||||
|
|
||||||
// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
|
// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
|
||||||
// subscription has not started correctly.
|
// subscription has not started correctly.
|
||||||
verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
|
verify(shardConsumer, never()).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
|
||||||
any(Subscription.class));
|
|
||||||
log.info("Verified no records were sent back and subscription has not started correctly");
|
|
||||||
|
|
||||||
Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem);
|
Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem);
|
||||||
|
|
||||||
|
|
@ -356,7 +346,6 @@ public class ShardConsumerSubscriberTest {
|
||||||
|
|
||||||
// Doing the health check to allow the subscription to restart.
|
// Doing the health check to allow the subscription to restart.
|
||||||
assertThat(subscriber.healthCheck(1), nullValue());
|
assertThat(subscriber.healthCheck(1), nullValue());
|
||||||
log.info("Health check was successful without exceptions");
|
|
||||||
|
|
||||||
// Allow time for processing of the records to end in the executor thread which call notifyAll as it gets the
|
// Allow time for processing of the records to end in the executor thread which call notifyAll as it gets the
|
||||||
// terminal record. Keeping the timeout pretty high for avoiding test failures on slow machines.
|
// terminal record. Keeping the timeout pretty high for avoiding test failures on slow machines.
|
||||||
|
|
@ -365,9 +354,7 @@ public class ShardConsumerSubscriberTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that shardConsumer mock was called 100 times and all 100 input records are processed.
|
// Verify that shardConsumer mock was called 100 times and all 100 input records are processed.
|
||||||
verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class),
|
verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
|
||||||
any(Subscription.class));
|
|
||||||
log.info("Verified that handleInput was called 100 times");
|
|
||||||
|
|
||||||
// Verify that received records in the subscriber are equal to the ones sent by the record publisher.
|
// Verify that received records in the subscriber are equal to the ones sent by the record publisher.
|
||||||
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
||||||
|
|
@ -411,8 +398,7 @@ public class ShardConsumerSubscriberTest {
|
||||||
|
|
||||||
// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
|
// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
|
||||||
// subscription has not started correctly.
|
// subscription has not started correctly.
|
||||||
verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
|
verify(shardConsumer, never()).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
|
||||||
any(Subscription.class));
|
|
||||||
|
|
||||||
Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem);
|
Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem);
|
||||||
|
|
||||||
|
|
@ -428,8 +414,7 @@ public class ShardConsumerSubscriberTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that shardConsumer mock was called 100 times and all 100 input records are processed.
|
// Verify that shardConsumer mock was called 100 times and all 100 input records are processed.
|
||||||
verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
|
verify(shardConsumer, times(100)).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
|
||||||
any(Subscription.class));
|
|
||||||
|
|
||||||
// Verify that received records in the subscriber are equal to the ones sent by the record publisher.
|
// Verify that received records in the subscriber are equal to the ones sent by the record publisher.
|
||||||
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue