Update existing tests for throttling
Updated the existing tests to check the ThrottleReporter
This commit is contained in:
parent
cca61706db
commit
33ec7d601a
1 changed files with 11 additions and 1 deletions
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
|
@ -72,6 +73,8 @@ public class ProcessTaskTest {
|
||||||
private @Mock KinesisDataFetcher mockDataFetcher;
|
private @Mock KinesisDataFetcher mockDataFetcher;
|
||||||
private @Mock IRecordProcessor mockRecordProcessor;
|
private @Mock IRecordProcessor mockRecordProcessor;
|
||||||
private @Mock RecordProcessorCheckpointer mockCheckpointer;
|
private @Mock RecordProcessorCheckpointer mockCheckpointer;
|
||||||
|
@Mock
|
||||||
|
private ThrottlingReporter throttlingReporter;
|
||||||
|
|
||||||
private List<Record> processedRecords;
|
private List<Record> processedRecords;
|
||||||
private ExtendedSequenceNumber newLargestPermittedCheckpointValue;
|
private ExtendedSequenceNumber newLargestPermittedCheckpointValue;
|
||||||
|
|
@ -90,7 +93,7 @@ public class ProcessTaskTest {
|
||||||
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
|
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
|
||||||
processTask = new ProcessTask(
|
processTask = new ProcessTask(
|
||||||
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis,
|
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis,
|
||||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
|
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -101,6 +104,9 @@ public class ProcessTaskTest {
|
||||||
.getRecords(maxRecords);
|
.getRecords(maxRecords);
|
||||||
|
|
||||||
TaskResult result = processTask.call();
|
TaskResult result = processTask.call();
|
||||||
|
verify(throttlingReporter).throttled();
|
||||||
|
verify(throttlingReporter, never()).success();
|
||||||
|
verify(throttlingReporter).shouldReportError();
|
||||||
assertTrue("Result should contain ProvisionedThroughputExceededException",
|
assertTrue("Result should contain ProvisionedThroughputExceededException",
|
||||||
result.getException() instanceof ProvisionedThroughputExceededException);
|
result.getException() instanceof ProvisionedThroughputExceededException);
|
||||||
}
|
}
|
||||||
|
|
@ -299,6 +305,10 @@ public class ProcessTaskTest {
|
||||||
when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
|
when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
|
||||||
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
|
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
|
||||||
processTask.call();
|
processTask.call();
|
||||||
|
verify(throttlingReporter).success();
|
||||||
|
verify(throttlingReporter, never()).throttled();
|
||||||
|
verify(throttlingReporter, never()).shouldReportError();
|
||||||
|
verify(throttlingReporter, never()).getConsecutiveThrottles();
|
||||||
|
|
||||||
ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
|
ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
|
||||||
verify(mockRecordProcessor).processRecords(priCaptor.capture());
|
verify(mockRecordProcessor).processRecords(priCaptor.capture());
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue