From a893da6942b372096629f81efedd8a3af17df06b Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Wed, 19 Sep 2018 22:45:40 +0530 Subject: [PATCH] Graceful handling of ReadTimeoutExceptions (#403) * Handling ReadTimeouts gracefully * Emitting logging messages at DEBUG level for retryable exceptions * Introducing SubscribeToShardRetryableException * Addressing comments * Making private ThrowableCategory class static * Creating static instances for acquiretimeout and readtimeout categories * Cleaned up imports * Renamed and moved SubscribeToShardRetryableException to RetryableRetrievalException * Renamed UNKNOWN exception type to Other --- .../kinesis/lifecycle/ShardConsumer.java | 8 ++- .../RetryableRetrievalException.java | 31 +++++++++ .../fanout/FanOutRecordsPublisher.java | 64 +++++++++++++++---- .../fanout/FanOutRecordsPublisherTest.java | 24 ++++++- 4 files changed, 112 insertions(+), 15 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetryableRetrievalException.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 134f30d4..8be5ec82 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -43,6 +43,7 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; /** * Responsible for consuming data records of a (specified) shard. @@ -241,7 +242,12 @@ public class ShardConsumer { return null; } if (failure != null) { - log.warn("{}: Failure occurred in retrieval. Restarting data requests", shardInfo.shardId(), failure); + String logMessage = String.format("%s: Failure occurred in retrieval. Restarting data requests", shardInfo.shardId()); + if (failure instanceof RetryableRetrievalException) { + log.debug(logMessage, failure.getCause()); + } else { + log.warn(logMessage, failure); + } startSubscriptions(); return failure; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetryableRetrievalException.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetryableRetrievalException.java new file mode 100644 index 00000000..cd36aa33 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetryableRetrievalException.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.kinesis.retrieval; + +import software.amazon.kinesis.exceptions.KinesisClientLibRetryableException; + +/** + * RetryableException for SubscribeToShard APIs. + */ +public class RetryableRetrievalException extends KinesisClientLibRetryableException { + public RetryableRetrievalException(final String message) { + super(message); + } + + public RetryableRetrievalException(final String message, final Exception e) { + super(message, e); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 5ed51710..f6f2925d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -25,6 +25,7 @@ import java.util.stream.Collectors; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.async.SdkPublisher; @@ -36,6 +37,7 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; +import software.amazon.kinesis.annotations.KinesisClientExperimental; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; @@ -43,12 +45,15 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.IteratorBuilder; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsPublisher; -import software.amazon.kinesis.annotations.KinesisClientExperimental; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @Slf4j @KinesisClientInternalApi public class FanOutRecordsPublisher implements RecordsPublisher { + private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory( + ThrowableType.ACUIRE_TIMEOUT); + private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); private final KinesisAsyncClient kinesis; private final String shardId; @@ -141,19 +146,29 @@ public class FanOutRecordsPublisher implements RecordsPublisher { shardId, flow.connectionStartedAt, flow.subscribeToShardId); return; } - String category = throwableCategory(t); + Throwable propagationThrowable = t; + ThrowableCategory category = throwableCategory(propagationThrowable); if (isActiveFlow(triggeringFlow)) { if (flow != null) { - log.warn("{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {}", - shardId, flow.connectionStartedAt, flow.subscribeToShardId, category, t); + String logMessage = String.format( + "%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s", + shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString); + if (category.throwableType.equals(ThrowableType.READ_TIMEOUT)) { + log.debug(logMessage, propagationThrowable); + propagationThrowable = new RetryableRetrievalException(category.throwableTypeString, + (Exception) propagationThrowable.getCause()); + } else { + log.warn(logMessage, propagationThrowable); + } + flow.cancel(); } log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace); availableQueueSpace = 0; try { - handleFlowError(t); + handleFlowError(propagationThrowable); } catch (Throwable innerThrowable) { log.warn("{}: Exception while calling subscriber.onError", shardId, innerThrowable); } @@ -163,7 +178,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { if (triggeringFlow != null) { log.debug( "{}: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ {} id: {} -- {} -> triggeringFlow wasn't the active flow. Didn't dispatch error", - shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId, category); + shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId, + category.throwableTypeString); triggeringFlow.cancel(); } } @@ -184,29 +200,55 @@ public class FanOutRecordsPublisher implements RecordsPublisher { } } - private String throwableCategory(Throwable t) { + private enum ThrowableType { + ACUIRE_TIMEOUT("AcquireTimeout"), READ_TIMEOUT("ReadTimeout"), OTHER("Other"); + + String value; + + ThrowableType(final String value) { + this.value = value; + } + } + + private static class ThrowableCategory { + @NonNull + final ThrowableType throwableType; + @NonNull + final String throwableTypeString; + + ThrowableCategory(final ThrowableType throwableType) { + this(throwableType, throwableType.value); + } + + ThrowableCategory(final ThrowableType throwableType, final String throwableTypeString) { + this.throwableType = throwableType; + this.throwableTypeString = throwableTypeString; + } + } + + private ThrowableCategory throwableCategory(Throwable t) { Throwable current = t; StringBuilder builder = new StringBuilder(); do { if (current.getMessage() != null && current.getMessage().startsWith("Acquire operation")) { - return "AcquireTimeout"; + return ACQUIRE_TIMEOUT_CATEGORY; } if (current.getClass().getName().equals("io.netty.handler.timeout.ReadTimeoutException")) { - return "ReadTimeout"; + return READ_TIMEOUT_CATEGORY; } if (current.getCause() == null) { // // At the bottom // - builder.append(current.getClass().getName() + ": " + current.getMessage()); + builder.append(current.getClass().getName()).append(": ").append(current.getMessage()); } else { builder.append(current.getClass().getSimpleName()); builder.append("/"); } current = current.getCause(); } while (current != null); - return builder.toString(); + return new ThrowableCategory(ThrowableType.OTHER, builder.toString()); } private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent recordBatchEvent) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 26421add..823d877b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -1,8 +1,6 @@ package software.amazon.kinesis.retrieval.fanout; -import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -21,6 +19,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; +import io.netty.handler.timeout.ReadTimeoutException; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -47,6 +46,7 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @RunWith(MockitoJUnitRunner.class) @@ -55,7 +55,6 @@ public class FanOutRecordsPublisherTest { private static final String SHARD_ID = "shardId-000000000001"; private static final String CONSUMER_ARN = "arn:consumer"; - private static final boolean VALIDATE_RECORD_SHARD_MATCHING = true; @Mock private KinesisAsyncClient kinesisClient; @@ -226,6 +225,25 @@ public class FanOutRecordsPublisherTest { assertThat(input.records().isEmpty(), equalTo(true)); } + @Test + public void testReadTimeoutExceptionForShard() { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + source.subscribe(subscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + FanOutRecordsPublisher.RecordFlow recordFlow = flowCaptor.getValue(); + recordFlow.exceptionOccurred(new RuntimeException(ReadTimeoutException.INSTANCE)); + + verify(subscriber).onSubscribe(any()); + verify(subscriber).onError(any(RetryableRetrievalException.class)); + verify(subscriber, never()).onNext(any()); + verify(subscriber, never()).onComplete(); + } + @Test public void testContinuesAfterSequence() { FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);