Initial start of fix for requested shutdown

This commit is contained in:
Pfifer, Justin 2017-06-08 07:19:55 -07:00
parent e121691ac2
commit c28eacea56
4 changed files with 193 additions and 39 deletions

View file

@ -0,0 +1,129 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class RequestedShutdownCoordinator {
private final ExecutorService executorService;
RequestedShutdownCoordinator(ExecutorService executorService) {
this.executorService = executorService;
}
static class RequestedShutdownCallable implements Callable<Void> {
private static final Log log = LogFactory.getLog(RequestedShutdownCallable.class);
private final CountDownLatch shutdownCompleteLatch;
private final CountDownLatch notificationCompleteLatch;
private final Worker worker;
private final ExecutorService shutdownExecutor;
RequestedShutdownCallable(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker, ExecutorService shutdownExecutor) {
this.shutdownCompleteLatch = shutdownCompleteLatch;
this.notificationCompleteLatch = notificationCompleteLatch;
this.worker = worker;
this.shutdownExecutor = shutdownExecutor;
}
private boolean isWorkerShutdownComplete() {
return worker.isShutdownComplete() || worker.getShardInfoShardConsumerMap().isEmpty();
}
private long outstandingRecordProcessors(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
final long startNanos = System.nanoTime();
//
// Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
// There is the possibility of a race condition where a lease is terminated after the shutdown request
// notification is started, but before the ShardConsumer is sent the notification. In this case the
// ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
//
if (!notificationCompleteLatch.await(timeout, unit)) {
long awaitingNotification = notificationCompleteLatch.getCount();
long awaitingFinalShutdown = shutdownCompleteLatch.getCount();
log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification, and "
+ awaitingFinalShutdown + " awaiting final shutdown");
if (awaitingFinalShutdown != 0) {
//
// The number of record processor awaiting final shutdown should be a superset of the those awaiting
// notification
//
return checkWorkerShutdownMiss(awaitingFinalShutdown);
}
}
long remaining = remainingTimeout(timeout, unit, startNanos);
throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time.");
//
// Once all record processors have been notified of the shutdown it is safe to allow the worker to
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
//
worker.shutdown();
remaining = remainingTimeout(timeout, unit, startNanos);
throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time.");
//
// Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown
// processing. This should really be a no-op since as part of the notification completion the lease for
// ShardConsumer is terminated.
//
if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) {
long outstanding = shutdownCompleteLatch.getCount();
log.info("Awaiting " + outstanding + " record processors to complete final shutdown");
return checkWorkerShutdownMiss(outstanding);
}
return 0;
}
private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) {
long checkNanos = System.nanoTime() - startNanos;
return unit.toNanos(timeout) - checkNanos;
}
private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException {
if (remainingNanos <= 0) {
throw new TimeoutException(message);
}
}
/**
* This checks to see if the worker has already hit it's shutdown target, while there is outstanding record
* processors. This maybe a little racy due to when the value of outstanding is retrieved. In general though the
* latch should be decremented before the shutdown completion.
*
* @param outstanding
* the number of record processor still awaiting shutdown.
* @return the number of record processors awaiting shutdown, or 0 if the worker believes it's shutdown already.
*/
private long checkWorkerShutdownMiss(long outstanding) {
if (isWorkerShutdownComplete()) {
if (outstanding != 0) {
log.info("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
+ " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: "
+ worker.isShutdownComplete() + " -- Consumer Map: "
+ worker.getShardInfoShardConsumerMap().size());
}
return 0;
}
return outstanding;
}
@Override
public Void call() throws Exception {
return null;
}
}
}

View file

@ -1,44 +1,48 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Used as a response from the {@link Worker#requestShutdown()} to allow callers to wait until shutdown is complete.
*/
class ShutdownFuture implements Future<Void> {
class ShutdownFuture {
private static final Log log = LogFactory.getLog(ShutdownFuture.class);
private final CountDownLatch shutdownCompleteLatch;
private final CountDownLatch notificationCompleteLatch;
private final Worker worker;
private final ExecutorService shutdownExecutor;
ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) {
this(shutdownCompleteLatch, notificationCompleteLatch, worker, makeExecutor());
}
ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker,
ExecutorService shutdownExecutor) {
this.shutdownCompleteLatch = shutdownCompleteLatch;
this.notificationCompleteLatch = notificationCompleteLatch;
this.worker = worker;
this.shutdownExecutor = shutdownExecutor;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException("Cannot cancel a shutdown process");
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return isWorkerShutdownComplete();
private static ExecutorService makeExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RequestShutdown-%04d")
.build();
return Executors.newSingleThreadExecutor(threadFactory);
}
private boolean isWorkerShutdownComplete() {
@ -128,28 +132,26 @@ class ShutdownFuture implements Future<Void> {
return outstanding;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
boolean complete = false;
do {
try {
long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS);
complete = outstanding == 0;
log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown.");
} catch (TimeoutException te) {
log.info("Timeout while waiting for completion: " + te.getMessage());
}
} while(!complete);
return null;
Future<Void> startShutdown() {
return shutdownExecutor.submit(new ShutdownCallable());
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
long outstanding = outstandingRecordProcessors(timeout, unit);
if (outstanding != 0) {
throw new TimeoutException("Awaiting " + outstanding + " record processors to shutdown.");
private class ShutdownCallable implements Callable<Void> {
@Override
public Void call() throws Exception {
boolean complete = false;
do {
try {
long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS);
complete = outstanding == 0;
log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown.");
} catch (TimeoutException te) {
log.info("Timeout while waiting for completion: " + te.getMessage());
}
} while (!complete);
return null;
}
return null;
}
}

View file

@ -18,10 +18,12 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -572,8 +574,7 @@ public class Worker implements Runnable {
}
}
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this);
return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this).startShutdown();
}
boolean isShutdownComplete() {

View file

@ -9,11 +9,18 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@ -33,24 +40,29 @@ public class ShutdownFutureTest {
private Worker worker;
@Mock
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoConsumerMap;
@Mock
private ExecutorService executorService;
@Test
public void testSimpleGetAlreadyCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
mockNotificationComplete(true);
mockShutdownComplete(true);
Future<Void> future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker, executorService).startShutdown();
future.get();
verify(notificationCompleteLatch).await(anyLong(), any(TimeUnit.class));
verify(worker).shutdown();
verify(shutdownCompleteLatch).await(anyLong(), any(TimeUnit.class));
verify(executorService.shutdownNow());
}
@Test
public void testNotificationNotCompleted() throws Exception {
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker);
ShutdownFuture future = new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, worker, executorService);
mockNotificationComplete(false, true);
mockShutdownComplete(true);
@ -212,7 +224,7 @@ public class ShutdownFutureTest {
assertThat("Expected a timeout exception to occur", gotTimeout);
}
private void awaitFuture(ShutdownFuture future) throws Exception {
private void awaitFuture(Future<Void> future) throws Exception {
future.get(1, TimeUnit.SECONDS);
}
@ -233,4 +245,14 @@ public class ShutdownFutureTest {
when(latch.getCount()).thenReturn(remaining, additionalRemaining);
}
private void mockExecutor() {
when(executorService.submit(any(Callable.class))).thenAnswer(new Answer<Future<Void>>() {
@Override
public Future<Void> answer(InvocationOnMock invocation) throws Throwable {
Callable<Void> callable = (Callable<Void>)invocation.getArgumentAt(0, Callable.class);
return Futures.immediateFuture(callable.call());
}
})
}
}