diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index 9eb23c85..f5ef482e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -39,6 +40,7 @@ import software.amazon.awssdk.services.kinesis.model.LimitExceededException; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.utils.CollectionUtils; @@ -179,6 +181,7 @@ public class KinesisShardDetector implements ShardDetector { private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) { final AWSExceptionManager exceptionManager = new AWSExceptionManager(); + exceptionManager.add(ResourceNotFoundException.class, t -> t); exceptionManager.add(LimitExceededException.class, t -> t); exceptionManager.add(ResourceInUseException.class, t -> t); exceptionManager.add(KinesisException.class, t -> t); @@ -194,7 +197,6 @@ public class KinesisShardDetector implements ShardDetector { int remainingRetries = maxListShardsRetryAttempts; while (result == null) { - try { try { result = getListShardsResponse(request.build()); @@ -218,6 +220,12 @@ public class KinesisShardDetector implements ShardDetector { log.debug("Stream {} : Sleep was interrupted ", streamIdentifier, ie); } lastException = e; + } catch (ResourceNotFoundException e) { + log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.", + streamIdentifier.streamName()); + return ListShardsResponse.builder().shards(Collections.emptyList()) + .nextToken(null) + .build(); } catch (TimeoutException te) { throw new RuntimeException(te); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java index 1a37f614..68bb7d97 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java @@ -15,18 +15,6 @@ package software.amazon.kinesis.leases; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.isA; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -35,7 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.IntStream; - +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -43,7 +31,6 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; - import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.LimitExceededException; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; @@ -51,6 +38,16 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.isA; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * @@ -143,19 +140,17 @@ public class KinesisShardDetectorTest { } } - @Test(expected = ResourceNotFoundException.class) - public void testListShardsResourceNotFound() { + @Test + public void testListShardsResourceNotFoundReturnsEmptyResponse() { final CompletableFuture future = CompletableFuture.supplyAsync(() -> { throw ResourceNotFoundException.builder().build(); }); - when(client.listShards(any(ListShardsRequest.class))).thenReturn(future); - try { - shardDetector.listShards(); - } finally { - verify(client).listShards(any(ListShardsRequest.class)); - } + List shards = shardDetector.listShards(); + + Assert.assertEquals(0, shards.size()); + verify(client).listShards(any(ListShardsRequest.class)); } @Test