Updating tests to have correct behavior
* Throwing exception from RecordFlow.exceptionOccured
This commit is contained in:
parent
7de822fd9c
commit
786831d664
1 changed files with 11 additions and 10 deletions
|
|
@ -1,7 +1,6 @@
|
||||||
package software.amazon.kinesis.retrieval.fanout;
|
package software.amazon.kinesis.retrieval.fanout;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
|
@ -10,7 +9,6 @@ import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
|
@ -201,20 +199,15 @@ public class FanOutRecordsPublisherTest {
|
||||||
|
|
||||||
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
|
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
|
||||||
.forClass(FanOutRecordsPublisher.RecordFlow.class);
|
.forClass(FanOutRecordsPublisher.RecordFlow.class);
|
||||||
|
ArgumentCaptor<ProcessRecordsInput> inputCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
|
||||||
when(kinesisClient.subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()))
|
|
||||||
.thenThrow(new RuntimeException(ResourceNotFoundException.builder().build()));
|
|
||||||
|
|
||||||
Subscriber<ProcessRecordsInput> subscriber = spy(new Subscriber<ProcessRecordsInput>() {
|
Subscriber<ProcessRecordsInput> subscriber = spy(new Subscriber<ProcessRecordsInput>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSubscribe(final Subscription subscription) {
|
public void onSubscribe(final Subscription subscription) {
|
||||||
assertThat(subscription, notNullValue());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onNext(final ProcessRecordsInput processRecordsInput) {
|
public void onNext(final ProcessRecordsInput processRecordsInput) {
|
||||||
assertThat(processRecordsInput.isAtShardEnd(), equalTo(true));
|
|
||||||
assertThat(processRecordsInput.records().isEmpty(), equalTo(true));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -229,9 +222,17 @@ public class FanOutRecordsPublisherTest {
|
||||||
source.subscribe(subscriber);
|
source.subscribe(subscriber);
|
||||||
|
|
||||||
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
|
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
|
||||||
flowCaptor.getValue().onEventStream(publisher);
|
FanOutRecordsPublisher.RecordFlow recordFlow = flowCaptor.getValue();
|
||||||
verify(subscriber).onComplete();
|
recordFlow.exceptionOccurred(new RuntimeException(ResourceNotFoundException.builder().build()));
|
||||||
|
|
||||||
|
verify(subscriber).onSubscribe(any());
|
||||||
verify(subscriber, never()).onError(any());
|
verify(subscriber, never()).onError(any());
|
||||||
|
verify(subscriber).onNext(inputCaptor.capture());
|
||||||
|
verify(subscriber).onComplete();
|
||||||
|
|
||||||
|
ProcessRecordsInput input = inputCaptor.getValue();
|
||||||
|
assertThat(input.isAtShardEnd(), equalTo(true));
|
||||||
|
assertThat(input.records().isEmpty(), equalTo(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Record makeRecord(int sequenceNumber) {
|
private Record makeRecord(int sequenceNumber) {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue