diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 0b81f23d..caf939b2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -423,6 +423,8 @@ class ShardConsumer { } if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { currentState = currentState.shutdownTransition(shutdownReason); + } else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) { + currentState = currentState.shutdownTransition(shutdownReason); } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { if (currentState.getTaskType() == currentTask.getTaskType()) { currentState = currentState.successTransition(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 22d74ba8..4532175a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -903,16 +903,12 @@ public class Worker implements Runnable { lease, notificationCompleteLatch, shutdownCompleteLatch); ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); - if (consumer != null) { - consumer.notifyShutdownRequested(shutdownNotification); - } else { - // - // There is a race condition between retrieving the current assignments, and creating the - // notification. If the a lease is lost in between these two points, we explicitly decrement the - // notification latches to clear the shutdown. - // + + if (consumer == null || ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) { notificationCompleteLatch.countDown(); shutdownCompleteLatch.countDown(); + } else { + consumer.notifyShutdownRequested(shutdownNotification); } } return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this);