From 524717538e85e5a7afe1bad5408decc79015592b Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Fri, 21 Oct 2016 09:33:20 -0700 Subject: [PATCH] Handle Possible Race Conditions, and Comments Handle some possible race conditions during the shutdown process. Added more comments clarifying how shutdown works, and the race conditions it can face. --- .../lib/worker/ShardConsumer.java | 6 -- .../ShardConsumerShutdownNotification.java | 4 + .../lib/worker/ShutdownFuture.java | 84 +++++++++++++------ .../clientlibrary/lib/worker/Worker.java | 14 +++- .../lib/worker/ShutdownFutureTest.java | 43 +++++++++- 5 files changed, 114 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index f63ec59d..03a8cd33 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -15,8 +15,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.Collections; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -39,12 +37,8 @@ import com.google.common.annotations.VisibleForTesting; */ class ShardConsumer { - - private static final Log LOG = LogFactory.getLog(ShardConsumer.class); - private static final Set EMPTY_DISALLOWED_SET = Collections.emptySet(); - private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; private final RecordProcessorCheckpointer recordProcessorCheckpointer; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java 
index 3ee21c04..b3792131 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerShutdownNotification.java @@ -47,6 +47,10 @@ class ShardConsumerShutdownNotification implements ShutdownNotification { if (notificationComplete) { return; } + // + // Once the notification has been completed, the lease needs to be dropped to allow the worker to complete + // shutdown of the record processor. + // leaseCoordinator.dropLease(lease); notificationCompleteLatch.countDown(); notificationComplete = true; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java index 21c004c1..218bbefb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java @@ -20,8 +20,6 @@ class ShutdownFuture implements Future { private final CountDownLatch notificationCompleteLatch; private final Worker worker; - private boolean workerShutdownCalled = false; - ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) { this.shutdownCompleteLatch = shutdownCompleteLatch; this.notificationCompleteLatch = notificationCompleteLatch; @@ -48,57 +46,91 @@ class ShutdownFuture implements Future { } private long outstandingRecordProcessors(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException { + throws InterruptedException, ExecutionException, TimeoutException { + + long startNanos = System.nanoTime(); + // // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested. 
+ // There is the possibility of a race condition where a lease is terminated after the shutdown request + // notification is started, but before the ShardConsumer is sent the notification. In this case the + // ShardConsumer would start the lease loss shutdown, and may never call the notification methods. // if (!notificationCompleteLatch.await(timeout, unit)) { long awaitingNotification = notificationCompleteLatch.getCount(); - log.info("Awaiting " + awaitingNotification + " record processors to complete initial shutdown"); + log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification"); long awaitingFinalShutdown = shutdownCompleteLatch.getCount(); if (awaitingFinalShutdown != 0) { + // + // The number of record processors awaiting final shutdown should be a superset of those awaiting + // notification + // + return checkWorkerShutdownMiss(awaitingFinalShutdown); - return awaitingFinalShutdown; } } + + long remaining = remainingTimeout(timeout, unit, startNanos); + throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time."); + // // Once all record processors have been notified of the shutdown it is safe to allow the worker to // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases. // - if (!workerShutdownCalled) { - // - // Unfortunately Worker#shutdown() doesn't appear to be idempotent. - // - worker.shutdown(); - } + worker.shutdown(); + remaining = remainingTimeout(timeout, unit, startNanos); + throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time."); + // // Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown // processing. This should really be a no-op since as part of the notification completion the lease for // ShardConsumer is terminated. 
// - if (!shutdownCompleteLatch.await(timeout, unit)) { + if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) { long outstanding = shutdownCompleteLatch.getCount(); log.info("Awaiting " + outstanding + " record processors to complete final shutdown"); - if (isWorkerShutdownComplete()) { - if (outstanding != 0) { - log.warn("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding - + " with a current value of " + shutdownCompleteLatch.getCount() - + ". shutdownComplete: " + worker.isShutdownComplete() + " -- Consumer Map: " - + worker.getShardInfoShardConsumerMap().size()); - } - return 0; - } - return outstanding; + + return checkWorkerShutdownMiss(outstanding); } return 0; } + private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) { + long checkNanos = System.nanoTime() - startNanos; + return unit.toNanos(timeout) - checkNanos; + } + + private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException { + if (remainingNanos <= 0) { + throw new TimeoutException(message); + } + } + + private long checkWorkerShutdownMiss(long outstanding) { + if (isWorkerShutdownComplete()) { + if (outstanding != 0) { + log.warn("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding + + " with a current value of " + shutdownCompleteLatch.getCount() + ". 
shutdownComplete: " + + worker.isShutdownComplete() + " -- Consumer Map: " + + worker.getShardInfoShardConsumerMap().size()); + } + return 0; + } + return outstanding; + } + @Override public Void get() throws InterruptedException, ExecutionException { - long outstanding; + boolean complete = false; do { - outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS); - log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown."); - } while(outstanding != 0); + try { + long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS); + complete = outstanding == 0; + log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown."); + } catch (TimeoutException te) { + log.info("Timeout while waiting for completion: " + te.getMessage()); + } + + } while(!complete); return null; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 76b6116f..ddc05275 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -28,7 +28,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,6 +42,7 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import 
com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; @@ -490,9 +490,11 @@ public class Worker implements Runnable { } /** - * Requests shutdown of the worker, notifying record processors, that implement - * {@link IShutdownNotificationAware}, of the impending shutdown. - * This gives the record processor a final chance to checkpoint. + * Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware}, + * of the impending shutdown. This gives the record processor a final chance to checkpoint. + * + * It's possible that a record processor won't be notified before being shut down. This can occur if the lease is + * lost after requesting shutdown, but before the notification is dispatched. + * *

Requested Shutdown Process

When a shutdown process is requested it operates slightly differently to * allow the record processors a chance to checkpoint a final time. @@ -567,6 +569,10 @@ public class Worker implements Runnable { * */ public void shutdown() { + if (shutdown) { + LOG.warn("Shutdown requested a second time."); + return; + } LOG.info("Worker shutdown requested."); // Set shutdown flag, so Worker.run can start shutdown process. diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java index 124850c9..cccbc9a1 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java @@ -3,6 +3,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -16,7 +17,10 @@ import java.util.concurrent.TimeoutException; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import org.mockito.stubbing.OngoingStubbing; @RunWith(MockitoJUnitRunner.class) public class ShutdownFutureTest { @@ -51,6 +55,10 @@ public class ShutdownFutureTest { mockNotificationComplete(false, true); mockShutdownComplete(true); + when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap); + when(shardInfoConsumerMap.isEmpty()).thenReturn(false); + when(worker.isShutdownComplete()).thenReturn(false); + 
when(notificationCompleteLatch.getCount()).thenReturn(1L); when(shutdownCompleteLatch.getCount()).thenReturn(1L); @@ -135,8 +143,41 @@ public class ShutdownFutureTest { verify(shardInfoConsumerMap).isEmpty(); verify(shardInfoConsumerMap).size(); + } + @Test + public void testNotificationNotCompleteButShardConsumerEmpty() throws Exception { + ShutdownFuture future = create(); + mockNotificationComplete(false); + mockShutdownComplete(false); + mockOutstanding(notificationCompleteLatch, 1L); + mockOutstanding(shutdownCompleteLatch, 1L); + + when(worker.isShutdownComplete()).thenReturn(false); + mockShardInfoConsumerMap(0); + + awaitFuture(future); + verify(worker, never()).shutdown(); + verifyLatchAwait(notificationCompleteLatch); + verify(shutdownCompleteLatch, never()).await(); + + verify(worker, times(2)).isShutdownComplete(); + verify(worker, times(2)).getShardInfoShardConsumerMap(); + + verify(shardInfoConsumerMap).isEmpty(); + verify(shardInfoConsumerMap).size(); + } + + @Test(expected = TimeoutException.class) + public void testTimeExceededException() throws Exception { + ShutdownFuture future = create(); + mockNotificationComplete(false); + mockOutstanding(notificationCompleteLatch, 1L); + when(worker.isShutdownComplete()).thenReturn(false); + mockShardInfoConsumerMap(1); + + future.get(1, TimeUnit.NANOSECONDS); } private ShutdownFuture create() { @@ -172,7 +213,7 @@ public class ShutdownFutureTest { } private void awaitFuture(ShutdownFuture future) throws Exception { - future.get(1, TimeUnit.MILLISECONDS); + future.get(1, TimeUnit.SECONDS); } private void mockNotificationComplete(Boolean initial, Boolean... states) throws Exception {