From c053789409586b580ad2557a0fd6d545c4360688 Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Fri, 15 Feb 2019 12:05:23 -0800 Subject: [PATCH] Use an explicit lock for shutdown instead of the general lock (#501) If the Scheduler loses its lease for a shard it will attempt to shutdown the ShardConsumer processing that shard. When shutting down the ShardConsumer acquires a lock on `this` and makes the necessary state changes. This becomes an issue if the ShardConsumer is currently processing a batch of records as processing of the records is done under the general `this` lock. When these two things combine the Scheduler can become stuck waiting on the record processing to complete. To fix this the ShardConsumer will now use a specific lock on shutdown state changes to prevent the Scheduler from becoming blocked. Allow the shutdown state change future to acquire the lock When the ShardConsumer is being shutdown we create a future for the state change originally the future needed to acquire the lock before attempting to create the future task. This changes it to acquire the lock while running on another thread, and complete the shutdown then. --- .../kinesis/lifecycle/ShardConsumer.java | 71 ++++++++++--------- .../kinesis/lifecycle/ShardConsumerTest.java | 57 +++++++++++++++ 2 files changed, 96 insertions(+), 32 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 3f76af36..60f109dd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -76,10 +76,8 @@ public class ShardConsumer { * much coordination/synchronization to handle concurrent reads/updates. */ private ConsumerState currentState; - /* - * Used to track if we lost the primary responsibility. Once set to true, we will start shutting down. - * If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object. - */ + + private final Object shutdownLock = new Object(); @Getter(AccessLevel.PUBLIC) private volatile ShutdownReason shutdownReason; private volatile ShutdownNotification shutdownNotification; @@ -257,24 +255,27 @@ public class ShardConsumer { } @VisibleForTesting - synchronized CompletableFuture shutdownComplete() { - if (taskOutcome != null) { - updateState(taskOutcome); - } else { - // - // ShardConsumer has been asked to shutdown before the first task even had a chance to run. - // In this case generate a successful task outcome, and allow the shutdown to continue. This should only - // happen if the lease was lost before the initial state had a chance to run. - // - updateState(TaskOutcome.SUCCESSFUL); - } - if (isShutdown()) { - return CompletableFuture.completedFuture(true); - } + CompletableFuture shutdownComplete() { return CompletableFuture.supplyAsync(() -> { - executeTask(null); - return false; - }); + synchronized (this) { + if (taskOutcome != null) { + updateState(taskOutcome); + } else { + // + // ShardConsumer has been asked to shutdown before the first task even had a chance to run. + // In this case generate a successful task outcome, and allow the shutdown to continue. This should only + // happen if the lease was lost before the initial state had a chance to run. + // + updateState(TaskOutcome.SUCCESSFUL); + } + if (isShutdown()) { + return true; + } + + executeTask(null); + return false; + } + }, executorService); } private synchronized void processData(ProcessRecordsInput input) { @@ -339,10 +340,12 @@ public class ShardConsumer { } private ConsumerState handleShutdownTransition(TaskOutcome outcome, ConsumerState nextState) { - if (isShutdownRequested() && outcome != TaskOutcome.FAILURE) { - return currentState.shutdownTransition(shutdownReason); + synchronized (shutdownLock) { + if (isShutdownRequested() && outcome != TaskOutcome.FAILURE) { + return currentState.shutdownTransition(shutdownReason); + } + return nextState; } - return nextState; } private void logTaskException(TaskResult taskResult) { @@ -388,13 +391,15 @@ public class ShardConsumer { return isShutdown(); } - synchronized void markForShutdown(ShutdownReason reason) { - // - // ShutdownReason.LEASE_LOST takes precedence over SHARD_END - // (we won't be able to save checkpoint at end of shard) - // - if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { - shutdownReason = reason; + void markForShutdown(ShutdownReason reason) { + synchronized (shutdownLock) { + // + // ShutdownReason.LEASE_LOST takes precedence over SHARD_END + // (we won't be able to save checkpoint at end of shard) + // + if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { + shutdownReason = reason; + } } } @@ -410,7 +415,9 @@ public class ShardConsumer { @VisibleForTesting public boolean isShutdownRequested() { - return shutdownReason != null; + synchronized (shutdownLock) { + return shutdownReason != null; + } } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 39c867c6..00678682 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -286,7 +286,64 @@ public class ShardConsumerTest { verifyNoMoreInteractions(taskExecutionListener); } + @Test(timeout = 1000L) + public void testLeaseLossIsNonBlocking() throws Exception { + CyclicBarrier taskCallBarrier = new CyclicBarrier(2); + CyclicBarrier processingTaskInterlock = new CyclicBarrier(2); + mockSuccessfulInitialize(null); + + mockSuccessfulProcessing(taskCallBarrier, processingTaskInterlock); + + mockSuccessfulShutdown(null); + + TestPublisher cache = new TestPublisher(); + ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis, + shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener); + + boolean initComplete = false; + while (!initComplete) { + initComplete = consumer.initializeComplete().get(); + } + + consumer.subscribe(); + cache.awaitInitialSetup(); + + log.debug("Setup complete publishing entry"); + cache.publish(); + awaitAndResetBarrier(taskCallBarrier); + consumer.leaseLost(); + + // + // This will block if a lock is held on ShardConsumer#this + // + consumer.executeLifecycle(); + assertThat(consumer.isShutdown(), equalTo(false)); + + log.debug("Release processing task interlock"); + awaitAndResetBarrier(processingTaskInterlock); + + while(!consumer.isShutdown()) { + consumer.executeLifecycle(); + Thread.yield(); + } + + verify(cache.subscription, times(1)).request(anyLong()); + verify(cache.subscription).cancel(); + verify(processingState, times(1)).createTask(eq(shardConsumerArgument), eq(consumer), any()); + verify(taskExecutionListener, times(1)).beforeTaskExecution(initialTaskInput); + verify(taskExecutionListener, times(1)).beforeTaskExecution(processTaskInput); + verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownTaskInput); + + initialTaskInput = initialTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + processTaskInput = processTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + shutdownTaskInput = shutdownTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + + verify(taskExecutionListener, times(1)).afterTaskExecution(initialTaskInput); + verify(taskExecutionListener, times(1)).afterTaskExecution(processTaskInput); + verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownTaskInput); + verifyNoMoreInteractions(taskExecutionListener); + } @Test public void testDataArrivesAfterProcessing2() throws Exception {