diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 3f76af36..60f109dd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -76,10 +76,8 @@ public class ShardConsumer { * much coordination/synchronization to handle concurrent reads/updates. */ private ConsumerState currentState; - /* - * Used to track if we lost the primary responsibility. Once set to true, we will start shutting down. - * If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object. - */ + + private final Object shutdownLock = new Object(); @Getter(AccessLevel.PUBLIC) private volatile ShutdownReason shutdownReason; private volatile ShutdownNotification shutdownNotification; @@ -257,24 +255,27 @@ public class ShardConsumer { } @VisibleForTesting - synchronized CompletableFuture shutdownComplete() { - if (taskOutcome != null) { - updateState(taskOutcome); - } else { - // - // ShardConsumer has been asked to shutdown before the first task even had a chance to run. - // In this case generate a successful task outcome, and allow the shutdown to continue. This should only - // happen if the lease was lost before the initial state had a chance to run. - // - updateState(TaskOutcome.SUCCESSFUL); - } - if (isShutdown()) { - return CompletableFuture.completedFuture(true); - } + CompletableFuture shutdownComplete() { return CompletableFuture.supplyAsync(() -> { - executeTask(null); - return false; - }); + synchronized (this) { + if (taskOutcome != null) { + updateState(taskOutcome); + } else { + // + // ShardConsumer has been asked to shutdown before the first task even had a chance to run. + // In this case generate a successful task outcome, and allow the shutdown to continue. This should only + // happen if the lease was lost before the initial state had a chance to run. + // + updateState(TaskOutcome.SUCCESSFUL); + } + if (isShutdown()) { + return true; + } + + executeTask(null); + return false; + } + }, executorService); } private synchronized void processData(ProcessRecordsInput input) { @@ -339,10 +340,12 @@ public class ShardConsumer { } private ConsumerState handleShutdownTransition(TaskOutcome outcome, ConsumerState nextState) { - if (isShutdownRequested() && outcome != TaskOutcome.FAILURE) { - return currentState.shutdownTransition(shutdownReason); + synchronized (shutdownLock) { + if (isShutdownRequested() && outcome != TaskOutcome.FAILURE) { + return currentState.shutdownTransition(shutdownReason); + } + return nextState; } - return nextState; } private void logTaskException(TaskResult taskResult) { @@ -388,13 +391,15 @@ public class ShardConsumer { return isShutdown(); } - synchronized void markForShutdown(ShutdownReason reason) { - // - // ShutdownReason.LEASE_LOST takes precedence over SHARD_END - // (we won't be able to save checkpoint at end of shard) - // - if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { - shutdownReason = reason; + void markForShutdown(ShutdownReason reason) { + synchronized (shutdownLock) { + // + // ShutdownReason.LEASE_LOST takes precedence over SHARD_END + // (we won't be able to save checkpoint at end of shard) + // + if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { + shutdownReason = reason; + } } } @@ -410,7 +415,9 @@ public class ShardConsumer { @VisibleForTesting public boolean isShutdownRequested() { - return shutdownReason != null; + synchronized (shutdownLock) { + return shutdownReason != null; + } } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 39c867c6..00678682 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -286,7 +286,64 @@ public class ShardConsumerTest { verifyNoMoreInteractions(taskExecutionListener); } + @Test(timeout = 1000L) + public void testLeaseLossIsNonBlocking() throws Exception { + CyclicBarrier taskCallBarrier = new CyclicBarrier(2); + CyclicBarrier processingTaskInterlock = new CyclicBarrier(2); + mockSuccessfulInitialize(null); + + mockSuccessfulProcessing(taskCallBarrier, processingTaskInterlock); + + mockSuccessfulShutdown(null); + + TestPublisher cache = new TestPublisher(); + ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis, + shardConsumerArgument, initialState, Function.identity(), 1, taskExecutionListener); + + boolean initComplete = false; + while (!initComplete) { + initComplete = consumer.initializeComplete().get(); + } + + consumer.subscribe(); + cache.awaitInitialSetup(); + + log.debug("Setup complete publishing entry"); + cache.publish(); + awaitAndResetBarrier(taskCallBarrier); + consumer.leaseLost(); + + // + // This will block if a lock is held on ShardConsumer#this + // + consumer.executeLifecycle(); + assertThat(consumer.isShutdown(), equalTo(false)); + + log.debug("Release processing task interlock"); + awaitAndResetBarrier(processingTaskInterlock); + + while(!consumer.isShutdown()) { + consumer.executeLifecycle(); + Thread.yield(); + } + + verify(cache.subscription, times(1)).request(anyLong()); + verify(cache.subscription).cancel(); + verify(processingState, times(1)).createTask(eq(shardConsumerArgument), eq(consumer), any()); + verify(taskExecutionListener, times(1)).beforeTaskExecution(initialTaskInput); + verify(taskExecutionListener, times(1)).beforeTaskExecution(processTaskInput); + verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownTaskInput); + + initialTaskInput = initialTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + processTaskInput = processTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + shutdownTaskInput = shutdownTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + + verify(taskExecutionListener, times(1)).afterTaskExecution(initialTaskInput); + verify(taskExecutionListener, times(1)).afterTaskExecution(processTaskInput); + verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownTaskInput); + verifyNoMoreInteractions(taskExecutionListener); + } @Test public void testDataArrivesAfterProcessing2() throws Exception {