From cf106f65545477c4a4bf1815ee9b1c1a51e63ffa Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Thu, 16 Feb 2017 06:57:36 -0800 Subject: [PATCH] Move Throttle Logging to the ThrottlingReporter Moved the logging of throttling messages to the throttling reporter. Added a custom logging mock to allow testing of the the throttling reporter. --- .../clientlibrary/lib/worker/ProcessTask.java | 11 +-- .../lib/worker/ThrottlingReporter.java | 17 ++-- .../lib/worker/ProcessTaskTest.java | 4 +- .../lib/worker/ThrottlingReporterTest.java | 83 ++++++++++++++++--- 4 files changed, 86 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 918bf9f4..c419c693 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -80,7 +80,8 @@ class ProcessTask implements ITask { RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES)); + skipShardSyncAtWorkerInitializationIfLeasesExist, + new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId())); } /** @@ -169,15 +170,7 @@ class ProcessTask implements ITask { callProcessRecords(getRecordsResult, records); } } catch (ProvisionedThroughputExceededException pte) { - String message = "Shard '" + shardInfo.getShardId() + "' has been throttled " - + throttlingReporter.getConsecutiveThrottles() + " consecutively"; - throttlingReporter.throttled(); - if (throttlingReporter.shouldReportError()) { - LOG.error(message); - } else { - LOG.warn(message); - } exception = pte; backoff(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java index 05ad6622..cd78487f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporter.java @@ -2,25 +2,32 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.extern.apachecommons.CommonsLog; @RequiredArgsConstructor +@CommonsLog class ThrottlingReporter { private final int maxConsecutiveWarnThrottles; + private final String shardId; - @Getter private int consecutiveThrottles = 0; void throttled() { consecutiveThrottles++; + String message = "Shard '" + shardId + "' has been throttled " + + consecutiveThrottles + " consecutively"; + + if (consecutiveThrottles > maxConsecutiveWarnThrottles) { + log.error(message); + } else { + log.warn(message); + } + } void success() { consecutiveThrottles = 0; } - boolean shouldReportError() { - return consecutiveThrottles > maxConsecutiveWarnThrottles; - } - } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index de5b293b..e95aef50 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @@ -106,7 +107,6 @@ public class ProcessTaskTest { TaskResult result = processTask.call(); verify(throttlingReporter).throttled(); verify(throttlingReporter, never()).success(); - verify(throttlingReporter).shouldReportError(); assertTrue("Result should contain ProvisionedThroughputExceededException", result.getException() instanceof ProvisionedThroughputExceededException); } @@ -307,8 +307,6 @@ public class ProcessTaskTest { processTask.call(); verify(throttlingReporter).success(); verify(throttlingReporter, never()).throttled(); - verify(throttlingReporter, never()).shouldReportError(); - verify(throttlingReporter, never()).getConsecutiveThrottles(); ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(priCaptor.capture()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java index 3cdf5197..90560a9e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ThrottlingReporterTest.java @@ -1,38 +1,97 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogConfigurationException; +import org.apache.commons.logging.impl.LogFactoryImpl; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import lombok.RequiredArgsConstructor; + +@RunWith(MockitoJUnitRunner.class) public class ThrottlingReporterTest { + private static final String SHARD_ID = "Shard-001"; + + private static final Log throttleLog = mock(Log.class); + + @RequiredArgsConstructor + private static class DirectRegisterLogFactory extends LogFactoryImpl { + + private final Map, Log> logMapping; + + public static void mockLogFactory(Map, Log> logMapping) { + factories.put(Thread.currentThread().getContextClassLoader(), new DirectRegisterLogFactory(logMapping)); + } + + @Override + public Log getInstance(Class clazz) throws LogConfigurationException { + if (logMapping.containsKey(clazz)) { + return logMapping.get(clazz); + } + return super.getInstance(clazz); + } + } + + @BeforeClass + public static void beforeClass() { + Map, Log> logMapping = new HashMap<>(); + logMapping.put(ThrottlingReporter.class, throttleLog); + + DirectRegisterLogFactory.mockLogFactory(logMapping); + } + + @Before + public void before() { + // + // Have to to do this since the only time that the logFactory will be able to inject a mock is on + // class load. + // + Mockito.reset(throttleLog); + } + @Test public void testLessThanMaxThrottles() { - ThrottlingReporter reporter = new ThrottlingReporter(5); - assertThat(reporter.shouldReportError(), is(false)); + ThrottlingReporter reporter = new ThrottlingReporter(5, SHARD_ID); reporter.throttled(); - assertThat(reporter.shouldReportError(), is(false)); + verify(throttleLog).warn(any(Object.class)); + verify(throttleLog, never()).error(any(Object.class)); + } @Test public void testMoreThanMaxThrottles() { - ThrottlingReporter reporter = new ThrottlingReporter(1); - assertThat(reporter.shouldReportError(), is(false)); + ThrottlingReporter reporter = new ThrottlingReporter(1, SHARD_ID); reporter.throttled(); reporter.throttled(); - assertThat(reporter.shouldReportError(), is(true)); + verify(throttleLog).warn(any(Object.class)); + verify(throttleLog).error(any(Object.class)); } @Test public void testSuccessResetsErrors() { - ThrottlingReporter reporter = new ThrottlingReporter(1); - assertThat(reporter.shouldReportError(), is(false)); + ThrottlingReporter reporter = new ThrottlingReporter(1, SHARD_ID); + reporter.throttled(); + reporter.throttled(); reporter.throttled(); reporter.throttled(); - assertThat(reporter.shouldReportError(), is(true)); reporter.success(); - assertThat(reporter.shouldReportError(), is(false)); + reporter.throttled(); + verify(throttleLog, times(2)).warn(any(Object.class)); + verify(throttleLog, times(3)).error(any(Object.class)); }