Updating comments with bug fix, test cases were failing when being run in succession becuase of incorrect closing of threads
This commit is contained in:
parent
dc0fa21b4a
commit
9460889b3c
1 changed files with 6 additions and 5 deletions
|
|
@ -55,7 +55,7 @@ public class TestConsumer {
|
||||||
private LifecycleConfig lifecycleConfig;
|
private LifecycleConfig lifecycleConfig;
|
||||||
private ProcessorConfig processorConfig;
|
private ProcessorConfig processorConfig;
|
||||||
private Scheduler scheduler;
|
private Scheduler scheduler;
|
||||||
private static final ScheduledExecutorService PRODUCER_EXECUTOR = Executors.newSingleThreadScheduledExecutor();
|
private ScheduledExecutorService producerExecutor;
|
||||||
private ScheduledFuture<?> producerFuture;
|
private ScheduledFuture<?> producerFuture;
|
||||||
private ScheduledExecutorService consumerExecutor;
|
private ScheduledExecutorService consumerExecutor;
|
||||||
private ScheduledFuture<?> consumerFuture;
|
private ScheduledFuture<?> consumerFuture;
|
||||||
|
|
@ -127,7 +127,8 @@ public class TestConsumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startProducer() {
|
private void startProducer() {
|
||||||
producerFuture = PRODUCER_EXECUTOR.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);
|
this.producerExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);
|
||||||
|
|
||||||
// Reshard logic if required for the test
|
// Reshard logic if required for the test
|
||||||
if (consumerConfig.getReshardFactorList() != null) {
|
if (consumerConfig.getReshardFactorList() != null) {
|
||||||
|
|
@ -137,7 +138,7 @@ public class TestConsumer {
|
||||||
|
|
||||||
// Schedule the stream scales 4 minutes apart with 2 minute starting delay
|
// Schedule the stream scales 4 minutes apart with 2 minute starting delay
|
||||||
for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) {
|
for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) {
|
||||||
PRODUCER_EXECUTOR.schedule(s, (4 * i) + 2, TimeUnit.MINUTES);
|
producerExecutor.schedule(s, (4 * i) + 2, TimeUnit.MINUTES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -179,8 +180,8 @@ public class TestConsumer {
|
||||||
if (producerFuture != null) {
|
if (producerFuture != null) {
|
||||||
producerFuture.cancel(false);
|
producerFuture.cancel(false);
|
||||||
}
|
}
|
||||||
if (PRODUCER_EXECUTOR != null) {
|
if (producerExecutor != null) {
|
||||||
PRODUCER_EXECUTOR.shutdown();
|
producerExecutor.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue