Merge pull request #359 from sahilpalvia/rnf-fix
Fixing issue with ResourceNotFound around SubscribeToShard
This commit is contained in:
commit
9951062a5d
5 changed files with 48 additions and 5 deletions
|
|
@ -19,7 +19,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<artifactId>amazon-kinesis-client-pom</artifactId>
|
<artifactId>amazon-kinesis-client-pom</artifactId>
|
||||||
<groupId>software.amazon.kinesis</groupId>
|
<groupId>software.amazon.kinesis</groupId>
|
||||||
<version>2.0.0</version>
|
<version>2.0.1-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>software.amazon.kinesis</groupId>
|
<groupId>software.amazon.kinesis</groupId>
|
||||||
<artifactId>amazon-kinesis-client-pom</artifactId>
|
<artifactId>amazon-kinesis-client-pom</artifactId>
|
||||||
<version>2.0.0</version>
|
<version>2.0.1-SNAPSHOT</version>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<artifactId>amazon-kinesis-client</artifactId>
|
<artifactId>amazon-kinesis-client</artifactId>
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@
|
||||||
package software.amazon.kinesis.retrieval.fanout;
|
package software.amazon.kinesis.retrieval.fanout;
|
||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
@ -27,6 +28,7 @@ import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import software.amazon.awssdk.core.async.SdkPublisher;
|
import software.amazon.awssdk.core.async.SdkPublisher;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
|
||||||
|
|
@ -124,9 +126,9 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
outstandingRequests = 0;
|
outstandingRequests = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
subscriber.onError(t);
|
handleFlowError(t);
|
||||||
} catch (Throwable innerThrowable) {
|
} catch (Throwable innerThrowable) {
|
||||||
log.warn("{}: Exception while calling subscriber.onError", innerThrowable);
|
log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable);
|
||||||
}
|
}
|
||||||
subscriber = null;
|
subscriber = null;
|
||||||
flow = null;
|
flow = null;
|
||||||
|
|
@ -142,6 +144,19 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleFlowError(Throwable t) {
|
||||||
|
if (t.getCause() instanceof ResourceNotFoundException) {
|
||||||
|
log.debug(
|
||||||
|
"{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.",
|
||||||
|
shardId);
|
||||||
|
subscriber
|
||||||
|
.onNext(ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build());
|
||||||
|
subscriber.onComplete();
|
||||||
|
} else {
|
||||||
|
subscriber.onError(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private String throwableCategory(Throwable t) {
|
private String throwableCategory(Throwable t) {
|
||||||
Throwable current = t;
|
Throwable current = t;
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
|
@ -31,6 +32,7 @@ import software.amazon.awssdk.core.SdkBytes;
|
||||||
import software.amazon.awssdk.core.async.SdkPublisher;
|
import software.amazon.awssdk.core.async.SdkPublisher;
|
||||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||||
import software.amazon.awssdk.services.kinesis.model.Record;
|
import software.amazon.awssdk.services.kinesis.model.Record;
|
||||||
|
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
|
||||||
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
|
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
|
||||||
|
|
@ -53,6 +55,8 @@ public class FanOutRecordsPublisherTest {
|
||||||
private SdkPublisher<SubscribeToShardEventStream> publisher;
|
private SdkPublisher<SubscribeToShardEventStream> publisher;
|
||||||
@Mock
|
@Mock
|
||||||
private Subscription subscription;
|
private Subscription subscription;
|
||||||
|
@Mock
|
||||||
|
private Subscriber<ProcessRecordsInput> subscriber;
|
||||||
|
|
||||||
private SubscribeToShardEvent batchEvent;
|
private SubscribeToShardEvent batchEvent;
|
||||||
|
|
||||||
|
|
@ -190,6 +194,30 @@ public class FanOutRecordsPublisherTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testResourceNotFoundForShard() {
|
||||||
|
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
|
||||||
|
|
||||||
|
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
|
||||||
|
.forClass(FanOutRecordsPublisher.RecordFlow.class);
|
||||||
|
ArgumentCaptor<ProcessRecordsInput> inputCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
|
||||||
|
|
||||||
|
source.subscribe(subscriber);
|
||||||
|
|
||||||
|
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
|
||||||
|
FanOutRecordsPublisher.RecordFlow recordFlow = flowCaptor.getValue();
|
||||||
|
recordFlow.exceptionOccurred(new RuntimeException(ResourceNotFoundException.builder().build()));
|
||||||
|
|
||||||
|
verify(subscriber).onSubscribe(any());
|
||||||
|
verify(subscriber, never()).onError(any());
|
||||||
|
verify(subscriber).onNext(inputCaptor.capture());
|
||||||
|
verify(subscriber).onComplete();
|
||||||
|
|
||||||
|
ProcessRecordsInput input = inputCaptor.getValue();
|
||||||
|
assertThat(input.isAtShardEnd(), equalTo(true));
|
||||||
|
assertThat(input.records().isEmpty(), equalTo(true));
|
||||||
|
}
|
||||||
|
|
||||||
private Record makeRecord(int sequenceNumber) {
|
private Record makeRecord(int sequenceNumber) {
|
||||||
SdkBytes buffer = SdkBytes.fromByteArray(new byte[] { 1, 2, 3 });
|
SdkBytes buffer = SdkBytes.fromByteArray(new byte[] { 1, 2, 3 });
|
||||||
return Record.builder().data(buffer).approximateArrivalTimestamp(Instant.now())
|
return Record.builder().data(buffer).approximateArrivalTimestamp(Instant.now())
|
||||||
|
|
|
||||||
2
pom.xml
2
pom.xml
|
|
@ -20,7 +20,7 @@
|
||||||
<artifactId>amazon-kinesis-client-pom</artifactId>
|
<artifactId>amazon-kinesis-client-pom</artifactId>
|
||||||
<packaging>pom</packaging>
|
<packaging>pom</packaging>
|
||||||
<name>Amazon Kinesis Client Library</name>
|
<name>Amazon Kinesis Client Library</name>
|
||||||
<version>2.0.0</version>
|
<version>2.0.1-SNAPSHOT</version>
|
||||||
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
|
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
|
||||||
from Amazon Kinesis.
|
from Amazon Kinesis.
|
||||||
</description>
|
</description>
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue