Throttle error messages when Kinesis provisioned throughput exceeded

This commit is contained in:
Kirill Kozlov 2020-05-06 11:56:19 +02:00
parent 8873b1346f
commit 14915fc01c
8 changed files with 49 additions and 19 deletions

View file

@ -258,14 +258,12 @@ class ConsumerStates {
@Override
public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) {
ThrottlingReporter throttlingReporter = new ThrottlingReporter(5, argument.shardInfo().shardId());
return new ProcessTask(argument.shardInfo(),
argument.shardRecordProcessor(),
argument.recordProcessorCheckpointer(),
argument.taskBackoffTimeMillis(),
argument.skipShardSyncAtWorkerInitializationIfLeasesExist(),
argument.shardDetector(),
throttlingReporter,
input,
argument.shouldCallProcessRecordsEvenForEmptyRecordList(),
argument.idleTimeInMilliseconds(),

View file

@ -54,7 +54,6 @@ public class ProcessTask implements ConsumerTask {
private final TaskType taskType = TaskType.PROCESS;
private final long backoffTimeMillis;
private final Shard shard;
private final ThrottlingReporter throttlingReporter;
private final boolean shouldCallProcessRecordsEvenForEmptyRecordList;
private final long idleTimeInMilliseconds;
private final ProcessRecordsInput processRecordsInput;
@ -67,7 +66,6 @@ public class ProcessTask implements ConsumerTask {
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ShardDetector shardDetector,
@NonNull ThrottlingReporter throttlingReporter,
ProcessRecordsInput processRecordsInput,
boolean shouldCallProcessRecordsEvenForEmptyRecordList,
long idleTimeInMilliseconds,
@ -77,7 +75,6 @@ public class ProcessTask implements ConsumerTask {
this.shardRecordProcessor = shardRecordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter;
this.processRecordsInput = processRecordsInput;
this.shouldCallProcessRecordsEvenForEmptyRecordList = shouldCallProcessRecordsEvenForEmptyRecordList;
this.idleTimeInMilliseconds = idleTimeInMilliseconds;
@ -125,7 +122,6 @@ public class ProcessTask implements ConsumerTask {
return new TaskResult(null, true);
}
throttlingReporter.success();
List<KinesisClientRecord> records = deaggregateAnyKplRecords(processRecordsInput.records());

View file

@ -44,6 +44,7 @@ import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
@ -60,6 +61,7 @@ import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
@ -100,6 +102,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private Instant lastEventDeliveryTime = Instant.EPOCH;
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
private final ThrottlingReporter throttlingReporter;
@Data
@Accessors(fluent = true)
@ -208,7 +211,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
final long idleMillisBetweenCalls,
@NonNull final MetricsFactory metricsFactory,
@NonNull final String operation,
@NonNull final String shardId) {
@NonNull final String shardId,
@NonNull final ThrottlingReporter throttlingReporter) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall;
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
@ -223,6 +227,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
Validate.notEmpty(operation, "Operation cannot be empty");
this.operation = operation;
this.shardId = shardId;
this.throttlingReporter = throttlingReporter;
}
@Override
@ -444,6 +449,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequests();
throttlingReporter.success();
} catch (PositionResetException pse) {
throw pse;
} catch (RetryableRetrievalException rre) {
@ -457,6 +463,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
publisherSession.dataFetcher().restartIterator();
} catch (ProvisionedThroughputExceededException e) {
throttlingReporter.throttled();
} catch (SdkException e) {
log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e);
} catch (Throwable e) {

View file

@ -25,6 +25,7 @@ import software.amazon.kinesis.retrieval.DataFetchingStrategy;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
@Slf4j
@KinesisClientInternalApi
@ -45,7 +46,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
.newFixedThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("prefetch-cache-" + shardId + "-%04d").build()),
idleMillisBetweenCalls, metricsFactory, "ProcessTask", shardId);
idleMillisBetweenCalls, metricsFactory, "ProcessTask", shardId,
new ThrottlingReporter(5, shardId));
}

View file

@ -27,6 +27,7 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
/**
*
@ -78,6 +79,7 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(),
recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords,
createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls,
metricsFactory, "Prefetching", shardInfo.shardId());
metricsFactory, "Prefetching", shardInfo.shardId(),
new ThrottlingReporter(5, shardInfo.shardId()));
}
}

View file

@ -99,8 +99,6 @@ public class ProcessTaskTest {
private ShardRecordProcessor shardRecordProcessor;
@Mock
private ShardRecordProcessorCheckpointer checkpointer;
@Mock
private ThrottlingReporter throttlingReporter;
private ProcessTask processTask;
@ -120,8 +118,8 @@ public class ProcessTaskTest {
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil,
boolean skipShardSync) {
return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis,
skipShardSync, shardDetector, throttlingReporter,
processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS,
skipShardSync, shardDetector, processRecordsInput,
shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS,
aggregatorUtil, new NullMetricsFactory());
}
@ -498,8 +496,6 @@ public class ProcessTaskTest {
when(checkpointer.lastCheckpointValue()).thenReturn(lastCheckpointValue);
when(checkpointer.largestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
processTask.call();
verify(throttlingReporter).success();
verify(throttlingReporter, never()).throttled();
ArgumentCaptor<ProcessRecordsInput> recordsCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
verify(shardRecordProcessor).processRecords(recordsCaptor.capture());

View file

@ -63,6 +63,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
@ -115,7 +116,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(),
operation,
"test-shard");
"test-shard",
new ThrottlingReporter(5, "test-shard"));
}
@Test
@ -168,7 +170,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(),
operation,
"test-shard-2");
"test-shard-2",
new ThrottlingReporter(5, "test-shard-2"));
getRecordsCache.start(extendedSequenceNumber, initialPosition);
sleep(IDLE_MILLIS_BETWEEN_CALLS);

View file

@ -28,6 +28,7 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
@ -60,7 +61,9 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
@ -73,9 +76,11 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.NullMetricsFactory;
@ -84,6 +89,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/**
@ -107,6 +113,8 @@ public class PrefetchRecordsPublisherTest {
private InitialPositionInStreamExtended initialPosition;
@Mock
private ExtendedSequenceNumber sequenceNumber;
@Mock
private ThrottlingReporter throttlingReporter;
private List<Record> records;
private ExecutorService executorService;
@ -115,7 +123,6 @@ public class PrefetchRecordsPublisherTest {
private String operation = "ProcessTask";
private GetRecordsResponse getRecordsResponse;
private Record record;
private RequestDetails requestDetails;
@Before
public void setup() {
@ -132,7 +139,8 @@ public class PrefetchRecordsPublisherTest {
IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(),
operation,
"shardId");
"shardId",
throttlingReporter);
spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue());
records = spy(new ArrayList<>());
getRecordsResponse = GetRecordsResponse.builder().records(records).build();
@ -499,6 +507,23 @@ public class PrefetchRecordsPublisherTest {
}
@Test
public void testProvisionedThroughputExceededExceptionIsRegisteredInReporter() {
GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L).records(Collections.emptyList()).build();
ProvisionedThroughputExceededException throughputExceededException =
ProvisionedThroughputExceededException.builder().build();
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenThrow(throughputExceededException).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition);
RecordsRetrieved records = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000);
InOrder inOrder = Mockito.inOrder(throttlingReporter);
inOrder.verify(throttlingReporter).throttled();
inOrder.verify(throttlingReporter, atLeastOnce()).success();
inOrder.verifyNoMoreInteractions();
assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest()));
}
private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) {
return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId);
}