diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 21d0599b..5343470f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -14,6 +14,7 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibNonRetryableException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -79,7 +80,15 @@ class InitializeTask implements ITask { try { LOG.debug("Initializing ShardId " + shardInfo.getShardId()); - Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId()); + Checkpoint initialCheckpointObject; + try { + initialCheckpointObject = checkpoint.getCheckpointObject(shardInfo.getShardId()); + } catch (KinesisClientLibNonRetryableException e) { + LOG.error("Caught exception while fetching checkpoint for " + shardInfo.getShardId(), e); + final TaskResult result = new TaskResult(e); + result.leaseNotFound(); + return result; + } ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index bcbabcde..47baad04 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -283,7 +283,8 @@ class KinesisClientLibLeaseCoordinator extends LeaseCoordinator childShards; + private boolean leaseNotFound; + /** * @return the shardEndReached */ @@ -57,6 +59,14 @@ class TaskResult { */ protected void setChildShards(List childShards) { this.childShards = childShards; } + public boolean isLeaseNotFound() { + return leaseNotFound; + } + + public void leaseNotFound() { + this.leaseNotFound = true; + } + /** * @return the exception */ diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java index d8549870..e48cdd16 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinatorTest.java @@ -79,7 +79,7 @@ public class KinesisClientLibLeaseCoordinatorTest { leaseCoordinator.initialize(); } - @Test(expected = KinesisClientLibIOException.class) + @Test(expected = com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException.class) public void testGetCheckpointObjectWithNoLease() throws DependencyException, ProvisionedThroughputException, IllegalStateException, InvalidStateException, KinesisClientLibException { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 7afa3a8a..7a5e7fd2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -54,8 +54,6 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; -import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Description; @@ -64,7 +62,6 @@ import org.hamcrest.TypeSafeMatcher; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -246,6 +243,52 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); } + @Test + public void testInitializationStateTransitionsToShutdownOnLeaseNotFound() throws Exception { + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); + + ICheckpoint checkpoint = new KinesisClientLibLeaseCoordinator(leaseManager, "", 0, 0); + + when(leaseManager.getLease(anyString())).thenReturn(null); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); + StreamConfig streamConfig = + new StreamConfig(streamProxy, + 1, + 10, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + ShardConsumer consumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpoint, + processor, + leaseCoordinator, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config, + shardSyncer, + shardSyncStrategy); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + consumer.consumeShard(); + Thread.sleep(50L); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + consumer.consumeShard(); + Thread.sleep(50L); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + consumer.consumeShard(); + Thread.sleep(50L); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + consumer.consumeShard(); + Thread.sleep(50L); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + } + + @SuppressWarnings("unchecked") @Test public final void testRecordProcessorThrowable() throws Exception {