diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 6ee34880..918bf9f4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -34,6 +34,7 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Shard; @@ -49,6 +50,7 @@ class ProcessTask implements ITask { private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; + private static final int MAX_CONSECUTIVE_THROTTLES = 5; private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; @@ -58,23 +60,49 @@ class ProcessTask implements ITask { private final StreamConfig streamConfig; private final long backoffTimeMillis; private final Shard shard; + private final ThrottlingReporter throttlingReporter; /** - * @param shardInfo contains information about the shard - * @param streamConfig Stream configuration - * @param recordProcessor Record processor used to process the data records for the shard - * @param recordProcessorCheckpointer Passed to the RecordProcessor so it can checkpoint - * progress - * @param dataFetcher Kinesis data fetcher (used to fetch records from Kinesis) - * @param backoffTimeMillis backoff time when catching exceptions + * @param shardInfo + * contains information about the shard + * @param streamConfig + * Stream configuration + * @param recordProcessor + * Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer + * Passed to the RecordProcessor so it can checkpoint progress + * @param dataFetcher + * Kinesis data fetcher (used to fetch records from Kinesis) + * @param backoffTimeMillis + * backoff time when catching exceptions */ - public ProcessTask(ShardInfo shardInfo, - StreamConfig streamConfig, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisDataFetcher dataFetcher, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { + public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { + this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES)); + } + + /** + * @param shardInfo + * contains information about the shard + * @param streamConfig + * Stream configuration + * @param recordProcessor + * Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer + * Passed to the RecordProcessor so it can checkpoint progress + * @param dataFetcher + * Kinesis data fetcher (used to fetch records from Kinesis) + * @param backoffTimeMillis + * backoff time when catching exceptions + * @param throttlingReporter + * determines how throttling events should be reported in the log. + */ + public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + ThrottlingReporter throttlingReporter) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -82,6 +110,7 @@ class ProcessTask implements ITask { this.dataFetcher = dataFetcher; this.streamConfig = streamConfig; this.backoffTimeMillis = backoffTimeMillis; + this.throttlingReporter = throttlingReporter; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will @@ -104,9 +133,6 @@ class ProcessTask implements ITask { * * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call() */ - - // CHECKSTYLE:OFF CyclomaticComplexity - @SuppressWarnings("unchecked") @Override public TaskResult call() { long startTimeMillis = System.currentTimeMillis(); @@ -120,85 +146,146 @@ class ProcessTask implements ITask { try { if (dataFetcher.isShardEndReached()) { LOG.info("Reached end of shard " + shardInfo.getShardId()); - boolean shardEndReached = true; - return new TaskResult(null, shardEndReached); + return new TaskResult(null, true); } final GetRecordsResult getRecordsResult = getRecordsResult(); + throttlingReporter.success(); List records = getRecordsResult.getRecords(); if (!records.isEmpty()) { scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); } else { - LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId()); + handleNoRecords(startTimeMillis); + } + records = deaggregateRecords(records); - long sleepTimeMillis = - streamConfig.getIdleTimeInMilliseconds() - (System.currentTimeMillis() - startTimeMillis); - if (sleepTimeMillis > 0) { - sleepTimeMillis = Math.max(sleepTimeMillis, streamConfig.getIdleTimeInMilliseconds()); - try { - LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard " - + shardInfo.getShardId()); - Thread.sleep(sleepTimeMillis); - } catch (InterruptedException e) { - LOG.debug("ShardId " + shardInfo.getShardId() + ": Sleep was interrupted"); - } - } - } - - // We deaggregate if and only if we got actual Kinesis records, i.e. - // not instances of some subclass thereof. - if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) { - if (this.shard != null) { - records = (List) (List) UserRecord.deaggregate(records, - new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()), - new BigInteger(this.shard.getHashKeyRange().getEndingHashKey())); - } else { - records = (List) (List) UserRecord.deaggregate(records); - } - } - recordProcessorCheckpointer.setLargestPermittedCheckpointValue( filterAndGetMaxExtendedSequenceNumber(scope, records, recordProcessorCheckpointer.getLastCheckpointValue(), recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); - if ((!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) { - LOG.debug("Calling application processRecords() with " + records.size() - + " records from " + shardInfo.getShardId()); - final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() - .withRecords(records) - .withCheckpointer(recordProcessorCheckpointer) - .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); - - final long recordProcessorStartTimeMillis = System.currentTimeMillis(); - try { - recordProcessor.processRecords(processRecordsInput); - } catch (Exception e) { - LOG.error("ShardId " + shardInfo.getShardId() - + ": Application processRecords() threw an exception when processing shard ", e); - LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: " - + records); - } finally { - MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, - recordProcessorStartTimeMillis, MetricsLevel.SUMMARY); - } + if (shouldCallProcessRecords(records)) { + callProcessRecords(getRecordsResult, records); } + } catch (ProvisionedThroughputExceededException pte) { + String message = "Shard '" + shardInfo.getShardId() + "' has been throttled " + + throttlingReporter.getConsecutiveThrottles() + " consecutively"; + + throttlingReporter.throttled(); + if (throttlingReporter.shouldReportError()) { + LOG.error(message); + } else { + LOG.warn(message); + } + exception = pte; + backoff(); + } catch (RuntimeException e) { LOG.error("ShardId " + shardInfo.getShardId() + ": Caught exception: ", e); exception = e; - - // backoff if we encounter an exception. - try { - Thread.sleep(this.backoffTimeMillis); - } catch (InterruptedException ie) { - LOG.debug(shardInfo.getShardId() + ": Sleep was interrupted", ie); - } + backoff(); } return new TaskResult(exception); } - // CHECKSTYLE:ON CyclomaticComplexity + + /** + * Sleeps for the configured backoff period. This is usually only called when an exception occurs. + */ + private void backoff() { + // backoff if we encounter an exception. + try { + Thread.sleep(this.backoffTimeMillis); + } catch (InterruptedException ie) { + LOG.debug(shardInfo.getShardId() + ": Sleep was interrupted", ie); + } + } + + /** + * Dispatches a batch of records to the record processor, and handles any fallout from that. + * + * @param getRecordsResult + * the result of the last call to Kinesis + * @param records + * the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation. + */ + private void callProcessRecords(GetRecordsResult getRecordsResult, List records) { + LOG.debug("Calling application processRecords() with " + records.size() + " records from " + + shardInfo.getShardId()); + final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) + .withCheckpointer(recordProcessorCheckpointer) + .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); + + final long recordProcessorStartTimeMillis = System.currentTimeMillis(); + try { + recordProcessor.processRecords(processRecordsInput); + } catch (Exception e) { + LOG.error("ShardId " + shardInfo.getShardId() + + ": Application processRecords() threw an exception when processing shard ", e); + LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: " + records); + } finally { + MetricsHelper.addLatencyPerShard(shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, + recordProcessorStartTimeMillis, MetricsLevel.SUMMARY); + } + } + + /** + * Whether we should call process records or not + * + * @param records + * the records returned from the call to Kinesis, and/or deaggregation + * @return true if the set of records should be dispatched to the record process, false if they should not. + */ + private boolean shouldCallProcessRecords(List records) { + return (!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList(); + } + + /** + * Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation + * + * @param records + * the records to deaggregate is deaggregation is required. + * @return returns either the deaggregated records, or the original records + */ + @SuppressWarnings("unchecked") + private List deaggregateRecords(List records) { + // We deaggregate if and only if we got actual Kinesis records, i.e. + // not instances of some subclass thereof. + if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) { + if (this.shard != null) { + return (List) (List) UserRecord.deaggregate(records, + new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()), + new BigInteger(this.shard.getHashKeyRange().getEndingHashKey())); + } else { + return (List) (List) UserRecord.deaggregate(records); + } + } + return records; + } + + /** + * Emits metrics, and sleeps if there are no records available + * + * @param startTimeMillis + * the time when the task started + */ + private void handleNoRecords(long startTimeMillis) { + LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId()); + + long sleepTimeMillis = streamConfig.getIdleTimeInMilliseconds() + - (System.currentTimeMillis() - startTimeMillis); + if (sleepTimeMillis > 0) { + sleepTimeMillis = Math.max(sleepTimeMillis, streamConfig.getIdleTimeInMilliseconds()); + try { + LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard " + + shardInfo.getShardId()); + Thread.sleep(sleepTimeMillis); + } catch (InterruptedException e) { + LOG.debug("ShardId " + shardInfo.getShardId() + ": Sleep was interrupted"); + } + } + } @Override public TaskType getTaskType() { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java new file mode 100644 index 00000000..05ad6622 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java @@ -0,0 +1,26 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +class ThrottlingReporter { + + private final int maxConsecutiveWarnThrottles; + + @Getter + private int consecutiveThrottles = 0; + + void throttled() { + consecutiveThrottles++; + } + + void success() { + consecutiveThrottles = 0; + } + + boolean shouldReportError() { + return consecutiveThrottles > maxConsecutiveWarnThrottles; + } + +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java new file mode 100644 index 00000000..3cdf5197 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java @@ -0,0 +1,39 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +public class ThrottlingReporterTest { + + @Test + public void testLessThanMaxThrottles() { + ThrottlingReporter reporter = new ThrottlingReporter(5); + assertThat(reporter.shouldReportError(), is(false)); + reporter.throttled(); + assertThat(reporter.shouldReportError(), is(false)); + } + + @Test + public void testMoreThanMaxThrottles() { + ThrottlingReporter reporter = new ThrottlingReporter(1); + assertThat(reporter.shouldReportError(), is(false)); + reporter.throttled(); + reporter.throttled(); + assertThat(reporter.shouldReportError(), is(true)); + } + + @Test + public void testSuccessResetsErrors() { + ThrottlingReporter reporter = new ThrottlingReporter(1); + assertThat(reporter.shouldReportError(), is(false)); + reporter.throttled(); + reporter.throttled(); + assertThat(reporter.shouldReportError(), is(true)); + reporter.success(); + assertThat(reporter.shouldReportError(), is(false)); + + } + +} \ No newline at end of file