Move Throttle Logging to the ThrottlingReporter

Moved the logging of throttling messages to the throttling
reporter. Added a custom logging mock to allow testing of the the
throttling reporter.
This commit is contained in:
Pfifer, Justin 2017-02-16 06:57:36 -08:00
parent 33ec7d601a
commit cf106f6554
4 changed files with 86 additions and 29 deletions

View file

@ -80,7 +80,8 @@ class ProcessTask implements ITask {
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES)); skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()));
} }
/** /**
@ -169,15 +170,7 @@ class ProcessTask implements ITask {
callProcessRecords(getRecordsResult, records); callProcessRecords(getRecordsResult, records);
} }
} catch (ProvisionedThroughputExceededException pte) { } catch (ProvisionedThroughputExceededException pte) {
String message = "Shard '" + shardInfo.getShardId() + "' has been throttled "
+ throttlingReporter.getConsecutiveThrottles() + " consecutively";
throttlingReporter.throttled(); throttlingReporter.throttled();
if (throttlingReporter.shouldReportError()) {
LOG.error(message);
} else {
LOG.warn(message);
}
exception = pte; exception = pte;
backoff(); backoff();

View file

@ -2,25 +2,32 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.apachecommons.CommonsLog;
@RequiredArgsConstructor @RequiredArgsConstructor
@CommonsLog
class ThrottlingReporter { class ThrottlingReporter {
private final int maxConsecutiveWarnThrottles; private final int maxConsecutiveWarnThrottles;
private final String shardId;
@Getter
private int consecutiveThrottles = 0; private int consecutiveThrottles = 0;
void throttled() { void throttled() {
consecutiveThrottles++; consecutiveThrottles++;
String message = "Shard '" + shardId + "' has been throttled "
+ consecutiveThrottles + " consecutively";
if (consecutiveThrottles > maxConsecutiveWarnThrottles) {
log.error(message);
} else {
log.warn(message);
}
} }
void success() { void success() {
consecutiveThrottles = 0; consecutiveThrottles = 0;
} }
boolean shouldReportError() {
return consecutiveThrottles > maxConsecutiveWarnThrottles;
}
} }

View file

@ -19,6 +19,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -106,7 +107,6 @@ public class ProcessTaskTest {
TaskResult result = processTask.call(); TaskResult result = processTask.call();
verify(throttlingReporter).throttled(); verify(throttlingReporter).throttled();
verify(throttlingReporter, never()).success(); verify(throttlingReporter, never()).success();
verify(throttlingReporter).shouldReportError();
assertTrue("Result should contain ProvisionedThroughputExceededException", assertTrue("Result should contain ProvisionedThroughputExceededException",
result.getException() instanceof ProvisionedThroughputExceededException); result.getException() instanceof ProvisionedThroughputExceededException);
} }
@ -307,8 +307,6 @@ public class ProcessTaskTest {
processTask.call(); processTask.call();
verify(throttlingReporter).success(); verify(throttlingReporter).success();
verify(throttlingReporter, never()).throttled(); verify(throttlingReporter, never()).throttled();
verify(throttlingReporter, never()).shouldReportError();
verify(throttlingReporter, never()).getConsecutiveThrottles();
ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
verify(mockRecordProcessor).processRecords(priCaptor.capture()); verify(mockRecordProcessor).processRecords(priCaptor.capture());

View file

@ -1,38 +1,97 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.CoreMatchers.is; import static org.mockito.Matchers.any;
import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogConfigurationException;
import org.apache.commons.logging.impl.LogFactoryImpl;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import lombok.RequiredArgsConstructor;
@RunWith(MockitoJUnitRunner.class)
public class ThrottlingReporterTest { public class ThrottlingReporterTest {
private static final String SHARD_ID = "Shard-001";
private static final Log throttleLog = mock(Log.class);
@RequiredArgsConstructor
private static class DirectRegisterLogFactory extends LogFactoryImpl {
private final Map<Class<?>, Log> logMapping;
public static void mockLogFactory(Map<Class<?>, Log> logMapping) {
factories.put(Thread.currentThread().getContextClassLoader(), new DirectRegisterLogFactory(logMapping));
}
@Override
public Log getInstance(Class clazz) throws LogConfigurationException {
if (logMapping.containsKey(clazz)) {
return logMapping.get(clazz);
}
return super.getInstance(clazz);
}
}
@BeforeClass
public static void beforeClass() {
Map<Class<?>, Log> logMapping = new HashMap<>();
logMapping.put(ThrottlingReporter.class, throttleLog);
DirectRegisterLogFactory.mockLogFactory(logMapping);
}
@Before
public void before() {
//
// Have to to do this since the only time that the logFactory will be able to inject a mock is on
// class load.
//
Mockito.reset(throttleLog);
}
@Test @Test
public void testLessThanMaxThrottles() { public void testLessThanMaxThrottles() {
ThrottlingReporter reporter = new ThrottlingReporter(5); ThrottlingReporter reporter = new ThrottlingReporter(5, SHARD_ID);
assertThat(reporter.shouldReportError(), is(false));
reporter.throttled(); reporter.throttled();
assertThat(reporter.shouldReportError(), is(false)); verify(throttleLog).warn(any(Object.class));
verify(throttleLog, never()).error(any(Object.class));
} }
@Test @Test
public void testMoreThanMaxThrottles() { public void testMoreThanMaxThrottles() {
ThrottlingReporter reporter = new ThrottlingReporter(1); ThrottlingReporter reporter = new ThrottlingReporter(1, SHARD_ID);
assertThat(reporter.shouldReportError(), is(false));
reporter.throttled(); reporter.throttled();
reporter.throttled(); reporter.throttled();
assertThat(reporter.shouldReportError(), is(true)); verify(throttleLog).warn(any(Object.class));
verify(throttleLog).error(any(Object.class));
} }
@Test @Test
public void testSuccessResetsErrors() { public void testSuccessResetsErrors() {
ThrottlingReporter reporter = new ThrottlingReporter(1); ThrottlingReporter reporter = new ThrottlingReporter(1, SHARD_ID);
assertThat(reporter.shouldReportError(), is(false)); reporter.throttled();
reporter.throttled();
reporter.throttled(); reporter.throttled();
reporter.throttled(); reporter.throttled();
assertThat(reporter.shouldReportError(), is(true));
reporter.success(); reporter.success();
assertThat(reporter.shouldReportError(), is(false)); reporter.throttled();
verify(throttleLog, times(2)).warn(any(Object.class));
verify(throttleLog, times(3)).error(any(Object.class));
} }