diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index e5e21735..f4cd9307 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -14,6 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; @@ -41,12 +43,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -54,6 +53,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; @@ -511,6 +511,13 @@ public class WorkerTest { verify(recordProcessor, times(1)).shutdown(any(ShutdownInput.class)); } + /** + * This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of + * {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads. + * This behavior makes the test a bit racy, since we need to ensure a specific order of events. + * + * @throws Exception + */ @Test public final void testWorkerForcefulShutdown() throws Exception { final List shardList = createShardListWithOneShard(); @@ -538,6 +545,10 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessor recordProcessor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(recordProcessor); + final Semaphore actionBlocker = new Semaphore(1); + final Semaphore shutdownBlocker = new Semaphore(1); + + actionBlocker.acquire(); doAnswer(new Answer () { @Override @@ -547,17 +558,22 @@ public class WorkerTest { // Block for some time now to test forceful shutdown. Also, check if record processor // was interrupted or not. - final long totalSleepTimeMillis = failoverTimeMillis * 10; final long startTimeMillis = System.currentTimeMillis(); long elapsedTimeMillis = 0; - while (elapsedTimeMillis < totalSleepTimeMillis) { - try { - Thread.sleep(totalSleepTimeMillis); - } catch (InterruptedException e) { - recordProcessorInterrupted.getAndSet(true); - } - elapsedTimeMillis = System.currentTimeMillis() - startTimeMillis; + + LOG.info("Entering sleep @ " + startTimeMillis + " with elapsedMills: " + elapsedTimeMillis); + shutdownBlocker.acquire(); + try { + actionBlocker.acquire(); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted @ " + System.currentTimeMillis() + " elapsedMillis: " + + (System.currentTimeMillis() - startTimeMillis)); + recordProcessorInterrupted.getAndSet(true); } + shutdownBlocker.release(); + elapsedTimeMillis = System.currentTimeMillis() - startTimeMillis; + LOG.info("Sleep completed @ " + System.currentTimeMillis() + " elapsedMillis: " + elapsedTimeMillis); + return null; } }).when(recordProcessor).processRecords(any(ProcessRecordsInput.class)); @@ -587,7 +603,19 @@ public class WorkerTest { Assert.assertTrue(workerThread.getState() == State.TERMINATED); // Shutdown should not be called in this case because record processor is blocked. verify(recordProcessor, times(0)).shutdown(any(ShutdownInput.class)); - Assert.assertTrue(recordProcessorInterrupted.get()); + + // + // Release the worker thread + // + actionBlocker.release(); + // + // Give the worker thread time to execute it's interrupted handler. + // + shutdownBlocker.tryAcquire(100, TimeUnit.MILLISECONDS); + // + // Now we can see if it was actually interrupted. It's possible it wasn't and this will fail. + // + assertThat(recordProcessorInterrupted.get(), equalTo(true)); } /**