From 14915fc01c82b17f44e7f4fc5f3e87f14316c906 Mon Sep 17 00:00:00 2001 From: Kirill Kozlov Date: Wed, 6 May 2020 11:56:19 +0200 Subject: [PATCH] Throttle error messages when Kinesis provisioned throughput exceeded --- .../kinesis/lifecycle/ConsumerStates.java | 2 -- .../amazon/kinesis/lifecycle/ProcessTask.java | 4 --- .../polling/PrefetchRecordsPublisher.java | 10 ++++++- .../polling/SimpleRecordsFetcherFactory.java | 4 ++- ...ynchronousPrefetchingRetrievalFactory.java | 4 ++- .../kinesis/lifecycle/ProcessTaskTest.java | 8 ++--- ...efetchRecordsPublisherIntegrationTest.java | 7 +++-- .../polling/PrefetchRecordsPublisherTest.java | 29 +++++++++++++++++-- 8 files changed, 49 insertions(+), 19 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index bb1788b2..373d3420 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -258,14 +258,12 @@ class ConsumerStates { @Override public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) { - ThrottlingReporter throttlingReporter = new ThrottlingReporter(5, argument.shardInfo().shardId()); return new ProcessTask(argument.shardInfo(), argument.shardRecordProcessor(), argument.recordProcessorCheckpointer(), argument.taskBackoffTimeMillis(), argument.skipShardSyncAtWorkerInitializationIfLeasesExist(), argument.shardDetector(), - throttlingReporter, input, argument.shouldCallProcessRecordsEvenForEmptyRecordList(), argument.idleTimeInMilliseconds(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 6c223650..ea19e587 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -54,7 +54,6 @@ public class ProcessTask implements ConsumerTask { private final TaskType taskType = TaskType.PROCESS; private final long backoffTimeMillis; private final Shard shard; - private final ThrottlingReporter throttlingReporter; private final boolean shouldCallProcessRecordsEvenForEmptyRecordList; private final long idleTimeInMilliseconds; private final ProcessRecordsInput processRecordsInput; @@ -67,7 +66,6 @@ public class ProcessTask implements ConsumerTask { long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardDetector shardDetector, - @NonNull ThrottlingReporter throttlingReporter, ProcessRecordsInput processRecordsInput, boolean shouldCallProcessRecordsEvenForEmptyRecordList, long idleTimeInMilliseconds, @@ -77,7 +75,6 @@ public class ProcessTask implements ConsumerTask { this.shardRecordProcessor = shardRecordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.backoffTimeMillis = backoffTimeMillis; - this.throttlingReporter = throttlingReporter; this.processRecordsInput = processRecordsInput; this.shouldCallProcessRecordsEvenForEmptyRecordList = shouldCallProcessRecordsEvenForEmptyRecordList; this.idleTimeInMilliseconds = idleTimeInMilliseconds; @@ -125,7 +122,6 @@ public class ProcessTask implements ConsumerTask { return new TaskResult(null, true); } - throttlingReporter.success(); List records = deaggregateAnyKplRecords(processRecordsInput.records()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index dcd5e043..6fcb447f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -44,6 +44,7 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; @@ -60,6 +61,7 @@ import software.amazon.kinesis.retrieval.RecordsDeliveryAck; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; +import software.amazon.kinesis.retrieval.ThrottlingReporter; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; @@ -100,6 +102,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private Instant lastEventDeliveryTime = Instant.EPOCH; private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); + private final ThrottlingReporter throttlingReporter; @Data @Accessors(fluent = true) @@ -208,7 +211,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { final long idleMillisBetweenCalls, @NonNull final MetricsFactory metricsFactory, @NonNull final String operation, - @NonNull final String shardId) { + @NonNull final String shardId, + @NonNull final ThrottlingReporter throttlingReporter) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; @@ -223,6 +227,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { Validate.notEmpty(operation, "Operation cannot be empty"); this.operation = operation; this.shardId = shardId; + this.throttlingReporter = throttlingReporter; } @Override @@ -444,6 +449,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber); addArrivedRecordsInput(recordsRetrieved); drainQueueForRequests(); + throttlingReporter.success(); } catch (PositionResetException pse) { throw pse; } catch (RetryableRetrievalException rre) { @@ -457,6 +463,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); publisherSession.dataFetcher().restartIterator(); + } catch (ProvisionedThroughputExceededException e) { + throttlingReporter.throttled(); } catch (SdkException e) { log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e); } catch (Throwable e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java index a74e3f31..a8be17fd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java @@ -25,6 +25,7 @@ import software.amazon.kinesis.retrieval.DataFetchingStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; +import software.amazon.kinesis.retrieval.ThrottlingReporter; @Slf4j @KinesisClientInternalApi @@ -45,7 +46,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { .newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("prefetch-cache-" + shardId + "-%04d").build()), - idleMillisBetweenCalls, metricsFactory, "ProcessTask", shardId); + idleMillisBetweenCalls, metricsFactory, "ProcessTask", shardId, + new ThrottlingReporter(5, shardId)); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java index 320fe4dd..926f7132 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java @@ -27,6 +27,7 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; +import software.amazon.kinesis.retrieval.ThrottlingReporter; /** * @@ -78,6 +79,7 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(), recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords, createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls, - metricsFactory, "Prefetching", shardInfo.shardId()); + metricsFactory, "Prefetching", shardInfo.shardId(), + new ThrottlingReporter(5, shardInfo.shardId())); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java index 16e6426a..5238256a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java @@ -99,8 +99,6 @@ public class ProcessTaskTest { private ShardRecordProcessor shardRecordProcessor; @Mock private ShardRecordProcessorCheckpointer checkpointer; - @Mock - private ThrottlingReporter throttlingReporter; private ProcessTask processTask; @@ -120,8 +118,8 @@ public class ProcessTaskTest { private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil, boolean skipShardSync) { return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis, - skipShardSync, shardDetector, throttlingReporter, - processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS, + skipShardSync, shardDetector, processRecordsInput, + shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS, aggregatorUtil, new NullMetricsFactory()); } @@ -498,8 +496,6 @@ public class ProcessTaskTest { when(checkpointer.lastCheckpointValue()).thenReturn(lastCheckpointValue); when(checkpointer.largestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); - verify(throttlingReporter).success(); - verify(throttlingReporter, never()).throttled(); ArgumentCaptor recordsCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(shardRecordProcessor).processRecords(recordsCaptor.capture()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index f940faf2..8ddd7208 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -63,6 +63,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.RecordsRetrieved; +import software.amazon.kinesis.retrieval.ThrottlingReporter; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -115,7 +116,8 @@ public class PrefetchRecordsPublisherIntegrationTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "test-shard"); + "test-shard", + new ThrottlingReporter(5, "test-shard")); } @Test @@ -168,7 +170,8 @@ public class PrefetchRecordsPublisherIntegrationTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "test-shard-2"); + "test-shard-2", + new ThrottlingReporter(5, "test-shard-2")); getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index f5772aaf..76480082 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -28,6 +28,7 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; @@ -60,7 +61,9 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.InOrder; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; @@ -73,9 +76,11 @@ import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; @@ -84,6 +89,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; +import software.amazon.kinesis.retrieval.ThrottlingReporter; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -107,6 +113,8 @@ public class PrefetchRecordsPublisherTest { private InitialPositionInStreamExtended initialPosition; @Mock private ExtendedSequenceNumber sequenceNumber; + @Mock + private ThrottlingReporter throttlingReporter; private List records; private ExecutorService executorService; @@ -115,7 +123,6 @@ public class PrefetchRecordsPublisherTest { private String operation = "ProcessTask"; private GetRecordsResponse getRecordsResponse; private Record record; - private RequestDetails requestDetails; @Before public void setup() { @@ -132,7 +139,8 @@ public class PrefetchRecordsPublisherTest { IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), operation, - "shardId"); + "shardId", + throttlingReporter); spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue()); records = spy(new ArrayList<>()); getRecordsResponse = GetRecordsResponse.builder().records(records).build(); @@ -499,6 +507,23 @@ public class PrefetchRecordsPublisherTest { } + @Test + public void testProvisionedThroughputExceededExceptionIsRegisteredInReporter() { + GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L).records(Collections.emptyList()).build(); + ProvisionedThroughputExceededException throughputExceededException = + ProvisionedThroughputExceededException.builder().build(); + when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenThrow(throughputExceededException).thenReturn(response); + + getRecordsCache.start(sequenceNumber, initialPosition); + + RecordsRetrieved records = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000); + InOrder inOrder = Mockito.inOrder(throttlingReporter); + inOrder.verify(throttlingReporter).throttled(); + inOrder.verify(throttlingReporter, atLeastOnce()).success(); + inOrder.verifyNoMoreInteractions(); + assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest())); + } + private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) { return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId); }