handling completed and blocked tasks better during graceful shutdown

This commit is contained in:
Shitanshu Aggarwal 2019-10-07 22:39:38 +00:00
parent cc8aa5ef74
commit 9b0c72a3e2
2 changed files with 6 additions and 8 deletions

View file

@ -423,6 +423,8 @@ class ShardConsumer {
} }
if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) {
currentState = currentState.shutdownTransition(shutdownReason); currentState = currentState.shutdownTransition(shutdownReason);
} else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) {
currentState = currentState.shutdownTransition(shutdownReason);
} else if (taskOutcome == TaskOutcome.SUCCESSFUL) { } else if (taskOutcome == TaskOutcome.SUCCESSFUL) {
if (currentState.getTaskType() == currentTask.getTaskType()) { if (currentState.getTaskType() == currentTask.getTaskType()) {
currentState = currentState.successTransition(); currentState = currentState.successTransition();

View file

@ -903,16 +903,12 @@ public class Worker implements Runnable {
lease, notificationCompleteLatch, shutdownCompleteLatch); lease, notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
if (consumer != null) {
consumer.notifyShutdownRequested(shutdownNotification); if (consumer == null || ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) {
} else {
//
// There is a race condition between retrieving the current assignments, and creating the
// notification. If the a lease is lost in between these two points, we explicitly decrement the
// notification latches to clear the shutdown.
//
notificationCompleteLatch.countDown(); notificationCompleteLatch.countDown();
shutdownCompleteLatch.countDown(); shutdownCompleteLatch.countDown();
} else {
consumer.notifyShutdownRequested(shutdownNotification);
} }
} }
return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this); return new GracefulShutdownContext(shutdownCompleteLatch, notificationCompleteLatch, this);