diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml index 79cc7076..7f60226a 100644 --- a/amazon-kinesis-client-multilang/pom.xml +++ b/amazon-kinesis-client-multilang/pom.xml @@ -19,7 +19,7 @@ amazon-kinesis-client-pom software.amazon.kinesis - 2.0.0 + 2.0.1-SNAPSHOT 4.0.0 diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index b9636052..4deafbae 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -20,7 +20,7 @@ software.amazon.kinesis amazon-kinesis-client-pom - 2.0.0 + 2.0.1-SNAPSHOT amazon-kinesis-client diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 774ce917..5803fa16 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval.fanout; import java.time.Instant; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -27,6 +28,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; @@ -124,9 +126,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher { outstandingRequests = 0; try { - subscriber.onError(t); + handleFlowError(t); } catch (Throwable innerThrowable) { - log.warn("{}: Exception while calling subscriber.onError", innerThrowable); + log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); } subscriber = null; flow = null; @@ -142,6 +144,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } + private void handleFlowError(Throwable t) { + if (t.getCause() instanceof ResourceNotFoundException) { + log.debug( + "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", + shardId); + subscriber + .onNext(ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build()); + subscriber.onComplete(); + } else { + subscriber.onError(t); + } + } + private String throwableCategory(Throwable t) { Throwable current = t; StringBuilder builder = new StringBuilder(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 60c560a1..ebdd5f40 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -5,6 +5,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -31,6 +32,7 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; @@ -53,6 +55,8 @@ public class FanOutRecordsPublisherTest { private SdkPublisher publisher; @Mock private Subscription subscription; + @Mock + private Subscriber subscriber; private SubscribeToShardEvent batchEvent; @@ -190,6 +194,30 @@ public class FanOutRecordsPublisherTest { } + @Test + public void testResourceNotFoundForShard() { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + ArgumentCaptor inputCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); + + source.subscribe(subscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + FanOutRecordsPublisher.RecordFlow recordFlow = flowCaptor.getValue(); + recordFlow.exceptionOccurred(new RuntimeException(ResourceNotFoundException.builder().build())); + + verify(subscriber).onSubscribe(any()); + verify(subscriber, never()).onError(any()); + verify(subscriber).onNext(inputCaptor.capture()); + verify(subscriber).onComplete(); + + ProcessRecordsInput input = inputCaptor.getValue(); + assertThat(input.isAtShardEnd(), equalTo(true)); + assertThat(input.records().isEmpty(), equalTo(true)); + } + private Record makeRecord(int sequenceNumber) { SdkBytes buffer = SdkBytes.fromByteArray(new byte[] { 1, 2, 3 }); return Record.builder().data(buffer).approximateArrivalTimestamp(Instant.now()) diff --git a/pom.xml b/pom.xml index d03ef43c..42b2dba3 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ amazon-kinesis-client-pom pom Amazon Kinesis Client Library - 2.0.0 + 2.0.1-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.