Adding unit test case for validating internal state on initial prefetcher failures

This commit is contained in:
Ashwin Giridharan 2020-06-23 14:49:15 -07:00
parent ef39ecd0df
commit 60af78f7cb
2 changed files with 48 additions and 0 deletions

View file

@ -31,10 +31,12 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilConditionSatisfied;
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable;
import static software.amazon.kinesis.utils.ProcessRecordsInputMatcher.eqProcessRecordsInput;
@ -47,6 +49,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -57,6 +60,7 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -153,6 +157,37 @@ public class PrefetchRecordsPublisherTest {
verify(dataFetcher, times(1)).initialize(any(ExtendedSequenceNumber.class), any());
}
@Test
public void testPrefetchPublisherInternalStateNotModifiedWhenPrefetcherThreadStartFails() {
doThrow(new RejectedExecutionException()).doThrow(new RejectedExecutionException()).doCallRealMethod()
.when(executorService).execute(any());
// Initialize try 1
tryPrefetchCacheStart();
blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300);
verifyInternalState(0);
// Initialize try 2
tryPrefetchCacheStart();
blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300);
verifyInternalState(0);
// Initialize try 3
tryPrefetchCacheStart();
blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300);
verifyInternalState(MAX_SIZE);
verify(dataFetcher, times(3)).initialize(any(ExtendedSequenceNumber.class), any());
}
private void tryPrefetchCacheStart() {
try {
getRecordsCache.start(sequenceNumber, initialPosition);
} catch (Exception e) {
// suppress exception
}
}
private void verifyInternalState(int queueSize) {
Assert.assertTrue(getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == queueSize);
}
@Test
public void testGetRecords() {
record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build();

View file

@ -35,4 +35,17 @@ public class BlockingUtils {
throw new RuntimeException("No records found");
}
}
public static boolean blockUntilConditionSatisfied(Supplier<Boolean> conditionSupplier, long timeoutMillis) {
while(!conditionSupplier.get() && timeoutMillis > 0 ) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
timeoutMillis -= 100;
}
return conditionSupplier.get();
}
}