Handle Possible Race Conditions, and Comments

Handle some possible race conditions during the shutdown process.

Added more comments clarifying how shutdown works and the race
conditions it can face.
This commit is contained in:
Pfifer, Justin 2016-10-21 09:33:20 -07:00
parent 33655bdedb
commit 524717538e
5 changed files with 114 additions and 37 deletions

View file

@ -15,8 +15,6 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@ -39,12 +37,8 @@ import com.google.common.annotations.VisibleForTesting;
*/
class ShardConsumer {
private static final Log LOG = LogFactory.getLog(ShardConsumer.class);
private static final Set<ConsumerStates.ShardConsumerState> EMPTY_DISALLOWED_SET = Collections.emptySet();
private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;

View file

@ -47,6 +47,10 @@ class ShardConsumerShutdownNotification implements ShutdownNotification {
if (notificationComplete) {
return;
}
//
// Once the notification has been completed, the lease needs to be dropped to allow the worker to complete
// shutdown of the record processor.
//
leaseCoordinator.dropLease(lease);
notificationCompleteLatch.countDown();
notificationComplete = true;

View file

@ -20,8 +20,6 @@ class ShutdownFuture implements Future<Void> {
private final CountDownLatch notificationCompleteLatch;
private final Worker worker;
private boolean workerShutdownCalled = false;
ShutdownFuture(CountDownLatch shutdownCompleteLatch, CountDownLatch notificationCompleteLatch, Worker worker) {
this.shutdownCompleteLatch = shutdownCompleteLatch;
this.notificationCompleteLatch = notificationCompleteLatch;
@ -48,57 +46,91 @@ class ShutdownFuture implements Future<Void> {
}
private long outstandingRecordProcessors(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException {
throws InterruptedException, ExecutionException, TimeoutException {
long startNanos = System.nanoTime();
//
// Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
// There is the possibility of a race condition where a lease is terminated after the shutdown request
// notification is started, but before the ShardConsumer is sent the notification. In this case the
// ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
//
if (!notificationCompleteLatch.await(timeout, unit)) {
long awaitingNotification = notificationCompleteLatch.getCount();
log.info("Awaiting " + awaitingNotification + " record processors to complete initial shutdown");
log.info("Awaiting " + awaitingNotification + " record processors to complete shutdown notification");
long awaitingFinalShutdown = shutdownCompleteLatch.getCount();
if (awaitingFinalShutdown != 0) {
return awaitingFinalShutdown;
//
// The number of record processors awaiting final shutdown should be a superset of those awaiting
// notification
//
return checkWorkerShutdownMiss(awaitingFinalShutdown);
}
}
long remaining = remainingTimeout(timeout, unit, startNanos);
throwTimeoutMessageIfExceeded(remaining, "Notification hasn't completed within timeout time.");
//
// Once all record processors have been notified of the shutdown it is safe to allow the worker to
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
//
if (!workerShutdownCalled) {
//
// Unfortunately Worker#shutdown() doesn't appear to be idempotent.
//
worker.shutdown();
}
worker.shutdown();
remaining = remainingTimeout(timeout, unit, startNanos);
throwTimeoutMessageIfExceeded(remaining, "Shutdown hasn't completed within timeout time.");
//
// Want to wait for all the remaining ShardConsumers/RecordProcessor's to complete their final shutdown
// processing. This should really be a no-op since as part of the notification completion the lease for
// ShardConsumer is terminated.
//
if (!shutdownCompleteLatch.await(timeout, unit)) {
if (!shutdownCompleteLatch.await(remaining, TimeUnit.NANOSECONDS)) {
long outstanding = shutdownCompleteLatch.getCount();
log.info("Awaiting " + outstanding + " record processors to complete final shutdown");
if (isWorkerShutdownComplete()) {
if (outstanding != 0) {
log.warn("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
+ " with a current value of " + shutdownCompleteLatch.getCount()
+ ". shutdownComplete: " + worker.isShutdownComplete() + " -- Consumer Map: "
+ worker.getShardInfoShardConsumerMap().size());
}
return 0;
}
return outstanding;
return checkWorkerShutdownMiss(outstanding);
}
return 0;
}
/**
 * Computes how much of the caller-supplied timeout budget is still available.
 *
 * @param timeout    the total timeout the caller started with
 * @param unit       the unit {@code timeout} is expressed in
 * @param startNanos the {@link System#nanoTime()} reading taken when waiting began
 * @return remaining time in nanoseconds; zero or negative once the budget is exhausted
 */
private long remainingTimeout(long timeout, TimeUnit unit, long startNanos) {
    long elapsedNanos = System.nanoTime() - startNanos;
    long budgetNanos = unit.toNanos(timeout);
    return budgetNanos - elapsedNanos;
}
/**
 * Fails fast once the timeout budget is used up.
 *
 * @param remainingNanos nanoseconds left in the budget, as reported by {@code remainingTimeout}
 * @param message        message for the exception when the deadline has passed
 * @throws TimeoutException if {@code remainingNanos} is zero or negative
 */
private void throwTimeoutMessageIfExceeded(long remainingNanos, String message) throws TimeoutException {
    if (remainingNanos > 0) {
        return;
    }
    throw new TimeoutException(message);
}
/**
 * Reconciles an outstanding-latch count against the worker's own view of shutdown.
 *
 * Covers the race where the worker finished shutting down even though a latch
 * still reports outstanding record processors: the worker's state wins, and the
 * discrepancy is logged for diagnosis rather than blocking the future forever.
 *
 * @param outstanding number of record processors a latch still reports as pending
 * @return {@code 0} when the worker has completed shutdown, otherwise {@code outstanding} unchanged
 */
private long checkWorkerShutdownMiss(long outstanding) {
    if (!isWorkerShutdownComplete()) {
        // Worker still running; the latch count is authoritative.
        return outstanding;
    }
    if (outstanding != 0) {
        // Shutdown finished, yet a latch disagrees — record the mismatch.
        log.warn("Shutdown completed, but shutdownCompleteLatch still had outstanding " + outstanding
                + " with a current value of " + shutdownCompleteLatch.getCount() + ". shutdownComplete: "
                + worker.isShutdownComplete() + " -- Consumer Map: "
                + worker.getShardInfoShardConsumerMap().size());
    }
    return 0;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
long outstanding;
boolean complete = false;
do {
outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS);
log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown.");
} while(outstanding != 0);
try {
long outstanding = outstandingRecordProcessors(1, TimeUnit.SECONDS);
complete = outstanding == 0;
log.info("Awaiting " + outstanding + " consumer(s) to finish shutdown.");
} catch (TimeoutException te) {
log.info("Timeout while waiting for completion: " + te.getMessage());
}
} while(!complete);
return null;
}

View file

@ -28,7 +28,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -43,6 +42,7 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
@ -490,9 +490,11 @@ public class Worker implements Runnable {
}
/**
* Requests shutdown of the worker, notifying record processors, that implement
* {@link IShutdownNotificationAware}, of the impending shutdown.
* This gives the record processor a final chance to checkpoint.
* Requests shutdown of the worker, notifying record processors, that implement {@link IShutdownNotificationAware},
* of the impending shutdown. This gives the record processor a final chance to checkpoint.
*
* <b>It's possible that a record processor won't be notified before being shut down. This can occur if the lease is
* lost after requesting shutdown, but before the notification is dispatched.</b>
*
* <h2>Requested Shutdown Process</h2> When a shutdown process is requested it operates slightly differently to
* allow the record processors a chance to checkpoint a final time.
@ -567,6 +569,10 @@ public class Worker implements Runnable {
* </ol>
*/
public void shutdown() {
if (shutdown) {
LOG.warn("Shutdown requested a second time.");
return;
}
LOG.info("Worker shutdown requested.");
// Set shutdown flag, so Worker.run can start shutdown process.

View file

@ -3,6 +3,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -16,7 +17,10 @@ import java.util.concurrent.TimeoutException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.OngoingStubbing;
@RunWith(MockitoJUnitRunner.class)
public class ShutdownFutureTest {
@ -51,6 +55,10 @@ public class ShutdownFutureTest {
mockNotificationComplete(false, true);
mockShutdownComplete(true);
when(worker.getShardInfoShardConsumerMap()).thenReturn(shardInfoConsumerMap);
when(shardInfoConsumerMap.isEmpty()).thenReturn(false);
when(worker.isShutdownComplete()).thenReturn(false);
when(notificationCompleteLatch.getCount()).thenReturn(1L);
when(shutdownCompleteLatch.getCount()).thenReturn(1L);
@ -135,8 +143,41 @@ public class ShutdownFutureTest {
verify(shardInfoConsumerMap).isEmpty();
verify(shardInfoConsumerMap).size();
}
// Exercises the race where the shutdown notification never completes (neither
// latch counts down within the await timeout) but the worker's consumer map is
// already empty — presumably meaning every ShardConsumer has already gone away
// (TODO confirm against ShutdownFuture). The future should then finish without
// ever invoking Worker#shutdown().
@Test
public void testNotificationNotCompleteButShardConsumerEmpty() throws Exception {
ShutdownFuture future = create();
// Both awaits time out; both latches still report one outstanding processor.
mockNotificationComplete(false);
mockShutdownComplete(false);
mockOutstanding(notificationCompleteLatch, 1L);
mockOutstanding(shutdownCompleteLatch, 1L);
when(worker.isShutdownComplete()).thenReturn(false);
// Zero-size consumer map: nothing left for the worker to shut down.
mockShardInfoConsumerMap(0);
awaitFuture(future);
// The empty consumer map short-circuits completion, so worker shutdown and the
// final-shutdown await must never be reached.
verify(worker, never()).shutdown();
verifyLatchAwait(notificationCompleteLatch);
verify(shutdownCompleteLatch, never()).await();
verify(worker, times(2)).isShutdownComplete();
verify(worker, times(2)).getShardInfoShardConsumerMap();
verify(shardInfoConsumerMap).isEmpty();
verify(shardInfoConsumerMap).size();
}
// Verifies that an exhausted timeout budget surfaces as a TimeoutException from
// ShutdownFuture#get(timeout, unit) when the notification never completes and
// consumers are still outstanding.
@Test(expected = TimeoutException.class)
public void testTimeExceededException() throws Exception {
ShutdownFuture future = create();
// Notification await times out with one processor still pending.
mockNotificationComplete(false);
mockOutstanding(notificationCompleteLatch, 1L);
when(worker.isShutdownComplete()).thenReturn(false);
// Non-empty consumer map keeps the future from treating shutdown as done.
mockShardInfoConsumerMap(1);
// A 1 ns budget guarantees the remaining-timeout check trips.
future.get(1, TimeUnit.NANOSECONDS);
}
private ShutdownFuture create() {
@ -172,7 +213,7 @@ public class ShutdownFutureTest {
}
private void awaitFuture(ShutdownFuture future) throws Exception {
future.get(1, TimeUnit.MILLISECONDS);
future.get(1, TimeUnit.SECONDS);
}
private void mockNotificationComplete(Boolean initial, Boolean... states) throws Exception {