From 2967f2a18cb4ed21385f73f91743400d23d799ab Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Fri, 17 Feb 2017 09:52:47 -0800 Subject: [PATCH] Reduce Throttling Error Reports (#140) * Reduce Throttling Error Reports Only report a throttling error if their are 6 consecutive throttles. Moved the logging of throttling messages to the throttling reporter. --- .../clientlibrary/lib/worker/ProcessTask.java | 228 ++++++++++++------ .../lib/worker/ThrottlingReporter.java | 38 +++ .../lib/worker/ProcessTaskTest.java | 10 +- .../lib/worker/ThrottlingReporterTest.java | 66 +++++ 4 files changed, 267 insertions(+), 75 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 6ee34880..c419c693 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -34,6 +34,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Shard; @@ -49,6 +50,7 @@ class ProcessTask implements ITask { private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; + private static final int MAX_CONSECUTIVE_THROTTLES = 5; private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; @@ -58,23 +60,50 @@ class ProcessTask implements ITask { private final StreamConfig streamConfig; private final long backoffTimeMillis; private final Shard shard; + private final ThrottlingReporter throttlingReporter; /** - * @param shardInfo contains information about the shard - * @param streamConfig Stream configuration - * @param recordProcessor Record processor used to process the data records for the shard - * @param recordProcessorCheckpointer Passed to the RecordProcessor so it can checkpoint - * progress - * @param dataFetcher Kinesis data fetcher (used to fetch records from Kinesis) - * @param backoffTimeMillis backoff time when catching exceptions + * @param shardInfo + * contains information about the shard + * @param streamConfig + * Stream configuration + * @param recordProcessor + * Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer + * Passed to the RecordProcessor so it can checkpoint progress + * @param dataFetcher + * Kinesis data fetcher (used to fetch records from Kinesis) + * @param backoffTimeMillis + * backoff time when catching exceptions */ - public ProcessTask(ShardInfo shardInfo, - StreamConfig streamConfig, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisDataFetcher dataFetcher, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { + public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { + this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId())); + } + + /** + * @param shardInfo + * contains information about the shard + * @param streamConfig + * Stream configuration + * @param recordProcessor + * Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer + * Passed to the RecordProcessor so it can checkpoint progress + * @param dataFetcher + * Kinesis data fetcher (used to fetch records from Kinesis) + * @param backoffTimeMillis + * backoff time when catching exceptions + * @param throttlingReporter + * determines how throttling events should be reported in the log. + */ + public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + ThrottlingReporter throttlingReporter) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -82,6 +111,7 @@ class ProcessTask implements ITask { this.dataFetcher = dataFetcher; this.streamConfig = streamConfig; this.backoffTimeMillis = backoffTimeMillis; + this.throttlingReporter = throttlingReporter; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will @@ -104,9 +134,6 @@ class ProcessTask implements ITask { * * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call() */ - - // CHECKSTYLE:OFF CyclomaticComplexity - @SuppressWarnings("unchecked") @Override public TaskResult call() { long startTimeMillis = System.currentTimeMillis(); @@ -120,85 +147,138 @@ class ProcessTask implements ITask { try { if (dataFetcher.isShardEndReached()) { LOG.info("Reached end of shard " + shardInfo.getShardId()); - boolean shardEndReached = true; - return new TaskResult(null, shardEndReached); + return new TaskResult(null, true); } final GetRecordsResult getRecordsResult = getRecordsResult(); + throttlingReporter.success(); List records = getRecordsResult.getRecords(); if (!records.isEmpty()) { scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); } else { - LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId()); + handleNoRecords(startTimeMillis); + } + records = deaggregateRecords(records); - long sleepTimeMillis = - streamConfig.getIdleTimeInMilliseconds() - (System.currentTimeMillis() - startTimeMillis); - if (sleepTimeMillis > 0) { - sleepTimeMillis = Math.max(sleepTimeMillis, streamConfig.getIdleTimeInMilliseconds()); - try { - LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard " - + shardInfo.getShardId()); - Thread.sleep(sleepTimeMillis); - } catch (InterruptedException e) { - LOG.debug("ShardId " + shardInfo.getShardId() + ": Sleep was interrupted"); - } - } - } - - // We deaggregate if and only if we got actual Kinesis records, i.e. - // not instances of some subclass thereof. - if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) { - if (this.shard != null) { - records = (List) (List) UserRecord.deaggregate(records, - new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()), - new BigInteger(this.shard.getHashKeyRange().getEndingHashKey())); - } else { - records = (List) (List) UserRecord.deaggregate(records); - } - } - recordProcessorCheckpointer.setLargestPermittedCheckpointValue( filterAndGetMaxExtendedSequenceNumber(scope, records, recordProcessorCheckpointer.getLastCheckpointValue(), recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); - if ((!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) { - LOG.debug("Calling application processRecords() with " + records.size() - + " records from " + shardInfo.getShardId()); - final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() - .withRecords(records) - .withCheckpointer(recordProcessorCheckpointer) - .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); - - final long recordProcessorStartTimeMillis = System.currentTimeMillis(); - try { - recordProcessor.processRecords(processRecordsInput); - } catch (Exception e) { - LOG.error("ShardId " + shardInfo.getShardId() - + ": Application processRecords() threw an exception when processing shard ", e); - LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: " - + records); - } finally { - MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, - recordProcessorStartTimeMillis, MetricsLevel.SUMMARY); - } + if (shouldCallProcessRecords(records)) { + callProcessRecords(getRecordsResult, records); } + } catch (ProvisionedThroughputExceededException pte) { + throttlingReporter.throttled(); + exception = pte; + backoff(); + } catch (RuntimeException e) { LOG.error("ShardId " + shardInfo.getShardId() + ": Caught exception: ", e); exception = e; - - // backoff if we encounter an exception. - try { - Thread.sleep(this.backoffTimeMillis); - } catch (InterruptedException ie) { - LOG.debug(shardInfo.getShardId() + ": Sleep was interrupted", ie); - } + backoff(); } return new TaskResult(exception); } - // CHECKSTYLE:ON CyclomaticComplexity + + /** + * Sleeps for the configured backoff period. This is usually only called when an exception occurs. + */ + private void backoff() { + // backoff if we encounter an exception. + try { + Thread.sleep(this.backoffTimeMillis); + } catch (InterruptedException ie) { + LOG.debug(shardInfo.getShardId() + ": Sleep was interrupted", ie); + } + } + + /** + * Dispatches a batch of records to the record processor, and handles any fallout from that. + * + * @param getRecordsResult + * the result of the last call to Kinesis + * @param records + * the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation. + */ + private void callProcessRecords(GetRecordsResult getRecordsResult, List records) { + LOG.debug("Calling application processRecords() with " + records.size() + " records from " + + shardInfo.getShardId()); + final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) + .withCheckpointer(recordProcessorCheckpointer) + .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); + + final long recordProcessorStartTimeMillis = System.currentTimeMillis(); + try { + recordProcessor.processRecords(processRecordsInput); + } catch (Exception e) { + LOG.error("ShardId " + shardInfo.getShardId() + + ": Application processRecords() threw an exception when processing shard ", e); + LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: " + records); + } finally { + MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, + recordProcessorStartTimeMillis, MetricsLevel.SUMMARY); + } + } + + /** + * Whether we should call process records or not + * + * @param records + * the records returned from the call to Kinesis, and/or deaggregation + * @return true if the set of records should be dispatched to the record process, false if they should not. + */ + private boolean shouldCallProcessRecords(List records) { + return (!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList(); + } + + /** + * Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation + * + * @param records + * the records to deaggregate is deaggregation is required. + * @return returns either the deaggregated records, or the original records + */ + @SuppressWarnings("unchecked") + private List deaggregateRecords(List records) { + // We deaggregate if and only if we got actual Kinesis records, i.e. + // not instances of some subclass thereof. + if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) { + if (this.shard != null) { + return (List) (List) UserRecord.deaggregate(records, + new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()), + new BigInteger(this.shard.getHashKeyRange().getEndingHashKey())); + } else { + return (List) (List) UserRecord.deaggregate(records); + } + } + return records; + } + + /** + * Emits metrics, and sleeps if there are no records available + * + * @param startTimeMillis + * the time when the task started + */ + private void handleNoRecords(long startTimeMillis) { + LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId()); + + long sleepTimeMillis = streamConfig.getIdleTimeInMilliseconds() + - (System.currentTimeMillis() - startTimeMillis); + if (sleepTimeMillis > 0) { + sleepTimeMillis = Math.max(sleepTimeMillis, streamConfig.getIdleTimeInMilliseconds()); + try { + LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard " + + shardInfo.getShardId()); + Thread.sleep(sleepTimeMillis); + } catch (InterruptedException e) { + LOG.debug("ShardId " + shardInfo.getShardId() + ": Sleep was interrupted"); + } + } + } @Override public TaskType getTaskType() { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java new file mode 100644 index 00000000..f88f131f --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java @@ -0,0 +1,38 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.apachecommons.CommonsLog; +import org.apache.commons.logging.Log; + +@RequiredArgsConstructor +@CommonsLog +class ThrottlingReporter { + + private final int maxConsecutiveWarnThrottles; + private final String shardId; + + private int consecutiveThrottles = 0; + + void throttled() { + consecutiveThrottles++; + String message = "Shard '" + shardId + "' has been throttled " + + consecutiveThrottles + " consecutively"; + + if (consecutiveThrottles > maxConsecutiveWarnThrottles) { + getLog().error(message); + } else { + getLog().warn(message); + } + + } + + void success() { + consecutiveThrottles = 0; + } + + protected Log getLog() { + return log; + } + +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index 4d32566e..e95aef50 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -19,8 +19,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -72,6 +74,8 @@ public class ProcessTaskTest { private @Mock KinesisDataFetcher mockDataFetcher; private @Mock IRecordProcessor mockRecordProcessor; private @Mock RecordProcessorCheckpointer mockCheckpointer; + @Mock + private ThrottlingReporter throttlingReporter; private List processedRecords; private ExtendedSequenceNumber newLargestPermittedCheckpointValue; @@ -90,7 +94,7 @@ public class ProcessTaskTest { final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter); } @Test @@ -101,6 +105,8 @@ public class ProcessTaskTest { .getRecords(maxRecords); TaskResult result = processTask.call(); + verify(throttlingReporter).throttled(); + verify(throttlingReporter, never()).success(); assertTrue("Result should contain ProvisionedThroughputExceededException", result.getException() instanceof ProvisionedThroughputExceededException); } @@ -299,6 +305,8 @@ public class ProcessTaskTest { when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); + verify(throttlingReporter).success(); + verify(throttlingReporter, never()).throttled(); ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(priCaptor.capture()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java new file mode 100644 index 00000000..d0645229 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java @@ -0,0 +1,66 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.commons.logging.Log; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ThrottlingReporterTest { + + private static final String SHARD_ID = "Shard-001"; + + @Mock + private Log throttleLog; + + @Test + public void testLessThanMaxThrottles() { + ThrottlingReporter reporter = new LogTestingThrottingReporter(5, SHARD_ID); + reporter.throttled(); + verify(throttleLog).warn(any(Object.class)); + verify(throttleLog, never()).error(any(Object.class)); + + } + + @Test + public void testMoreThanMaxThrottles() { + ThrottlingReporter reporter = new LogTestingThrottingReporter(1, SHARD_ID); + reporter.throttled(); + reporter.throttled(); + verify(throttleLog).warn(any(Object.class)); + verify(throttleLog).error(any(Object.class)); + } + + @Test + public void testSuccessResetsErrors() { + ThrottlingReporter reporter = new LogTestingThrottingReporter(1, SHARD_ID); + reporter.throttled(); + reporter.throttled(); + reporter.throttled(); + reporter.throttled(); + reporter.success(); + reporter.throttled(); + verify(throttleLog, times(2)).warn(any(Object.class)); + verify(throttleLog, times(3)).error(any(Object.class)); + + } + + private class LogTestingThrottingReporter extends ThrottlingReporter { + + public LogTestingThrottingReporter(int maxConsecutiveWarnThrottles, String shardId) { + super(maxConsecutiveWarnThrottles, shardId); + } + + @Override + protected Log getLog() { + return throttleLog; + } + } + +} \ No newline at end of file