diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index d0bfe723..42f7a522 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -50,13 +51,17 @@ import java.util.function.Function; import org.junit.After; import org.junit.Before; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.ShardInfo; @@ -111,10 +116,16 @@ public class ShardConsumerTest { private Optional logWarningForTaskAfterMillis = Optional.empty(); + @Rule + public TestName testName = new TestName(); + @Before public void before() { shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - executorService = new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test-" + testName.getMethodName() + "-%04d") + .setDaemon(true).build(); + executorService = new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory); + processRecordsInput = ProcessRecordsInput.builder().isAtShardEnd(false).cacheEntryTime(Instant.now()) .millisBehindLatest(1000L).records(Collections.emptyList()).build(); } @@ -455,11 +466,11 @@ public class ShardConsumerTest { cache.publish(); awaitAndResetBarrier(taskCallBarrier); + cache.awaitRequest(); Throwable healthCheckOutcome = consumer.healthCheck(); assertThat(healthCheckOutcome, equalTo(expectedException)); - cache.awaitRequest(); verify(cache.subscription, times(2)).request(anyLong()); }