From 61f54eb64e43fab2c3c0c31c753ac4100a6e87ca Mon Sep 17 00:00:00 2001 From: Justin Pfifer Date: Mon, 18 Feb 2019 15:23:56 -0800 Subject: [PATCH] Add SdkException to exception manager of KinesisDataFetcher (#502) The KinesisDataFetcher uses the AWSExceptionManager to translate execution exceptions into the expected exceptions. Currently if the exception is unexpected the exception will be wrapped in a RuntimeException before being returned. We depend on SdkExceptions being the right type where we handle them upstream so we add a configuration for SdkException which should ensure handling works as expected. --- .../retrieval/polling/KinesisDataFetcher.java | 2 + .../polling/KinesisDataFetcherTest.java | 39 +++++++++++++++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 0fbb06b4..129fd15b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -27,6 +27,7 @@ import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; @@ -263,6 +264,7 @@ public class KinesisDataFetcher { final AWSExceptionManager exceptionManager = new AWSExceptionManager(); exceptionManager.add(ResourceNotFoundException.class, t -> t); exceptionManager.add(KinesisException.class, t -> t); + exceptionManager.add(SdkException.class, t -> t); return exceptionManager; } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java index a0fa7063..8dfed30c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java @@ -39,16 +39,15 @@ import java.util.stream.Collectors; import org.junit.Before; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.kinesis.exceptions.KinesisClientLibException; -import software.amazon.kinesis.common.InitialPositionInStream; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; - +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; @@ -59,6 +58,9 @@ import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.checkpoint.SentinelCheckpoint; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.exceptions.KinesisClientLibException; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.processor.Checkpointer; @@ -86,6 +88,11 @@ public class KinesisDataFetcherTest { @Mock private KinesisAsyncClient kinesisClient; + @Mock + private CompletableFuture getRecordsResponseFuture; + + @Rule + public ExpectedException expectedExceptionRule = ExpectedException.none(); @Before public void setup() { @@ -264,6 +271,30 @@ public class KinesisDataFetcherTest { assertEquals(expectedRecordsRequest, recordsCaptor.getValue()); } } + + @Test + public void testGetRecordsThrowsSdkException() throws Exception { + expectedExceptionRule.expect(SdkException.class); + expectedExceptionRule.expectMessage("Test Exception"); + + CompletableFuture getShardIteratorFuture = CompletableFuture + .completedFuture(GetShardIteratorResponse.builder().shardIterator("test").build()); + + // Set up proxy mock methods + when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(getShardIteratorFuture); + when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(getRecordsResponseFuture); + when(getRecordsResponseFuture.get()) + .thenThrow(new ExecutionException(SdkException.builder().message("Test Exception").build())); + + // Create data fectcher and initialize it with latest type checkpoint + kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy( + kinesisDataFetcher); + + // Call records of dataFetcher which will throw an exception + getRecordsRetrievalStrategy.getRecords(MAX_RECORDS); + + } @Test public void testNonNullGetRecords() throws InterruptedException, ExecutionException {