Merge pull request #59 from Renjuju/cherry-pick-rnf-exception

[Cherry-pick from multi_stream_2] Empty lease response for ResourceNotFound exceptions (#27)
This commit is contained in:
ashwing 2020-06-16 15:44:36 -07:00 committed by GitHub
commit 6021261b55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 23 deletions

View file

@ -19,6 +19,7 @@ import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -39,6 +40,7 @@ import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.utils.CollectionUtils;
@ -179,6 +181,7 @@ public class KinesisShardDetector implements ShardDetector {
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(LimitExceededException.class, t -> t);
exceptionManager.add(ResourceInUseException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);
@ -194,7 +197,6 @@ public class KinesisShardDetector implements ShardDetector {
int remainingRetries = maxListShardsRetryAttempts;
while (result == null) {
try {
try {
result = getListShardsResponse(request.build());
@ -218,6 +220,12 @@ public class KinesisShardDetector implements ShardDetector {
log.debug("Stream {} : Sleep was interrupted ", streamIdentifier, ie);
}
lastException = e;
} catch (ResourceNotFoundException e) {
log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.",
streamIdentifier.streamName());
return ListShardsResponse.builder().shards(Collections.emptyList())
.nextToken(null)
.build();
} catch (TimeoutException te) {
throw new RuntimeException(te);
}

View file

@ -15,18 +15,6 @@
package software.amazon.kinesis.leases;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -35,7 +23,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -43,7 +31,6 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
@ -51,6 +38,16 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
*
@ -143,19 +140,17 @@ public class KinesisShardDetectorTest {
}
}
@Test(expected = ResourceNotFoundException.class)
public void testListShardsResourceNotFound() {
@Test
public void testListShardsResourceNotFoundReturnsEmptyResponse() {
final CompletableFuture<ListShardsResponse> future = CompletableFuture.supplyAsync(() -> {
throw ResourceNotFoundException.builder().build();
});
when(client.listShards(any(ListShardsRequest.class))).thenReturn(future);
try {
shardDetector.listShards();
} finally {
verify(client).listShards(any(ListShardsRequest.class));
}
List<Shard> shards = shardDetector.listShards();
Assert.assertEquals(0, shards.size());
verify(client).listShards(any(ListShardsRequest.class));
}
@Test