Debug remote unit test failure

This commit is contained in:
Chunxue Yang 2019-11-07 13:37:45 -08:00
parent 6690812aac
commit 653a25456c
2 changed files with 8 additions and 1 deletions

View file

@ -80,6 +80,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
lastRequestTime = Instant.now(); lastRequestTime = Instant.now();
if (lastAccepted != null) { if (lastAccepted != null) {
recordsPublisher.restartFrom(lastAccepted); recordsPublisher.restartFrom(lastAccepted);
log.warn("Last record was accepted! Record Publisher restarted from the last accepted record.");
} }
Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize) Flowable.fromPublisher(recordsPublisher).subscribeOn(scheduler).observeOn(scheduler, true, bufferSize)
.subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher)); .subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher));
@ -90,6 +91,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
Throwable result = restartIfFailed(); Throwable result = restartIfFailed();
if (result == null) { if (result == null) {
restartIfRequestTimerExpired(maxTimeBetweenRequests); restartIfRequestTimerExpired(maxTimeBetweenRequests);
log.warn("healthCheck result is null, called restartIfRequestTimerExpired");
} }
return result; return result;
} }
@ -109,7 +111,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests",
shardConsumer.shardInfo().shardId()); shardConsumer.shardInfo().shardId());
if (retrievalFailure instanceof RetryableRetrievalException) { if (retrievalFailure instanceof RetryableRetrievalException) {
log.debug(logMessage, retrievalFailure.getCause()); log.info(logMessage, retrievalFailure.getCause());
} else { } else {
log.warn(logMessage, retrievalFailure); log.warn(logMessage, retrievalFailure);
} }
@ -135,6 +137,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
// Start the subscription again which will update the lastRequestTime as well. // Start the subscription again which will update the lastRequestTime as well.
startSubscriptions(); startSubscriptions();
log.info("No responses. Called previous subscription and called startSubscriptions.");
} }
} }
} }

View file

@ -325,6 +325,7 @@ public class ShardConsumerSubscriberTest {
subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0); subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0);
addUniqueItem(1); addUniqueItem(1);
log.info("recordsPublisher and subscriber is created successfully.");
List<ProcessRecordsInput> received = new ArrayList<>(); List<ProcessRecordsInput> received = new ArrayList<>();
doAnswer(a -> { doAnswer(a -> {
ProcessRecordsInput input = a.getArgumentAt(0, ProcessRecordsInput.class); ProcessRecordsInput input = a.getArgumentAt(0, ProcessRecordsInput.class);
@ -346,6 +347,7 @@ public class ShardConsumerSubscriberTest {
// subscription has not started correctly. // subscription has not started correctly.
verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
any(Subscription.class)); any(Subscription.class));
log.info("Verified no records were sent back and subscription has not started correctly");
Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem); Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem);
@ -353,6 +355,7 @@ public class ShardConsumerSubscriberTest {
// Doing the health check to allow the subscription to restart. // Doing the health check to allow the subscription to restart.
assertThat(subscriber.healthCheck(1), nullValue()); assertThat(subscriber.healthCheck(1), nullValue());
log.info("Health check was successful without exceptions");
// Allow time for processing of the records to end in the executor thread which call notifyAll as it gets the // Allow time for processing of the records to end in the executor thread which call notifyAll as it gets the
// terminal record. Keeping the timeout pretty high for avoiding test failures on slow machines. // terminal record. Keeping the timeout pretty high for avoiding test failures on slow machines.
@ -363,6 +366,7 @@ public class ShardConsumerSubscriberTest {
// Verify that shardConsumer mock was called 100 times and all 100 input records are processed. // Verify that shardConsumer mock was called 100 times and all 100 input records are processed.
verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
any(Subscription.class)); any(Subscription.class));
log.info("Verified that handleInput was called 100 times");
// Verify that received records in the subscriber are equal to the ones sent by the record publisher. // Verify that received records in the subscriber are equal to the ones sent by the record publisher.
assertThat(received.size(), equalTo(recordsPublisher.responses.size())); assertThat(received.size(), equalTo(recordsPublisher.responses.size()));