Graceful handling of ReadTimeoutExceptions (#403)

* Handling ReadTimeouts gracefully

* Emitting logging messages at DEBUG level for retryable exceptions
* Introducing SubscribeToShardRetryableException

* Addressing comments

* Making private ThrowableCategory class static
* Creating static instances for acquiretimeout and readtimeout categories
* Cleaned up imports
* Renamed and moved SubscribeToShardRetryableException to RetryableRetrievalException
* Renamed UNKNOWN exception type to Other
This commit is contained in:
Sahil Palvia 2018-09-19 22:45:40 +05:30 committed by Justin Pfifer
parent 54c171dc2a
commit a893da6942
4 changed files with 112 additions and 15 deletions

View file

@ -43,6 +43,7 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
/**
* Responsible for consuming data records of a (specified) shard.
@ -241,7 +242,12 @@ public class ShardConsumer {
return null;
}
if (failure != null) {
log.warn("{}: Failure occurred in retrieval. Restarting data requests", shardInfo.shardId(), failure);
String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", shardInfo.shardId());
if (failure instanceof RetryableRetrievalException) {
log.debug(logMessage, failure.getCause());
} else {
log.warn(logMessage, failure);
}
startSubscriptions();
return failure;
}

View file

@ -0,0 +1,31 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import software.amazon.kinesis.exceptions.KinesisClientLibRetryableException;
/**
* RetryableException for SubscribeToShard APIs.
*/
public class RetryableRetrievalException extends KinesisClientLibRetryableException {
public RetryableRetrievalException(final String message) {
super(message);
}
public RetryableRetrievalException(final String message, final Exception e) {
super(message, e);
}
}

View file

@ -25,6 +25,7 @@ import java.util.stream.Collectors;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.async.SdkPublisher;
@ -36,6 +37,7 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.kinesis.annotations.KinesisClientExperimental;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
@ -43,12 +45,15 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.annotations.KinesisClientExperimental;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@Slf4j
@KinesisClientInternalApi
public class FanOutRecordsPublisher implements RecordsPublisher {
private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory(
ThrowableType.ACUIRE_TIMEOUT);
private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT);
private final KinesisAsyncClient kinesis;
private final String shardId;
@ -141,19 +146,29 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
return;
}
String category = throwableCategory(t);
Throwable propagationThrowable = t;
ThrowableCategory category = throwableCategory(propagationThrowable);
if (isActiveFlow(triggeringFlow)) {
if (flow != null) {
log.warn("{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category, t);
String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString);
if (category.throwableType.equals(ThrowableType.READ_TIMEOUT)) {
log.debug(logMessage, propagationThrowable);
propagationThrowable = new RetryableRetrievalException(category.throwableTypeString,
(Exception) propagationThrowable.getCause());
} else {
log.warn(logMessage, propagationThrowable);
}
flow.cancel();
}
log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace);
availableQueueSpace = 0;
try {
handleFlowError(t);
handleFlowError(propagationThrowable);
} catch (Throwable innerThrowable) {
log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable);
}
@ -163,7 +178,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
if (triggeringFlow != null) {
log.debug(
"{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow. Didn't dispatch error",
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId, category);
shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId,
category.throwableTypeString);
triggeringFlow.cancel();
}
}
@ -184,29 +200,55 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
}
}
private String throwableCategory(Throwable t) {
private enum ThrowableType {
ACUIRE_TIMEOUT("AcquireTimeout"), READ_TIMEOUT("ReadTimeout"), OTHER("Other");
String value;
ThrowableType(final String value) {
this.value = value;
}
}
private static class ThrowableCategory {
@NonNull
final ThrowableType throwableType;
@NonNull
final String throwableTypeString;
ThrowableCategory(final ThrowableType throwableType) {
this(throwableType, throwableType.value);
}
ThrowableCategory(final ThrowableType throwableType, final String throwableTypeString) {
this.throwableType = throwableType;
this.throwableTypeString = throwableTypeString;
}
}
private ThrowableCategory throwableCategory(Throwable t) {
Throwable current = t;
StringBuilder builder = new StringBuilder();
do {
if (current.getMessage() != null && current.getMessage().startsWith("Acquire operation")) {
return "AcquireTimeout";
return ACQUIRE_TIMEOUT_CATEGORY;
}
if (current.getClass().getName().equals("io.netty.handler.timeout.ReadTimeoutException")) {
return "ReadTimeout";
return READ_TIMEOUT_CATEGORY;
}
if (current.getCause() == null) {
//
// At the bottom
//
builder.append(current.getClass().getName() + ": " + current.getMessage());
builder.append(current.getClass().getName()).append(": ").append(current.getMessage());
} else {
builder.append(current.getClass().getSimpleName());
builder.append("/");
}
current = current.getCause();
} while (current != null);
return builder.toString();
return new ThrowableCategory(ThrowableType.OTHER, builder.toString());
}
private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent recordBatchEvent) {

View file

@ -1,8 +1,6 @@
package software.amazon.kinesis.retrieval.fanout;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@ -21,6 +19,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.netty.handler.timeout.ReadTimeoutException;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
@ -47,6 +46,7 @@ import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@RunWith(MockitoJUnitRunner.class)
@ -55,7 +55,6 @@ public class FanOutRecordsPublisherTest {
private static final String SHARD_ID = "shardId-000000000001";
private static final String CONSUMER_ARN = "arn:consumer";
private static final boolean VALIDATE_RECORD_SHARD_MATCHING = true;
@Mock
private KinesisAsyncClient kinesisClient;
@ -226,6 +225,25 @@ public class FanOutRecordsPublisherTest {
assertThat(input.records().isEmpty(), equalTo(true));
}
@Test
public void testReadTimeoutExceptionForShard() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);
source.subscribe(subscriber);
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());
FanOutRecordsPublisher.RecordFlow recordFlow = flowCaptor.getValue();
recordFlow.exceptionOccurred(new RuntimeException(ReadTimeoutException.INSTANCE));
verify(subscriber).onSubscribe(any());
verify(subscriber).onError(any(RetryableRetrievalException.class));
verify(subscriber, never()).onNext(any());
verify(subscriber, never()).onComplete();
}
@Test
public void testContinuesAfterSequence() {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);