diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index 4d32566e..de5b293b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -72,6 +73,8 @@ public class ProcessTaskTest { private @Mock KinesisDataFetcher mockDataFetcher; private @Mock IRecordProcessor mockRecordProcessor; private @Mock RecordProcessorCheckpointer mockCheckpointer; + @Mock + private ThrottlingReporter throttlingReporter; private List processedRecords; private ExtendedSequenceNumber newLargestPermittedCheckpointValue; @@ -90,7 +93,7 @@ public class ProcessTaskTest { final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter); } @Test @@ -101,6 +104,9 @@ public class ProcessTaskTest { .getRecords(maxRecords); TaskResult result = processTask.call(); + verify(throttlingReporter).throttled(); + verify(throttlingReporter, never()).success(); + verify(throttlingReporter).shouldReportError(); assertTrue("Result should contain ProvisionedThroughputExceededException", result.getException() instanceof ProvisionedThroughputExceededException); } @@ -299,6 +305,10 @@ public class ProcessTaskTest { when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); + verify(throttlingReporter).success(); + verify(throttlingReporter, never()).throttled(); + verify(throttlingReporter, never()).shouldReportError(); + verify(throttlingReporter, never()).getConsecutiveThrottles(); ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(priCaptor.capture());