diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 9a49a3c5..5803fa16 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -126,17 +126,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { outstandingRequests = 0; try { - if (t.getCause() instanceof ResourceNotFoundException) { - log.warn( - "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", - shardId); - subscriber.onNext(ProcessRecordsInput.builder().records(Collections.emptyList()) - .isAtShardEnd(true).build()); - subscriber.onComplete(); - } else { - subscriber.onError(t); - } - + handleFlowError(t); } catch (Throwable innerThrowable) { log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); } @@ -154,6 +144,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } + private void handleFlowError(Throwable t) { + if (t.getCause() instanceof ResourceNotFoundException) { + log.debug( + "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", + shardId); + subscriber + .onNext(ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build()); + subscriber.onComplete(); + } else { + subscriber.onError(t); + } + } + private String throwableCategory(Throwable t) { Throwable current = t; StringBuilder builder = new StringBuilder(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index ac367a99..f82329a1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -5,8 +5,8 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -201,23 +201,7 @@ public class FanOutRecordsPublisherTest { .forClass(FanOutRecordsPublisher.RecordFlow.class); ArgumentCaptor inputCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); - Subscriber subscriber = spy(new Subscriber() { - @Override - public void onSubscribe(final Subscription subscription) { - } - - @Override - public void onNext(final ProcessRecordsInput processRecordsInput) { - } - - @Override - public void onError(final Throwable throwable) { - } - - @Override - public void onComplete() { - } - }); + Subscriber subscriber = mock(Subscriber.class); source.subscribe(subscriber);