Fix for failing testExceptionInProcessingStopsRequests (#356)

Changed to await for the next request before checking the healthCheck outcome.
Also add the test name to the thread name.
This commit is contained in:
Justin Pfifer 2018-08-06 08:31:02 -07:00 committed by GitHub
parent dc6db0d007
commit 1a7f68827c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -43,6 +43,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
@ -50,13 +51,17 @@ import java.util.function.Function;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
@ -111,10 +116,16 @@ public class ShardConsumerTest {
private Optional<Long> logWarningForTaskAfterMillis = Optional.empty(); private Optional<Long> logWarningForTaskAfterMillis = Optional.empty();
@Rule
public TestName testName = new TestName();
@Before @Before
public void before() { public void before() {
shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
executorService = new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test-" + testName.getMethodName() + "-%04d")
.setDaemon(true).build();
executorService = new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
processRecordsInput = ProcessRecordsInput.builder().isAtShardEnd(false).cacheEntryTime(Instant.now()) processRecordsInput = ProcessRecordsInput.builder().isAtShardEnd(false).cacheEntryTime(Instant.now())
.millisBehindLatest(1000L).records(Collections.emptyList()).build(); .millisBehindLatest(1000L).records(Collections.emptyList()).build();
} }
@ -455,11 +466,11 @@ public class ShardConsumerTest {
cache.publish(); cache.publish();
awaitAndResetBarrier(taskCallBarrier); awaitAndResetBarrier(taskCallBarrier);
cache.awaitRequest();
Throwable healthCheckOutcome = consumer.healthCheck(); Throwable healthCheckOutcome = consumer.healthCheck();
assertThat(healthCheckOutcome, equalTo(expectedException)); assertThat(healthCheckOutcome, equalTo(expectedException));
cache.awaitRequest();
verify(cache.subscription, times(2)).request(anyLong()); verify(cache.subscription, times(2)).request(anyLong());
} }