This commit is contained in:
Kirill Kozlov 2020-06-28 13:32:01 +00:00 committed by GitHub
commit 264f1a7006
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 49 additions and 19 deletions

View file

@ -258,14 +258,12 @@ class ConsumerStates {
@Override @Override
public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) { public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) {
ThrottlingReporter throttlingReporter = new ThrottlingReporter(5, argument.shardInfo().shardId());
return new ProcessTask(argument.shardInfo(), return new ProcessTask(argument.shardInfo(),
argument.shardRecordProcessor(), argument.shardRecordProcessor(),
argument.recordProcessorCheckpointer(), argument.recordProcessorCheckpointer(),
argument.taskBackoffTimeMillis(), argument.taskBackoffTimeMillis(),
argument.skipShardSyncAtWorkerInitializationIfLeasesExist(), argument.skipShardSyncAtWorkerInitializationIfLeasesExist(),
argument.shardDetector(), argument.shardDetector(),
throttlingReporter,
input, input,
argument.shouldCallProcessRecordsEvenForEmptyRecordList(), argument.shouldCallProcessRecordsEvenForEmptyRecordList(),
argument.idleTimeInMilliseconds(), argument.idleTimeInMilliseconds(),

View file

@ -54,7 +54,6 @@ public class ProcessTask implements ConsumerTask {
private final TaskType taskType = TaskType.PROCESS; private final TaskType taskType = TaskType.PROCESS;
private final long backoffTimeMillis; private final long backoffTimeMillis;
private final Shard shard; private final Shard shard;
private final ThrottlingReporter throttlingReporter;
private final boolean shouldCallProcessRecordsEvenForEmptyRecordList; private final boolean shouldCallProcessRecordsEvenForEmptyRecordList;
private final long idleTimeInMilliseconds; private final long idleTimeInMilliseconds;
private final ProcessRecordsInput processRecordsInput; private final ProcessRecordsInput processRecordsInput;
@ -67,7 +66,6 @@ public class ProcessTask implements ConsumerTask {
long backoffTimeMillis, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ShardDetector shardDetector, ShardDetector shardDetector,
@NonNull ThrottlingReporter throttlingReporter,
ProcessRecordsInput processRecordsInput, ProcessRecordsInput processRecordsInput,
boolean shouldCallProcessRecordsEvenForEmptyRecordList, boolean shouldCallProcessRecordsEvenForEmptyRecordList,
long idleTimeInMilliseconds, long idleTimeInMilliseconds,
@ -77,7 +75,6 @@ public class ProcessTask implements ConsumerTask {
this.shardRecordProcessor = shardRecordProcessor; this.shardRecordProcessor = shardRecordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter;
this.processRecordsInput = processRecordsInput; this.processRecordsInput = processRecordsInput;
this.shouldCallProcessRecordsEvenForEmptyRecordList = shouldCallProcessRecordsEvenForEmptyRecordList; this.shouldCallProcessRecordsEvenForEmptyRecordList = shouldCallProcessRecordsEvenForEmptyRecordList;
this.idleTimeInMilliseconds = idleTimeInMilliseconds; this.idleTimeInMilliseconds = idleTimeInMilliseconds;
@ -125,7 +122,6 @@ public class ProcessTask implements ConsumerTask {
return new TaskResult(null, true); return new TaskResult(null, true);
} }
throttlingReporter.success();
List<KinesisClientRecord> records = deaggregateAnyKplRecords(processRecordsInput.records()); List<KinesisClientRecord> records = deaggregateAnyKplRecords(processRecordsInput.records());

View file

@ -44,6 +44,7 @@ import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.common.RequestDetails;
@ -60,6 +61,7 @@ import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired; import static software.amazon.kinesis.common.DiagnosticUtils.takeDelayedDeliveryActionIfRequired;
@ -100,6 +102,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private Instant lastEventDeliveryTime = Instant.EPOCH; private Instant lastEventDeliveryTime = Instant.EPOCH;
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
private final ThrottlingReporter throttlingReporter;
@Data @Data
@Accessors(fluent = true) @Accessors(fluent = true)
@ -208,7 +211,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
final long idleMillisBetweenCalls, final long idleMillisBetweenCalls,
@NonNull final MetricsFactory metricsFactory, @NonNull final MetricsFactory metricsFactory,
@NonNull final String operation, @NonNull final String operation,
@NonNull final String shardId) { @NonNull final String shardId,
@NonNull final ThrottlingReporter throttlingReporter) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall; this.maxRecordsPerCall = maxRecordsPerCall;
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
@ -223,6 +227,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
Validate.notEmpty(operation, "Operation cannot be empty"); Validate.notEmpty(operation, "Operation cannot be empty");
this.operation = operation; this.operation = operation;
this.shardId = shardId; this.shardId = shardId;
this.throttlingReporter = throttlingReporter;
} }
@Override @Override
@ -444,6 +449,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber); publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
addArrivedRecordsInput(recordsRetrieved); addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequests(); drainQueueForRequests();
throttlingReporter.success();
} catch (PositionResetException pse) { } catch (PositionResetException pse) {
throw pse; throw pse;
} catch (RetryableRetrievalException rre) { } catch (RetryableRetrievalException rre) {
@ -457,6 +463,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);
publisherSession.dataFetcher().restartIterator(); publisherSession.dataFetcher().restartIterator();
} catch (ProvisionedThroughputExceededException e) {
throttlingReporter.throttled();
} catch (SdkException e) { } catch (SdkException e) {
log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e); log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e);
} catch (Throwable e) { } catch (Throwable e) {

View file

@ -25,6 +25,7 @@ import software.amazon.kinesis.retrieval.DataFetchingStrategy;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
@Slf4j @Slf4j
@KinesisClientInternalApi @KinesisClientInternalApi
@ -45,7 +46,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
.newFixedThreadPool(1, .newFixedThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true) new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("prefetch-cache-" + shardId + "-%04d").build()), .setNameFormat("prefetch-cache-" + shardId + "-%04d").build()),
idleMillisBetweenCalls, metricsFactory, "ProcessTask", shardId); idleMillisBetweenCalls, metricsFactory, "ProcessTask", shardId,
new ThrottlingReporter(5, shardId));
} }

View file

@ -27,6 +27,7 @@ import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory; import software.amazon.kinesis.retrieval.RetrievalFactory;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
/** /**
* *
@ -78,6 +79,7 @@ public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory
return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(), return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(),
recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords, recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords,
createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls, createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls,
metricsFactory, "Prefetching", shardInfo.shardId()); metricsFactory, "Prefetching", shardInfo.shardId(),
new ThrottlingReporter(5, shardInfo.shardId()));
} }
} }

View file

@ -99,8 +99,6 @@ public class ProcessTaskTest {
private ShardRecordProcessor shardRecordProcessor; private ShardRecordProcessor shardRecordProcessor;
@Mock @Mock
private ShardRecordProcessorCheckpointer checkpointer; private ShardRecordProcessorCheckpointer checkpointer;
@Mock
private ThrottlingReporter throttlingReporter;
private ProcessTask processTask; private ProcessTask processTask;
@ -120,8 +118,8 @@ public class ProcessTaskTest {
private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil, private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput, AggregatorUtil aggregatorUtil,
boolean skipShardSync) { boolean skipShardSync) {
return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis, return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis,
skipShardSync, shardDetector, throttlingReporter, skipShardSync, shardDetector, processRecordsInput,
processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS,
aggregatorUtil, new NullMetricsFactory()); aggregatorUtil, new NullMetricsFactory());
} }
@ -498,8 +496,6 @@ public class ProcessTaskTest {
when(checkpointer.lastCheckpointValue()).thenReturn(lastCheckpointValue); when(checkpointer.lastCheckpointValue()).thenReturn(lastCheckpointValue);
when(checkpointer.largestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); when(checkpointer.largestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
processTask.call(); processTask.call();
verify(throttlingReporter).success();
verify(throttlingReporter, never()).throttled();
ArgumentCaptor<ProcessRecordsInput> recordsCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); ArgumentCaptor<ProcessRecordsInput> recordsCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
verify(shardRecordProcessor).processRecords(recordsCaptor.capture()); verify(shardRecordProcessor).processRecords(recordsCaptor.capture());

View file

@ -63,6 +63,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/** /**
@ -115,7 +116,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
IDLE_MILLIS_BETWEEN_CALLS, IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(), new NullMetricsFactory(),
operation, operation,
"test-shard"); "test-shard",
new ThrottlingReporter(5, "test-shard"));
} }
@Test @Test
@ -168,7 +170,8 @@ public class PrefetchRecordsPublisherIntegrationTest {
IDLE_MILLIS_BETWEEN_CALLS, IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(), new NullMetricsFactory(),
operation, operation,
"test-shard-2"); "test-shard-2",
new ThrottlingReporter(5, "test-shard-2"));
getRecordsCache.start(extendedSequenceNumber, initialPosition); getRecordsCache.start(extendedSequenceNumber, initialPosition);
sleep(IDLE_MILLIS_BETWEEN_CALLS); sleep(IDLE_MILLIS_BETWEEN_CALLS);

View file

@ -28,6 +28,7 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -60,7 +61,9 @@ import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -73,9 +76,11 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory;
@ -84,6 +89,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
/** /**
@ -107,6 +113,8 @@ public class PrefetchRecordsPublisherTest {
private InitialPositionInStreamExtended initialPosition; private InitialPositionInStreamExtended initialPosition;
@Mock @Mock
private ExtendedSequenceNumber sequenceNumber; private ExtendedSequenceNumber sequenceNumber;
@Mock
private ThrottlingReporter throttlingReporter;
private List<Record> records; private List<Record> records;
private ExecutorService executorService; private ExecutorService executorService;
@ -115,7 +123,6 @@ public class PrefetchRecordsPublisherTest {
private String operation = "ProcessTask"; private String operation = "ProcessTask";
private GetRecordsResponse getRecordsResponse; private GetRecordsResponse getRecordsResponse;
private Record record; private Record record;
private RequestDetails requestDetails;
@Before @Before
public void setup() { public void setup() {
@ -132,7 +139,8 @@ public class PrefetchRecordsPublisherTest {
IDLE_MILLIS_BETWEEN_CALLS, IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(), new NullMetricsFactory(),
operation, operation,
"shardId"); "shardId",
throttlingReporter);
spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue()); spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue());
records = spy(new ArrayList<>()); records = spy(new ArrayList<>());
getRecordsResponse = GetRecordsResponse.builder().records(records).build(); getRecordsResponse = GetRecordsResponse.builder().records(records).build();
@ -499,6 +507,23 @@ public class PrefetchRecordsPublisherTest {
} }
@Test
public void testProvisionedThroughputExceededExceptionIsRegisteredInReporter() {
GetRecordsResponse response = GetRecordsResponse.builder().millisBehindLatest(100L).records(Collections.emptyList()).build();
ProvisionedThroughputExceededException throughputExceededException =
ProvisionedThroughputExceededException.builder().build();
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenThrow(throughputExceededException).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition);
RecordsRetrieved records = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000);
InOrder inOrder = Mockito.inOrder(throttlingReporter);
inOrder.verify(throttlingReporter).throttled();
inOrder.verify(throttlingReporter, atLeastOnce()).success();
inOrder.verifyNoMoreInteractions();
assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest()));
}
private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) { private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) {
return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId); return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId);
} }