Add SdkException to exception manager of KinesisDataFetcher (#502)

The KinesisDataFetcher uses the AWSExceptionManager to translate
execution exceptions into the expected exceptions.  Currently if the
exception is unexpected the exception will be wrapped in a
RuntimeException before being returned.  We depend on SdkExceptions
being the right type where we handle them upstream so we add a
configuration for SdkException which should ensure handling works as
expected.
This commit is contained in:
Justin Pfifer 2019-02-18 15:23:56 -08:00 committed by Sahil Palvia
parent c053789409
commit 61f54eb64e
2 changed files with 37 additions and 4 deletions

View file

@ -27,6 +27,7 @@ import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@ -263,6 +264,7 @@ public class KinesisDataFetcher {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);
exceptionManager.add(SdkException.class, t -> t);
return exceptionManager;
}
}

View file

@ -39,16 +39,15 @@ import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@ -59,6 +58,9 @@ import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.checkpoint.SentinelCheckpoint;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
@ -86,6 +88,11 @@ public class KinesisDataFetcherTest {
@Mock
private KinesisAsyncClient kinesisClient;
@Mock
private CompletableFuture<GetRecordsResponse> getRecordsResponseFuture;
@Rule
public ExpectedException expectedExceptionRule = ExpectedException.none();
@Before
public void setup() {
@ -264,6 +271,30 @@ public class KinesisDataFetcherTest {
assertEquals(expectedRecordsRequest, recordsCaptor.getValue());
}
}
@Test
public void testGetRecordsThrowsSdkException() throws Exception {
expectedExceptionRule.expect(SdkException.class);
expectedExceptionRule.expectMessage("Test Exception");
CompletableFuture<GetShardIteratorResponse> getShardIteratorFuture = CompletableFuture
.completedFuture(GetShardIteratorResponse.builder().shardIterator("test").build());
// Set up proxy mock methods
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(getShardIteratorFuture);
when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(getRecordsResponseFuture);
when(getRecordsResponseFuture.get())
.thenThrow(new ExecutionException(SdkException.builder().message("Test Exception").build()));
// Create data fectcher and initialize it with latest type checkpoint
kinesisDataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(
kinesisDataFetcher);
// Call records of dataFetcher which will throw an exception
getRecordsRetrievalStrategy.getRecords(MAX_RECORDS);
}
@Test
public void testNonNullGetRecords() throws InterruptedException, ExecutionException {