diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java index 35200f6d..8d3179be 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java @@ -55,7 +55,7 @@ public class TestConsumer { private LifecycleConfig lifecycleConfig; private ProcessorConfig processorConfig; private Scheduler scheduler; - private static final ScheduledExecutorService PRODUCER_EXECUTOR = Executors.newSingleThreadScheduledExecutor(); + private ScheduledExecutorService producerExecutor; private ScheduledFuture producerFuture; private ScheduledExecutorService consumerExecutor; private ScheduledFuture consumerFuture; @@ -127,7 +127,8 @@ public class TestConsumer { } private void startProducer() { - producerFuture = PRODUCER_EXECUTOR.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); + this.producerExecutor = Executors.newSingleThreadScheduledExecutor(); + this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); // Reshard logic if required for the test if (consumerConfig.getReshardFactorList() != null) { @@ -137,7 +138,7 @@ public class TestConsumer { // Schedule the stream scales 4 minutes apart with 2 minute starting delay for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) { - PRODUCER_EXECUTOR.schedule(s, (4 * i) + 2, TimeUnit.MINUTES); + producerExecutor.schedule(s, (4 * i) + 2, TimeUnit.MINUTES); } } } @@ -179,8 +180,8 @@ public class TestConsumer { if (producerFuture != null) { producerFuture.cancel(false); } - if (PRODUCER_EXECUTOR != null) { - PRODUCER_EXECUTOR.shutdown(); + if (producerExecutor != null) { + producerExecutor.shutdown(); } }