Fix Race Condition on Worker Test

The test 'testWorkerForcefulShutdown' was using Thread.sleep() to create
the conditions for an interrupted state.  It was possible for the test
to take enough time for the sleep to actually exit before the interrupt
was sent.  This would cause the test to fail.  Changing to a pair of
sempahores ensures that the test record processor will remain blocked
for the expected amount of time.
This commit is contained in:
Pfifer, Justin 2016-07-21 08:14:20 -07:00
parent 2bf45eafda
commit 586e97405e

View file

@ -14,6 +14,8 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
@ -41,12 +43,9 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@ -54,6 +53,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
@ -511,6 +511,13 @@ public class WorkerTest {
verify(recordProcessor, times(1)).shutdown(any(ShutdownInput.class));
}
/**
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
* This behavior makes the test a bit racy, since we need to ensure a specific order of events.
*
* @throws Exception
*/
@Test
public final void testWorkerForcefulShutdown() throws Exception {
final List<Shard> shardList = createShardListWithOneShard();
@ -538,6 +545,10 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
IRecordProcessor recordProcessor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(recordProcessor);
final Semaphore actionBlocker = new Semaphore(1);
final Semaphore shutdownBlocker = new Semaphore(1);
actionBlocker.acquire();
doAnswer(new Answer<Object> () {
@Override
@ -547,17 +558,22 @@ public class WorkerTest {
// Block for some time now to test forceful shutdown. Also, check if record processor
// was interrupted or not.
final long totalSleepTimeMillis = failoverTimeMillis * 10;
final long startTimeMillis = System.currentTimeMillis();
long elapsedTimeMillis = 0;
while (elapsedTimeMillis < totalSleepTimeMillis) {
try {
Thread.sleep(totalSleepTimeMillis);
} catch (InterruptedException e) {
recordProcessorInterrupted.getAndSet(true);
}
elapsedTimeMillis = System.currentTimeMillis() - startTimeMillis;
LOG.info("Entering sleep @ " + startTimeMillis + " with elapsedMills: " + elapsedTimeMillis);
shutdownBlocker.acquire();
try {
actionBlocker.acquire();
} catch (InterruptedException e) {
LOG.info("Sleep interrupted @ " + System.currentTimeMillis() + " elapsedMillis: "
+ (System.currentTimeMillis() - startTimeMillis));
recordProcessorInterrupted.getAndSet(true);
}
shutdownBlocker.release();
elapsedTimeMillis = System.currentTimeMillis() - startTimeMillis;
LOG.info("Sleep completed @ " + System.currentTimeMillis() + " elapsedMillis: " + elapsedTimeMillis);
return null;
}
}).when(recordProcessor).processRecords(any(ProcessRecordsInput.class));
@ -587,7 +603,19 @@ public class WorkerTest {
Assert.assertTrue(workerThread.getState() == State.TERMINATED);
// Shutdown should not be called in this case because record processor is blocked.
verify(recordProcessor, times(0)).shutdown(any(ShutdownInput.class));
Assert.assertTrue(recordProcessorInterrupted.get());
//
// Release the worker thread
//
actionBlocker.release();
//
// Give the worker thread time to execute it's interrupted handler.
//
shutdownBlocker.tryAcquire(100, TimeUnit.MILLISECONDS);
//
// Now we can see if it was actually interrupted. It's possible it wasn't and this will fail.
//
assertThat(recordProcessorInterrupted.get(), equalTo(true));
}
/**