Reduce Throttling Error Reports

Only report a throttling error if their are 6 consecutive throttles.
This commit is contained in:
Pfifer, Justin 2017-02-02 13:33:13 -08:00
parent 3d18e484cf
commit cca61706db
3 changed files with 226 additions and 74 deletions

View file

@ -34,6 +34,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.Shard;
@ -49,6 +50,7 @@ class ProcessTask implements ITask {
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest"; private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";
private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
private static final int MAX_CONSECUTIVE_THROTTLES = 5;
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
@ -58,23 +60,49 @@ class ProcessTask implements ITask {
private final StreamConfig streamConfig; private final StreamConfig streamConfig;
private final long backoffTimeMillis; private final long backoffTimeMillis;
private final Shard shard; private final Shard shard;
private final ThrottlingReporter throttlingReporter;
/** /**
* @param shardInfo contains information about the shard * @param shardInfo
* @param streamConfig Stream configuration * contains information about the shard
* @param recordProcessor Record processor used to process the data records for the shard * @param streamConfig
* @param recordProcessorCheckpointer Passed to the RecordProcessor so it can checkpoint * Stream configuration
* progress * @param recordProcessor
* @param dataFetcher Kinesis data fetcher (used to fetch records from Kinesis) * Record processor used to process the data records for the shard
* @param backoffTimeMillis backoff time when catching exceptions * @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
*/ */
public ProcessTask(ShardInfo shardInfo, public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
StreamConfig streamConfig, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
IRecordProcessor recordProcessor, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
RecordProcessorCheckpointer recordProcessorCheckpointer, this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
KinesisDataFetcher dataFetcher, skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES));
long backoffTimeMillis, }
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
/**
* @param shardInfo
* contains information about the shard
* @param streamConfig
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
* @param throttlingReporter
* determines how throttling events should be reported in the log.
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter) {
super(); super();
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
@ -82,6 +110,7 @@ class ProcessTask implements ITask {
this.dataFetcher = dataFetcher; this.dataFetcher = dataFetcher;
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
@ -104,9 +133,6 @@ class ProcessTask implements ITask {
* *
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call() * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/ */
// CHECKSTYLE:OFF CyclomaticComplexity
@SuppressWarnings("unchecked")
@Override @Override
public TaskResult call() { public TaskResult call() {
long startTimeMillis = System.currentTimeMillis(); long startTimeMillis = System.currentTimeMillis();
@ -120,85 +146,146 @@ class ProcessTask implements ITask {
try { try {
if (dataFetcher.isShardEndReached()) { if (dataFetcher.isShardEndReached()) {
LOG.info("Reached end of shard " + shardInfo.getShardId()); LOG.info("Reached end of shard " + shardInfo.getShardId());
boolean shardEndReached = true; return new TaskResult(null, true);
return new TaskResult(null, shardEndReached);
} }
final GetRecordsResult getRecordsResult = getRecordsResult(); final GetRecordsResult getRecordsResult = getRecordsResult();
throttlingReporter.success();
List<Record> records = getRecordsResult.getRecords(); List<Record> records = getRecordsResult.getRecords();
if (!records.isEmpty()) { if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
} else { } else {
LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId()); handleNoRecords(startTimeMillis);
long sleepTimeMillis =
streamConfig.getIdleTimeInMilliseconds() - (System.currentTimeMillis() - startTimeMillis);
if (sleepTimeMillis > 0) {
sleepTimeMillis = Math.max(sleepTimeMillis, streamConfig.getIdleTimeInMilliseconds());
try {
LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard "
+ shardInfo.getShardId());
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
LOG.debug("ShardId " + shardInfo.getShardId() + ": Sleep was interrupted");
}
}
}
// We deaggregate if and only if we got actual Kinesis records, i.e.
// not instances of some subclass thereof.
if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) {
if (this.shard != null) {
records = (List<Record>) (List<?>) UserRecord.deaggregate(records,
new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()),
new BigInteger(this.shard.getHashKeyRange().getEndingHashKey()));
} else {
records = (List<Record>) (List<?>) UserRecord.deaggregate(records);
}
} }
records = deaggregateRecords(records);
recordProcessorCheckpointer.setLargestPermittedCheckpointValue( recordProcessorCheckpointer.setLargestPermittedCheckpointValue(
filterAndGetMaxExtendedSequenceNumber(scope, records, filterAndGetMaxExtendedSequenceNumber(scope, records,
recordProcessorCheckpointer.getLastCheckpointValue(), recordProcessorCheckpointer.getLastCheckpointValue(),
recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
if ((!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) { if (shouldCallProcessRecords(records)) {
LOG.debug("Calling application processRecords() with " + records.size() callProcessRecords(getRecordsResult, records);
+ " records from " + shardInfo.getShardId());
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(records)
.withCheckpointer(recordProcessorCheckpointer)
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
recordProcessor.processRecords(processRecordsInput);
} catch (Exception e) {
LOG.error("ShardId " + shardInfo.getShardId()
+ ": Application processRecords() threw an exception when processing shard ", e);
LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: "
+ records);
} finally {
MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC,
recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
}
} }
} catch (ProvisionedThroughputExceededException pte) {
String message = "Shard '" + shardInfo.getShardId() + "' has been throttled "
+ throttlingReporter.getConsecutiveThrottles() + " consecutively";
throttlingReporter.throttled();
if (throttlingReporter.shouldReportError()) {
LOG.error(message);
} else {
LOG.warn(message);
}
exception = pte;
backoff();
} catch (RuntimeException e) { } catch (RuntimeException e) {
LOG.error("ShardId " + shardInfo.getShardId() + ": Caught exception: ", e); LOG.error("ShardId " + shardInfo.getShardId() + ": Caught exception: ", e);
exception = e; exception = e;
backoff();
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
LOG.debug(shardInfo.getShardId() + ": Sleep was interrupted", ie);
}
} }
return new TaskResult(exception); return new TaskResult(exception);
} }
// CHECKSTYLE:ON CyclomaticComplexity
/**
* Sleeps for the configured backoff period. This is usually only called when an exception occurs.
*/
private void backoff() {
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
LOG.debug(shardInfo.getShardId() + ": Sleep was interrupted", ie);
}
}
/**
* Dispatches a batch of records to the record processor, and handles any fallout from that.
*
* @param getRecordsResult
* the result of the last call to Kinesis
* @param records
* the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation.
*/
private void callProcessRecords(GetRecordsResult getRecordsResult, List<Record> records) {
LOG.debug("Calling application processRecords() with " + records.size() + " records from "
+ shardInfo.getShardId());
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records)
.withCheckpointer(recordProcessorCheckpointer)
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
recordProcessor.processRecords(processRecordsInput);
} catch (Exception e) {
LOG.error("ShardId " + shardInfo.getShardId()
+ ": Application processRecords() threw an exception when processing shard ", e);
LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: " + records);
} finally {
MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC,
recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
}
}
/**
* Whether we should call process records or not
*
* @param records
* the records returned from the call to Kinesis, and/or deaggregation
* @return true if the set of records should be dispatched to the record process, false if they should not.
*/
private boolean shouldCallProcessRecords(List<Record> records) {
return (!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList();
}
/**
* Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation
*
* @param records
* the records to deaggregate is deaggregation is required.
* @return returns either the deaggregated records, or the original records
*/
@SuppressWarnings("unchecked")
private List<Record> deaggregateRecords(List<Record> records) {
// We deaggregate if and only if we got actual Kinesis records, i.e.
// not instances of some subclass thereof.
if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) {
if (this.shard != null) {
return (List<Record>) (List<?>) UserRecord.deaggregate(records,
new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()),
new BigInteger(this.shard.getHashKeyRange().getEndingHashKey()));
} else {
return (List<Record>) (List<?>) UserRecord.deaggregate(records);
}
}
return records;
}
/**
* Emits metrics, and sleeps if there are no records available
*
* @param startTimeMillis
* the time when the task started
*/
private void handleNoRecords(long startTimeMillis) {
LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId());
long sleepTimeMillis = streamConfig.getIdleTimeInMilliseconds()
- (System.currentTimeMillis() - startTimeMillis);
if (sleepTimeMillis > 0) {
sleepTimeMillis = Math.max(sleepTimeMillis, streamConfig.getIdleTimeInMilliseconds());
try {
LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard "
+ shardInfo.getShardId());
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
LOG.debug("ShardId " + shardInfo.getShardId() + ": Sleep was interrupted");
}
}
}
@Override @Override
public TaskType getTaskType() { public TaskType getTaskType() {

View file

@ -0,0 +1,26 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
class ThrottlingReporter {
private final int maxConsecutiveWarnThrottles;
@Getter
private int consecutiveThrottles = 0;
void throttled() {
consecutiveThrottles++;
}
void success() {
consecutiveThrottles = 0;
}
boolean shouldReportError() {
return consecutiveThrottles > maxConsecutiveWarnThrottles;
}
}

View file

@ -0,0 +1,39 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import org.junit.Test;
public class ThrottlingReporterTest {
@Test
public void testLessThanMaxThrottles() {
ThrottlingReporter reporter = new ThrottlingReporter(5);
assertThat(reporter.shouldReportError(), is(false));
reporter.throttled();
assertThat(reporter.shouldReportError(), is(false));
}
@Test
public void testMoreThanMaxThrottles() {
ThrottlingReporter reporter = new ThrottlingReporter(1);
assertThat(reporter.shouldReportError(), is(false));
reporter.throttled();
reporter.throttled();
assertThat(reporter.shouldReportError(), is(true));
}
@Test
public void testSuccessResetsErrors() {
ThrottlingReporter reporter = new ThrottlingReporter(1);
assertThat(reporter.shouldReportError(), is(false));
reporter.throttled();
reporter.throttled();
assertThat(reporter.shouldReportError(), is(true));
reporter.success();
assertThat(reporter.shouldReportError(), is(false));
}
}