Changes made according to comments:
* Creating handleFlowError method * Using mocks instead of unnecessary spies
This commit is contained in:
parent
786831d664
commit
b396626f7e
2 changed files with 16 additions and 29 deletions
|
|
@ -126,17 +126,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
outstandingRequests = 0;
|
outstandingRequests = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (t.getCause() instanceof ResourceNotFoundException) {
|
handleFlowError(t);
|
||||||
log.warn(
|
|
||||||
"{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.",
|
|
||||||
shardId);
|
|
||||||
subscriber.onNext(ProcessRecordsInput.builder().records(Collections.emptyList())
|
|
||||||
.isAtShardEnd(true).build());
|
|
||||||
subscriber.onComplete();
|
|
||||||
} else {
|
|
||||||
subscriber.onError(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (Throwable innerThrowable) {
|
} catch (Throwable innerThrowable) {
|
||||||
log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable);
|
log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable);
|
||||||
}
|
}
|
||||||
|
|
@ -154,6 +144,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleFlowError(Throwable t) {
|
||||||
|
if (t.getCause() instanceof ResourceNotFoundException) {
|
||||||
|
log.debug(
|
||||||
|
"{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.",
|
||||||
|
shardId);
|
||||||
|
subscriber
|
||||||
|
.onNext(ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build());
|
||||||
|
subscriber.onComplete();
|
||||||
|
} else {
|
||||||
|
subscriber.onError(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private String throwableCategory(Throwable t) {
|
private String throwableCategory(Throwable t) {
|
||||||
Throwable current = t;
|
Throwable current = t;
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,8 @@ import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
|
@ -201,23 +201,7 @@ public class FanOutRecordsPublisherTest {
|
||||||
.forClass(FanOutRecordsPublisher.RecordFlow.class);
|
.forClass(FanOutRecordsPublisher.RecordFlow.class);
|
||||||
ArgumentCaptor<ProcessRecordsInput> inputCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
|
ArgumentCaptor<ProcessRecordsInput> inputCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
|
||||||
|
|
||||||
Subscriber<ProcessRecordsInput> subscriber = spy(new Subscriber<ProcessRecordsInput>() {
|
Subscriber<ProcessRecordsInput> subscriber = mock(Subscriber.class);
|
||||||
@Override
|
|
||||||
public void onSubscribe(final Subscription subscription) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(final ProcessRecordsInput processRecordsInput) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(final Throwable throwable) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
source.subscribe(subscriber);
|
source.subscribe(subscriber);
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue