From 7de822fd9c8cd9c548967e1a799f911abe9f2336 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Thu, 9 Aug 2018 10:25:37 -0700 Subject: [PATCH] Fixing issue with ResourceNotFound * Calling onNext and onComplete if throwable is of the kind ResourceNotFound. * Adding testing for ResourceNotFound * Updating version to 2.0.1-SNAPSHOT --- amazon-kinesis-client/pom.xml | 2 +- .../fanout/FanOutRecordsPublisher.java | 16 ++++++- .../fanout/FanOutRecordsPublisherTest.java | 44 +++++++++++++++++++ pom.xml | 2 +- 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index b9636052..4deafbae 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -20,7 +20,7 @@ software.amazon.kinesis amazon-kinesis-client-pom - 2.0.0 + 2.0.1-SNAPSHOT amazon-kinesis-client diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 774ce917..9a49a3c5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval.fanout; import java.time.Instant; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -27,6 +28,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; @@ -124,9 +126,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher { outstandingRequests = 0; try { - subscriber.onError(t); + if (t.getCause() instanceof ResourceNotFoundException) { + log.warn( + "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", + shardId); + subscriber.onNext(ProcessRecordsInput.builder().records(Collections.emptyList()) + .isAtShardEnd(true).build()); + subscriber.onComplete(); + } else { + subscriber.onError(t); + } + } catch (Throwable innerThrowable) { - log.warn("{}: Exception while calling subscriber.onError", innerThrowable); + log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); } subscriber = null; flow = null; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 60c560a1..0e82126b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -1,12 +1,16 @@ package software.amazon.kinesis.retrieval.fanout; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.time.Instant; @@ -31,6 +35,7 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; @@ -190,6 +195,45 @@ public class FanOutRecordsPublisherTest { } + @Test + public void testResourceNotFoundForShard() { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + when(kinesisClient.subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture())) + .thenThrow(new RuntimeException(ResourceNotFoundException.builder().build())); + + Subscriber subscriber = spy(new Subscriber() { + @Override + public void onSubscribe(final Subscription subscription) { + assertThat(subscription, notNullValue()); + } + + @Override + public void onNext(final ProcessRecordsInput processRecordsInput) { + assertThat(processRecordsInput.isAtShardEnd(), equalTo(true)); + assertThat(processRecordsInput.records().isEmpty(), equalTo(true)); + } + + @Override + public void onError(final Throwable throwable) { + } + + @Override + public void onComplete() { + } + }); + + source.subscribe(subscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + verify(subscriber).onComplete(); + verify(subscriber, never()).onError(any()); + } + private Record makeRecord(int sequenceNumber) { SdkBytes buffer = SdkBytes.fromByteArray(new byte[] { 1, 2, 3 }); return Record.builder().data(buffer).approximateArrivalTimestamp(Instant.now()) diff --git a/pom.xml b/pom.xml index d03ef43c..42b2dba3 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ amazon-kinesis-client-pom pom Amazon Kinesis Client Library - 2.0.0 + 2.0.1-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.