From 7de822fd9c8cd9c548967e1a799f911abe9f2336 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Thu, 9 Aug 2018 10:25:37 -0700 Subject: [PATCH 1/5] Fixing issue with ResourceNotFound * Calling onNext and onComplete if throwable is of the kind ResourceNotFound. * Adding testing for ResourceNotFound * Updating version to 2.0.1-SNAPSHOT --- amazon-kinesis-client/pom.xml | 2 +- .../fanout/FanOutRecordsPublisher.java | 16 ++++++- .../fanout/FanOutRecordsPublisherTest.java | 44 +++++++++++++++++++ pom.xml | 2 +- 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index b9636052..4deafbae 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -20,7 +20,7 @@ software.amazon.kinesis amazon-kinesis-client-pom - 2.0.0 + 2.0.1-SNAPSHOT amazon-kinesis-client diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 774ce917..9a49a3c5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval.fanout; import java.time.Instant; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -27,6 +28,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; @@ -124,9 +126,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher { outstandingRequests = 0; try { - subscriber.onError(t); + if (t.getCause() instanceof ResourceNotFoundException) { + log.warn( + "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", + shardId); + subscriber.onNext(ProcessRecordsInput.builder().records(Collections.emptyList()) + .isAtShardEnd(true).build()); + subscriber.onComplete(); + } else { + subscriber.onError(t); + } + } catch (Throwable innerThrowable) { - log.warn("{}: Exception while calling subscriber.onError", innerThrowable); + log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); } subscriber = null; flow = null; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 60c560a1..0e82126b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -1,12 +1,16 @@ package software.amazon.kinesis.retrieval.fanout; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.time.Instant; @@ -31,6 +35,7 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; @@ -190,6 +195,45 @@ public class FanOutRecordsPublisherTest { } + @Test + public void testResourceNotFoundForShard() { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + when(kinesisClient.subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture())) + .thenThrow(new RuntimeException(ResourceNotFoundException.builder().build())); + + Subscriber subscriber = spy(new Subscriber() { + @Override + public void onSubscribe(final Subscription subscription) { + assertThat(subscription, notNullValue()); + } + + @Override + public void onNext(final ProcessRecordsInput processRecordsInput) { + assertThat(processRecordsInput.isAtShardEnd(), equalTo(true)); + assertThat(processRecordsInput.records().isEmpty(), equalTo(true)); + } + + @Override + public void onError(final Throwable throwable) { + } + + @Override + public void onComplete() { + } + }); + + source.subscribe(subscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + flowCaptor.getValue().onEventStream(publisher); + verify(subscriber).onComplete(); + verify(subscriber, never()).onError(any()); + } + private Record makeRecord(int sequenceNumber) { SdkBytes buffer = SdkBytes.fromByteArray(new byte[] { 1, 2, 3 }); return Record.builder().data(buffer).approximateArrivalTimestamp(Instant.now()) diff --git a/pom.xml b/pom.xml index d03ef43c..42b2dba3 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ amazon-kinesis-client-pom pom Amazon Kinesis Client Library - 2.0.0 + 2.0.1-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. From 786831d664f2f1555e0119fafd369e0c116ade35 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Thu, 9 Aug 2018 11:29:38 -0700 Subject: [PATCH 2/5] Updating tests to have correct behavior * Throwing exception from RecordFlow.exceptionOccured --- .../fanout/FanOutRecordsPublisherTest.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 0e82126b..ac367a99 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -1,7 +1,6 @@ package software.amazon.kinesis.retrieval.fanout; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -10,7 +9,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.time.Instant; @@ -201,20 +199,15 @@ public class FanOutRecordsPublisherTest { ArgumentCaptor flowCaptor = ArgumentCaptor .forClass(FanOutRecordsPublisher.RecordFlow.class); - - when(kinesisClient.subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture())) - .thenThrow(new RuntimeException(ResourceNotFoundException.builder().build())); + ArgumentCaptor inputCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); Subscriber subscriber = spy(new Subscriber() { @Override public void onSubscribe(final Subscription subscription) { - assertThat(subscription, notNullValue()); } @Override public void onNext(final ProcessRecordsInput processRecordsInput) { - assertThat(processRecordsInput.isAtShardEnd(), equalTo(true)); - assertThat(processRecordsInput.records().isEmpty(), equalTo(true)); } @Override @@ -229,9 +222,17 @@ public class FanOutRecordsPublisherTest { source.subscribe(subscriber); verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); - flowCaptor.getValue().onEventStream(publisher); - verify(subscriber).onComplete(); + FanOutRecordsPublisher.RecordFlow recordFlow = flowCaptor.getValue(); + recordFlow.exceptionOccurred(new RuntimeException(ResourceNotFoundException.builder().build())); + + verify(subscriber).onSubscribe(any()); verify(subscriber, never()).onError(any()); + verify(subscriber).onNext(inputCaptor.capture()); + verify(subscriber).onComplete(); + + ProcessRecordsInput input = inputCaptor.getValue(); + assertThat(input.isAtShardEnd(), equalTo(true)); + assertThat(input.records().isEmpty(), equalTo(true)); } private Record makeRecord(int sequenceNumber) { From b396626f7ed643c6259e957a445e6357ab0a3c3a Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 10 Aug 2018 10:18:15 -0700 Subject: [PATCH 3/5] Changes made according to comments: * Creating handleFlowError method * Using mocks instead of unnecessary spies --- .../fanout/FanOutRecordsPublisher.java | 25 +++++++++++-------- .../fanout/FanOutRecordsPublisherTest.java | 20 ++------------- 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 9a49a3c5..5803fa16 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -126,17 +126,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { outstandingRequests = 0; try { - if (t.getCause() instanceof ResourceNotFoundException) { - log.warn( - "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", - shardId); - subscriber.onNext(ProcessRecordsInput.builder().records(Collections.emptyList()) - .isAtShardEnd(true).build()); - subscriber.onComplete(); - } else { - subscriber.onError(t); - } - + handleFlowError(t); } catch (Throwable innerThrowable) { log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); } @@ -154,6 +144,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } + private void handleFlowError(Throwable t) { + if (t.getCause() instanceof ResourceNotFoundException) { + log.debug( + "{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.", + shardId); + subscriber + .onNext(ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build()); + subscriber.onComplete(); + } else { + subscriber.onError(t); + } + } + private String throwableCategory(Throwable t) { Throwable current = t; StringBuilder builder = new StringBuilder(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index ac367a99..f82329a1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -5,8 +5,8 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -201,23 +201,7 @@ public class FanOutRecordsPublisherTest { .forClass(FanOutRecordsPublisher.RecordFlow.class); ArgumentCaptor inputCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); - Subscriber subscriber = spy(new Subscriber() { - @Override - public void onSubscribe(final Subscription subscription) { - } - - @Override - public void onNext(final ProcessRecordsInput processRecordsInput) { - } - - @Override - public void onError(final Throwable throwable) { - } - - @Override - public void onComplete() { - } - }); + Subscriber subscriber = mock(Subscriber.class); source.subscribe(subscriber); From cd7dc1f9b15c0103a5ade2b2d509d75c52e1f92c Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 10 Aug 2018 10:40:28 -0700 Subject: [PATCH 4/5] Updating version to 2.0.1-SNAPSHOT for multilang --- amazon-kinesis-client-multilang/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml index 79cc7076..7f60226a 100644 --- a/amazon-kinesis-client-multilang/pom.xml +++ b/amazon-kinesis-client-multilang/pom.xml @@ -19,7 +19,7 @@ amazon-kinesis-client-pom software.amazon.kinesis - 2.0.0 + 2.0.1-SNAPSHOT 4.0.0 From e780421036151d15230b0795be4b369aa2a86745 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 10 Aug 2018 14:27:15 -0700 Subject: [PATCH 5/5] Removing mock and introducing @Mock --- .../kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index f82329a1..ebdd5f40 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -5,7 +5,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -56,6 +55,8 @@ public class FanOutRecordsPublisherTest { private SdkPublisher publisher; @Mock private Subscription subscription; + @Mock + private Subscriber subscriber; private SubscribeToShardEvent batchEvent; @@ -201,8 +202,6 @@ public class FanOutRecordsPublisherTest { .forClass(FanOutRecordsPublisher.RecordFlow.class); ArgumentCaptor inputCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); - Subscriber subscriber = mock(Subscriber.class); - source.subscribe(subscriber); verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());