From c7c56d5582dbfeef74cb0e75a6f9c13fcc9179d8 Mon Sep 17 00:00:00 2001 From: Yatin Date: Wed, 9 Dec 2020 13:28:38 -0800 Subject: [PATCH] Backing off everytime we get throttling exception from Kinesis. --- .../kinesis/retrieval/polling/PrefetchRecordsPublisher.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 54b2d97c..9373aa42 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -42,6 +42,7 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; @@ -460,6 +461,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); publisherSession.dataFetcher().restartIterator(); + } catch (ProvisionedThroughputExceededException e) { + // Update the lastSuccessfulCall if we get a throttling exception so that we back off idleMillis + // for the next call + lastSuccessfulCall = Instant.now(); + log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e); } catch (SdkException e) { log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e); } catch (Throwable e) {