Fixing issue with ResourceNotFound

* Calling onNext and onComplete if throwable is of the kind ResourceNotFound.
* Adding testing for ResourceNotFound
* Updating version to 2.0.1-SNAPSHOT
This commit is contained in:
Sahil Palvia 2018-08-09 10:25:37 -07:00
parent 0b267037ea
commit 7de822fd9c
4 changed files with 60 additions and 4 deletions

View file

@ -20,7 +20,7 @@
<parent>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client-pom</artifactId>
<version>2.0.0</version>
<version>2.0.1-SNAPSHOT</version>
</parent>
<artifactId>amazon-kinesis-client</artifactId>

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.retrieval.fanout;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -27,6 +28,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
@ -124,9 +126,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
outstandingRequests = 0;
try {
subscriber.onError(t);
if (t.getCause() instanceof ResourceNotFoundException) {
log.warn(
"{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.",
shardId);
subscriber.onNext(ProcessRecordsInput.builder().records(Collections.emptyList())
.isAtShardEnd(true).build());
subscriber.onComplete();
} else {
subscriber.onError(t);
}
} catch (Throwable innerThrowable) {
log.warn("{}: Exception while calling subscriber.onError", innerThrowable);
log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable);
}
subscriber = null;
flow = null;

View file

@ -1,12 +1,16 @@
package software.amazon.kinesis.retrieval.fanout;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.time.Instant;
@ -31,6 +35,7 @@ import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
@ -190,6 +195,45 @@ public class FanOutRecordsPublisherTest {
}
@Test
public void testResourceNotFoundForShard() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
when(kinesisClient.subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()))
.thenThrow(new RuntimeException(ResourceNotFoundException.builder().build()));
Subscriber<ProcessRecordsInput> subscriber = spy(new Subscriber<ProcessRecordsInput>() {
@Override
public void onSubscribe(final Subscription subscription) {
assertThat(subscription, notNullValue());
}
@Override
public void onNext(final ProcessRecordsInput processRecordsInput) {
assertThat(processRecordsInput.isAtShardEnd(), equalTo(true));
assertThat(processRecordsInput.records().isEmpty(), equalTo(true));
}
@Override
public void onError(final Throwable throwable) {
}
@Override
public void onComplete() {
}
});
source.subscribe(subscriber);
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
flowCaptor.getValue().onEventStream(publisher);
verify(subscriber).onComplete();
verify(subscriber, never()).onError(any());
}
private Record makeRecord(int sequenceNumber) {
SdkBytes buffer = SdkBytes.fromByteArray(new byte[] { 1, 2, 3 });
return Record.builder().data(buffer).approximateArrivalTimestamp(Instant.now())

View file

@ -20,7 +20,7 @@
<artifactId>amazon-kinesis-client-pom</artifactId>
<packaging>pom</packaging>
<name>Amazon Kinesis Client Library</name>
<version>2.0.0</version>
<version>2.0.1-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
</description>