diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 21d0599b..474a5767 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -14,6 +14,7 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -79,7 +80,15 @@ class InitializeTask implements ITask { try { LOG.debug("Initializing ShardId " + shardInfo.getShardId()); - Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId()); + Checkpoint initialCheckpointObject; + try { + initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId()); + } catch (KinesisClientLibException e) { + LOG.error("Caught exception while fetching checkpoint for " + shardInfo.getShardId(), e); + final TaskResult result = new TaskResult(e); + result.leaseNotFound(); + return result; + } ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 9e73aad9..11ee39b5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -374,7 +374,7 @@ class ShardConsumer { } private enum TaskOutcome { - SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE + SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND } private TaskOutcome determineTaskOutcome() { @@ -391,6 +391,10 @@ class ShardConsumer { return TaskOutcome.SUCCESSFUL; } logTaskException(result); + // This is the case of result with exception + if (result.isLeaseNotFound()) { + return TaskOutcome.LEASE_NOT_FOUND; + } } catch (Exception e) { throw new RuntimeException(e); } finally { @@ -487,6 +491,10 @@ class ShardConsumer { markForShutdown(ShutdownReason.TERMINATE); LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason TERMINATE"); } + if (taskOutcome == TaskOutcome.LEASE_NOT_FOUND) { + markForShutdown(ShutdownReason.ZOMBIE); + LOG.info("Shard " + shardInfo.getShardId() + ": Mark for shutdown with reason ZOMBIE as lease was not found"); + } if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { currentState = currentState.shutdownTransition(shutdownReason); } else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java index 70109b86..db22c97b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java @@ -33,6 +33,8 @@ class TaskResult { // List of childShards of the current shard. This field is only required for the task result when we reach end of a shard. private List childShards; + private boolean leaseNotFound; + /** * @return the shardEndReached */ @@ -57,6 +59,14 @@ class TaskResult { */ protected void setChildShards(List childShards) { this.childShards = childShards; } + public boolean isLeaseNotFound() { + return leaseNotFound; + } + + public void leaseNotFound() { + this.leaseNotFound = true; + } + /** * @return the exception */