Use an explicit lock for shutdown instead of the general lock (#501)

If the Scheduler loses its lease for a shard it will attempt to
shutdown the ShardConsumer processing that shard.  When shutting down
the ShardConsumer acquires a lock on `this` and makes the necessary
state changes.

This becomes an issue if the ShardConsumer is currently processing a
batch of records as processing of the records is done under the
general `this` lock.

When these two things combine the Scheduler can become stuck waiting
on the record processing to complete.

To fix this the ShardConsumer will now use a specific lock on shutdown
state changes to prevent the Scheduler from becoming blocked.

Allow the shutdown state change future to acquire the lock

When the ShardConsumer is being shutdown we create a future for the
state change originally the future needed to acquire the lock before
attempting to create the future task.  This changes it to acquire the
lock while running on another thread, and complete the shutdown then.
This commit is contained in:
Justin Pfifer 2019-02-15 12:05:23 -08:00 committed by Sahil Palvia
parent 3b3998a59e
commit c053789409
2 changed files with 96 additions and 32 deletions

View file

@ -76,10 +76,8 @@ public class ShardConsumer {
* much coordination/synchronization to handle concurrent reads/updates.
*/
private ConsumerState currentState;
/*
* Used to track if we lost the primary responsibility. Once set to true, we will start shutting down.
* If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object.
*/
private final Object shutdownLock = new Object();
@Getter(AccessLevel.PUBLIC)
private volatile ShutdownReason shutdownReason;
private volatile ShutdownNotification shutdownNotification;
@ -257,24 +255,27 @@ public class ShardConsumer {
}
@VisibleForTesting
synchronized CompletableFuture<Boolean> shutdownComplete() {
if (taskOutcome != null) {
updateState(taskOutcome);
} else {
//
// ShardConsumer has been asked to shutdown before the first task even had a chance to run.
// In this case generate a successful task outcome, and allow the shutdown to continue. This should only
// happen if the lease was lost before the initial state had a chance to run.
//
updateState(TaskOutcome.SUCCESSFUL);
}
if (isShutdown()) {
return CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> shutdownComplete() {
return CompletableFuture.supplyAsync(() -> {
executeTask(null);
return false;
});
synchronized (this) {
if (taskOutcome != null) {
updateState(taskOutcome);
} else {
//
// ShardConsumer has been asked to shutdown before the first task even had a chance to run.
// In this case generate a successful task outcome, and allow the shutdown to continue. This should only
// happen if the lease was lost before the initial state had a chance to run.
//
updateState(TaskOutcome.SUCCESSFUL);
}
if (isShutdown()) {
return true;
}
executeTask(null);
return false;
}
}, executorService);
}
private synchronized void processData(ProcessRecordsInput input) {
@ -339,10 +340,12 @@ public class ShardConsumer {
}
private ConsumerState handleShutdownTransition(TaskOutcome outcome, ConsumerState nextState) {
if (isShutdownRequested() && outcome != TaskOutcome.FAILURE) {
return currentState.shutdownTransition(shutdownReason);
synchronized (shutdownLock) {
if (isShutdownRequested() && outcome != TaskOutcome.FAILURE) {
return currentState.shutdownTransition(shutdownReason);
}
return nextState;
}
return nextState;
}
private void logTaskException(TaskResult taskResult) {
@ -388,13 +391,15 @@ public class ShardConsumer {
return isShutdown();
}
synchronized void markForShutdown(ShutdownReason reason) {
//
// ShutdownReason.LEASE_LOST takes precedence over SHARD_END
// (we won't be able to save checkpoint at end of shard)
//
if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) {
shutdownReason = reason;
void markForShutdown(ShutdownReason reason) {
synchronized (shutdownLock) {
//
// ShutdownReason.LEASE_LOST takes precedence over SHARD_END
// (we won't be able to save checkpoint at end of shard)
//
if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) {
shutdownReason = reason;
}
}
}
@ -410,7 +415,9 @@ public class ShardConsumer {
@VisibleForTesting
public boolean isShutdownRequested() {
return shutdownReason != null;
synchronized (shutdownLock) {
return shutdownReason != null;
}
}
/**

View file

@ -286,7 +286,64 @@ public class ShardConsumerTest {
verifyNoMoreInteractions(taskExecutionListener);
}
@Test(timeout = 1000L)
public void testLeaseLossIsNonBlocking() throws Exception {
CyclicBarrier taskCallBarrier = new CyclicBarrier(2);
CyclicBarrier processingTaskInterlock = new CyclicBarrier(2);
mockSuccessfulInitialize(null);
mockSuccessfulProcessing(taskCallBarrier, processingTaskInterlock);
mockSuccessfulShutdown(null);
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener);
boolean initComplete = false;
while (!initComplete) {
initComplete = consumer.initializeComplete().get();
}
consumer.subscribe();
cache.awaitInitialSetup();
log.debug("Setup complete publishing entry");
cache.publish();
awaitAndResetBarrier(taskCallBarrier);
consumer.leaseLost();
//
// This will block if a lock is held on ShardConsumer#this
//
consumer.executeLifecycle();
assertThat(consumer.isShutdown(), equalTo(false));
log.debug("Release processing task interlock");
awaitAndResetBarrier(processingTaskInterlock);
while(!consumer.isShutdown()) {
consumer.executeLifecycle();
Thread.yield();
}
verify(cache.subscription, times(1)).request(anyLong());
verify(cache.subscription).cancel();
verify(processingState, times(1)).createTask(eq(shardConsumerArgument), eq(consumer), any());
verify(taskExecutionListener, times(1)).beforeTaskExecution(initialTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(processTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownTaskInput);
initialTaskInput = initialTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
processTaskInput = processTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
shutdownTaskInput = shutdownTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
verify(taskExecutionListener, times(1)).afterTaskExecution(initialTaskInput);
verify(taskExecutionListener, times(1)).afterTaskExecution(processTaskInput);
verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownTaskInput);
verifyNoMoreInteractions(taskExecutionListener);
}
@Test
public void testDataArrivesAfterProcessing2() throws Exception {