From c28eacea562d56aa910decf024eacd741e177a36 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Thu, 8 Jun 2017 07:19:55 -0700 Subject: [PATCH] Initial start of fix for requested shutdown --- .../worker/RequestedShutdownCoordinator.java | 129 ++++++++++++++++++ .../lib/worker/ShutdownFuture.java | 70 +++++----- .../clientlibrary/lib/worker/Worker.java | 5 +- .../lib/worker/ShutdownFutureTest.java | 28 +++- 4 files changed, 193 insertions(+), 39 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RequestedShutdownCoordinator.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RequestedShutdownCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RequestedShutdownCoordinator.java new file mode 100644 index 00000000..76139149 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RequestedShutdownCoordinator.java @@ -0,0 +1,129 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class RequestedShutdownCoordinator { + + private final ExecutorService executorService; + + RequestedShutdownCoordinator(ExecutorService executorService) { + this.executorService = executorService; + } + + static class RequestedShutdownCallable implements Callable { + + private static final Log log = LogFactory.getLog(RequestedShutdownCallable.class); + + private final CountDownLatch shutdownCompleteLatch; + private final CountDownLatch notificationCompleteLatch; + private final Worker worker; + private final ExecutorService shutdownExecutor; + + RequestedShutdownCallable(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker, ExecutorService shutdownExecutor) { + this.shutdownCompleteLatch = shutdownCompleteLatch; + this.notificationCompleteLatch = notificationCompleteLatch; + this.worker = worker; + this.shutdownExecutor = shutdownExecutor; + } + + private boolean isWorkerShutdownComplete() { + return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty(); + } + + private long outstandingRecordProcessors(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + + final long startNanos = System.nanoTime(); + + // + // Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested. + // There is the possibility of a race condition where a lease is terminated after the shutdown request + // notification is started, but before the ShardConsumer is sent the notification. In this case the + // ShardConsumer would start the lease loss shutdown, and may never call the notification methods. + // + if (!notificationCompleteLatch.await(timeout, unit)) { + long awaitingNotification = notificationCompleteLatch.getCount(); + long awaitingFinalShutdown = shutdownCompleteLatch.getCount(); + log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification, and " + + awaitingFinalShutdown + " awaiting final shutdown"); + if (awaitingFinalShutdown != 0) { + // + // The number of record processor awaiting final shutdown should be a superset of the those awaiting + // notification + // + return checkWorkerShutdownMiss(awaitingFinalShutdown); + } + } + + long remaining = remainingTimeout(timeout, unit, startNanos); + throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time."); + + // + // Once all record processors have been notified of the shutdown it is safe to allow the worker to + // start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases. + // + worker.shutdown(); + remaining = remainingTimeout(timeout, unit, startNanos); + throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time."); + + // + // Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown + // processing. This should really be a no-op since as part of the notification completion the lease for + // ShardConsumer is terminated. + // + if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) { + long outstanding = shutdownCompleteLatch.getCount(); + log.info("Awaiting " + outstanding + " record processors to complete final shutdown"); + + return checkWorkerShutdownMiss(outstanding); + } + return 0; + } + + private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) { + long checkNanos = System.nanoTime() - startNanos; + return unit.toNanos(timeout) - checkNanos; + } + + private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException { + if (remainingNanos <= 0) { + throw new TimeoutException(message); + } + } + + /** + * This checks to see if the worker has already hit it's shutdown target, while there is outstanding record + * processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the + * latch should be decremented before the shutdown completion. + * + * @param outstanding + * the number of record processor still awaiting shutdown. + * @return the number of record processors awaiting shutdown, or 0 if the worker believes it's shutdown already. + */ + private long checkWorkerShutdownMiss(long outstanding) { + if (isWorkerShutdownComplete()) { + if (outstanding != 0) { + log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding + + " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: " + + worker.isShutdownComplete() + " -- Consumer Map: " + + worker.getShardInfoShardConsumerMap().size()); + } + return 0; + } + return outstanding; + } + + @Override + public Void call() throws Exception { + return null; + } + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java index 8ee96537..8e530df5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFuture.java @@ -1,44 +1,48 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Used as a response from the {@link Worker#requestShutdown()} to allow callers to wait until shutdown is complete. */ -class ShutdownFuture implements Future { +class ShutdownFuture { private static final Log log = LogFactory.getLog(ShutdownFuture.class); private final CountDownLatch shutdownCompleteLatch; private final CountDownLatch notificationCompleteLatch; private final Worker worker; + private final ExecutorService shutdownExecutor; ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) { + this(shutdownCompleteLatch, notificationCompleteLatch, worker, makeExecutor()); + } + + ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker, + ExecutorService shutdownExecutor) { this.shutdownCompleteLatch = shutdownCompleteLatch; this.notificationCompleteLatch = notificationCompleteLatch; this.worker = worker; + this.shutdownExecutor = shutdownExecutor; } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - throw new UnsupportedOperationException("Cannot cancel a shutdown process"); - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return isWorkerShutdownComplete(); + private static ExecutorService makeExecutor() { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RequestShutdown-%04d") + .build(); + return Executors.newSingleThreadExecutor(threadFactory); } private boolean isWorkerShutdownComplete() { @@ -128,28 +132,26 @@ class ShutdownFuture implements Future { return outstanding; } - @Override - public Void get() throws InterruptedException, ExecutionException { - boolean complete = false; - do { - try { - long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS); - complete = outstanding == 0; - log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown."); - } catch (TimeoutException te) { - log.info("Timeout while waiting for completion: " + te.getMessage()); - } - - } while(!complete); - return null; + Future startShutdown() { + return shutdownExecutor.submit(new ShutdownCallable()); } - @Override - public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - long outstanding = outstandingRecordProcessors(timeout, unit); - if (outstanding != 0) { - throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown."); + private class ShutdownCallable implements Callable { + @Override + public Void call() throws Exception { + boolean complete = false; + do { + try { + long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS); + complete = outstanding == 0; + log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown."); + } catch (TimeoutException te) { + log.info("Timeout while waiting for completion: " + te.getMessage()); + } + + } while (!complete); + return null; } - return null; } + } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index bf9f4e7d..03901c60 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -18,10 +18,12 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -572,8 +574,7 @@ public class Worker implements Runnable { } } - return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this); - + return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this).startShutdown(); } boolean isShutdownComplete() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java index cccbc9a1..3e93a8cd 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownFutureTest.java @@ -9,11 +9,18 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -33,24 +40,29 @@ public class ShutdownFutureTest { private Worker worker; @Mock private ConcurrentMap shardInfoConsumerMap; + @Mock + private ExecutorService executorService; @Test public void testSimpleGetAlreadyCompleted() throws Exception { - ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); + mockNotificationComplete(true); mockShutdownComplete(true); + Future future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker, executorService).startShutdown(); + future.get(); verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class)); verify(worker).shutdown(); verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class)); + verify(executorService.shutdownNow()); } @Test public void testNotificationNotCompleted() throws Exception { - ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker); + ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker, executorService); mockNotificationComplete(false, true); mockShutdownComplete(true); @@ -212,7 +224,7 @@ public class ShutdownFutureTest { assertThat("Expected a timeout exception to occur", gotTimeout); } - private void awaitFuture(ShutdownFuture future) throws Exception { + private void awaitFuture(Future future) throws Exception { future.get(1, TimeUnit.SECONDS); } @@ -233,4 +245,14 @@ public class ShutdownFutureTest { when(latch.getCount()).thenReturn(remaining, additionalRemaining); } + private void mockExecutor() { + when(executorService.submit(any(Callable.class))).thenAnswer(new Answer>() { + @Override + public Future answer(InvocationOnMock invocation) throws Throwable { + Callable callable = (Callable)invocation.getArgumentAt(0, Callable.class); + return Futures.immediateFuture(callable.call()); + } + }) + } + } \ No newline at end of file