From 1280325c20446d269069430f13e80f71212a7e5d Mon Sep 17 00:00:00 2001 From: vincentvilo-aws <142546855+vincentvilo-aws@users.noreply.github.com> Date: Fri, 8 Mar 2024 13:37:25 -0800 Subject: [PATCH] add functionality to retry an InvalidArgumentException (#1270) * add functionality to retry an InvalidArgumentException with a new iterator * include DEFAULT_MAX_RECORDS value in IllegalArgumentException messages --- .../coordinator/KinesisClientLibConfiguration.java | 4 ++++ .../kinesis/retrieval/polling/PollingConfig.java | 12 +++++++++++- .../retrieval/polling/PrefetchRecordsPublisher.java | 5 ++++- .../retrieval/polling/PollingConfigTest.java | 5 +++++ .../polling/PrefetchRecordsPublisherTest.java | 13 +++++++++++++ 5 files changed, 37 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java index 54797050..bc5d2ac9 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java @@ -977,6 +977,10 @@ public class KinesisClientLibConfiguration { */ public KinesisClientLibConfiguration withMaxRecords(int maxRecords) { checkIsValuePositive("MaxRecords", (long) maxRecords); + if (maxRecords > DEFAULT_MAX_RECORDS) { + throw new IllegalArgumentException( + "maxRecords must be less than or equal to " + DEFAULT_MAX_RECORDS + " but current value is " + maxRecords); + } this.maxRecords = maxRecords; return this; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java index fed3d15f..3359c570 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java @@ -42,6 +42,8 @@ public class PollingConfig implements RetrievalSpecificConfig { public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30); + public static final int DEFAULT_MAX_RECORDS = 10000; + /** * Configurable functional interface to override the existing DataFetcher. */ @@ -73,7 +75,7 @@ public class PollingConfig implements RetrievalSpecificConfig { * Default value: 10000 *

*/ - private int maxRecords = 10000; + private int maxRecords = DEFAULT_MAX_RECORDS; /** * @param streamName Name of Kinesis stream. @@ -144,6 +146,14 @@ public class PollingConfig implements RetrievalSpecificConfig { return this; } + public void maxRecords(int maxRecords) { + if (maxRecords > DEFAULT_MAX_RECORDS) { + throw new IllegalArgumentException( + "maxRecords must be less than or equal to " + DEFAULT_MAX_RECORDS + " but current value is " + maxRecords()); + } + this.maxRecords = maxRecords; + } + /** * The maximum time to wait for a future request from Kinesis to complete */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 1f7267b0..b94c7cf1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -42,6 +42,7 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -102,7 +103,6 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final PublisherSession publisherSession; private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock(); private boolean wasReset = false; - private Instant lastEventDeliveryTime = Instant.EPOCH; private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); @@ -512,6 +512,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId); + } catch (InvalidArgumentException e) { + log.info("{} : records threw InvalidArgumentException - iterator will be refreshed before retrying", streamAndShardId, e); + publisherSession.dataFetcher().restartIterator(); } catch (ExpiredIteratorException e) { log.info("{} : records threw ExpiredIteratorException - restarting" + " after greatest seqNum passed to customer", streamAndShardId, e); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PollingConfigTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PollingConfigTest.java index 760c6dce..eefba7a4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PollingConfigTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PollingConfigTest.java @@ -44,4 +44,9 @@ public class PollingConfigTest { config.validateState(true); } + @Test(expected = IllegalArgumentException.class) + public void testInvalidRecordLimit() { + config.maxRecords(PollingConfig.DEFAULT_MAX_RECORDS + 1); + } + } \ No newline at end of file diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index aeacab8e..6e3a56cd 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -82,6 +82,7 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.ShardObjectHelper; @@ -451,6 +452,18 @@ public class PrefetchRecordsPublisherTest { assertEquals(records.processRecordsInput().millisBehindLatest(), response.millisBehindLatest()); } + @Test + public void testInvalidArgumentExceptionIsRetried() { + when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)) + .thenThrow(InvalidArgumentException.builder().build()) + .thenReturn(getRecordsResponse); + + getRecordsCache.start(sequenceNumber, initialPosition); + blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300); + + verify(dataFetcher, times(1)).restartIterator(); + } + @Test(timeout = 10000L) public void testNoDeadlockOnFullQueue() { //