From 586e97405e095a33ffa387be721d46a1ce436d8a Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Thu, 21 Jul 2016 08:14:20 -0700 Subject: [PATCH] Fix Race Condition on Worker Test The test 'testWorkerForcefulShutdown' was using Thread.sleep() to create the conditions for an interrupted state. It was possible for the test to take enough time for the sleep to actually exit before the interrupt was sent. This would cause the test to fail. Changing to a pair of sempahores ensures that the test record processor will remain blocked for the expected amount of time. --- .../clientlibrary/lib/worker/WorkerTest.java | 52 ++++++++++++++----- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index e5e21735..f4cd9307 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -14,6 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; @@ -41,12 +43,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -54,6 +53,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; @@ -511,6 +511,13 @@ public class WorkerTest { verify(recordProcessor, times(1)).shutdown(any(ShutdownInput.class)); } + /** + * This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of + * {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads. + * This behavior makes the test a bit racy, since we need to ensure a specific order of events. + * + * @throws Exception + */ @Test public final void testWorkerForcefulShutdown() throws Exception { final List shardList = createShardListWithOneShard(); @@ -538,6 +545,10 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessor recordProcessor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(recordProcessor); + final Semaphore actionBlocker = new Semaphore(1); + final Semaphore shutdownBlocker = new Semaphore(1); + + actionBlocker.acquire(); doAnswer(new Answer () { @Override @@ -547,17 +558,22 @@ public class WorkerTest { // Block for some time now to test forceful shutdown. Also, check if record processor // was interrupted or not. - final long totalSleepTimeMillis = failoverTimeMillis * 10; final long startTimeMillis = System.currentTimeMillis(); long elapsedTimeMillis = 0; - while (elapsedTimeMillis < totalSleepTimeMillis) { - try { - Thread.sleep(totalSleepTimeMillis); - } catch (InterruptedException e) { - recordProcessorInterrupted.getAndSet(true); - } - elapsedTimeMillis = System.currentTimeMillis() - startTimeMillis; + + LOG.info("Entering sleep @ " + startTimeMillis + " with elapsedMills: " + elapsedTimeMillis); + shutdownBlocker.acquire(); + try { + actionBlocker.acquire(); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted @ " + System.currentTimeMillis() + " elapsedMillis: " + + (System.currentTimeMillis() - startTimeMillis)); + recordProcessorInterrupted.getAndSet(true); } + shutdownBlocker.release(); + elapsedTimeMillis = System.currentTimeMillis() - startTimeMillis; + LOG.info("Sleep completed @ " + System.currentTimeMillis() + " elapsedMillis: " + elapsedTimeMillis); + return null; } }).when(recordProcessor).processRecords(any(ProcessRecordsInput.class)); @@ -587,7 +603,19 @@ public class WorkerTest { Assert.assertTrue(workerThread.getState() == State.TERMINATED); // Shutdown should not be called in this case because record processor is blocked. verify(recordProcessor, times(0)).shutdown(any(ShutdownInput.class)); - Assert.assertTrue(recordProcessorInterrupted.get()); + + // + // Release the worker thread + // + actionBlocker.release(); + // + // Give the worker thread time to execute it's interrupted handler. + // + shutdownBlocker.tryAcquire(100, TimeUnit.MILLISECONDS); + // + // Now we can see if it was actually interrupted. It's possible it wasn't and this will fail. + // + assertThat(recordProcessorInterrupted.get(), equalTo(true)); } /**