Optimization: 9~15% improvement in KinesisDataFetcher wall-time after (#1034)

converting `AWSExceptionManger` to a static variable.
This commit is contained in:
stair 2023-02-13 13:16:28 -05:00 committed by GitHub
parent 9fb58a22bf
commit 4d94efac8f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 42 deletions

View file

@ -64,6 +64,22 @@ import software.amazon.kinesis.retrieval.AWSExceptionManager;
@KinesisClientInternalApi
public class KinesisShardDetector implements ShardDetector {
/**
* Reusable {@link AWSExceptionManager}.
* <p>
* N.B. This instance is mutable, but thread-safe for <b>read-only</b> use.
* </p>
*/
private static final AWSExceptionManager AWS_EXCEPTION_MANAGER;
static {
AWS_EXCEPTION_MANAGER = new AWSExceptionManager();
AWS_EXCEPTION_MANAGER.add(KinesisException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(LimitExceededException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(ResourceInUseException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(ResourceNotFoundException.class, t -> t);
}
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull @Getter
@ -78,7 +94,7 @@ public class KinesisShardDetector implements ShardDetector {
private volatile Map<String, Shard> cachedShardMap = null;
private volatile Instant lastCacheUpdateTime;
@Getter(AccessLevel.PACKAGE)
private AtomicInteger cacheMisses = new AtomicInteger(0);
private final AtomicInteger cacheMisses = new AtomicInteger(0);
@Deprecated
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
@ -186,12 +202,6 @@ public class KinesisShardDetector implements ShardDetector {
}
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(LimitExceededException.class, t -> t);
exceptionManager.add(ResourceInUseException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);
ListShardsRequest.Builder builder = KinesisRequestsBuilder.listShardsRequestBuilder();
if (StringUtils.isEmpty(nextToken)) {
builder = builder.streamName(streamIdentifier.streamName()).shardFilter(shardFilter);
@ -211,7 +221,7 @@ public class KinesisShardDetector implements ShardDetector {
try {
result = getListShardsResponse(request);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: check if this is the correct behavior for Interrupted Exception
log.debug("Interrupted exception caught, shutdown initiated, returning null");

View file

@ -26,7 +26,9 @@ import lombok.experimental.Accessors;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
/**
*
* Traverses a {@code Throwable} class inheritance in search of a mapping
* function which will convert that throwable into a {@code RuntimeException}.
* If no mapping function is found, the default function will be applied.
*/
@KinesisClientInternalApi
public class AWSExceptionManager {

View file

@ -28,14 +28,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -48,7 +46,6 @@ import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.DataRetrievalUtil;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
@ -66,6 +63,14 @@ public class KinesisDataFetcher implements DataFetcher {
private static final String METRICS_PREFIX = "KinesisDataFetcher";
private static final String OPERATION = "ProcessTask";
/**
* Reusable {@link AWSExceptionManager}.
* <p>
* N.B. This instance is mutable, but thread-safe for <b>read-only</b> use.
* </p>
*/
private static final AWSExceptionManager AWS_EXCEPTION_MANAGER = createExceptionManager();
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull @Getter
@ -91,8 +96,6 @@ public class KinesisDataFetcher implements DataFetcher {
/**
* Note: This method has package level access for testing purposes.
*
* @return nextIterator
*/
@Getter(AccessLevel.PACKAGE)
private String nextIterator;
@ -233,8 +236,6 @@ public class KinesisDataFetcher implements DataFetcher {
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
}
final AWSExceptionManager exceptionManager = createExceptionManager();
GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId);
GetShardIteratorRequest request;
@ -256,7 +257,7 @@ public class KinesisDataFetcher implements DataFetcher {
nextIterator = getNextIterator(request);
success = true;
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: Check behavior
throw new RuntimeException(e);
@ -328,7 +329,6 @@ public class KinesisDataFetcher implements DataFetcher {
@Override
public GetRecordsResponse getRecords(@NonNull final String nextIterator) {
final AWSExceptionManager exceptionManager = createExceptionManager();
GetRecordsRequest request = getGetRecordsRequest(nextIterator);
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
@ -341,7 +341,7 @@ public class KinesisDataFetcher implements DataFetcher {
success = true;
return response;
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: Check behavior
log.debug("{} : Interrupt called on method, shutdown initiated", streamAndShardId);
@ -355,7 +355,7 @@ public class KinesisDataFetcher implements DataFetcher {
}
}
private AWSExceptionManager createExceptionManager() {
private static AWSExceptionManager createExceptionManager() {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);

View file

@ -22,7 +22,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@ -154,14 +153,13 @@ public class KinesisDataFetcherTest {
testInitializeAndFetch("foo", null, INITIAL_POSITION_LATEST);
}
private CompletableFuture<GetShardIteratorResponse> makeGetShardIteratorResonse(String shardIterator)
throws InterruptedException, ExecutionException {
private CompletableFuture<GetShardIteratorResponse> makeGetShardIteratorResponse(String shardIterator) {
return CompletableFuture
.completedFuture(GetShardIteratorResponse.builder().shardIterator(shardIterator).build());
}
@Test
public void testadvanceIteratorTo() throws KinesisClientLibException, InterruptedException, ExecutionException {
public void testAdvanceIteratorTo() throws KinesisClientLibException {
final Checkpointer checkpoint = mock(Checkpointer.class);
final String iteratorA = "foo";
final String iteratorB = "bar";
@ -172,8 +170,9 @@ public class KinesisDataFetcherTest {
.forClass(GetShardIteratorRequest.class);
when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(iteratorA)).thenReturn(makeGetShardIteratorResonse(iteratorA))
.thenReturn(makeGetShardIteratorResonse(iteratorB));
.thenReturn(makeGetShardIteratorResponse(iteratorA))
.thenReturn(makeGetShardIteratorResponse(iteratorA))
.thenReturn(makeGetShardIteratorResponse(iteratorB));
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqA));
kinesisDataFetcher.initialize(seqA, null);
@ -203,7 +202,7 @@ public class KinesisDataFetcherTest {
}
@Test
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() throws InterruptedException, ExecutionException {
public void testAdvanceIteratorToTrimHorizonLatestAndAtTimestamp(){
final ArgumentCaptor<GetShardIteratorRequest> requestCaptor = ArgumentCaptor
.forClass(GetShardIteratorRequest.class);
final String iteratorHorizon = "TRIM_HORIZON";
@ -218,9 +217,9 @@ public class KinesisDataFetcherTest {
tsReq.toBuilder().timestamp(INITIAL_POSITION_AT_TIMESTAMP.getTimestamp().toInstant()).build());
when(kinesisClient.getShardIterator(requestCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(iteratorHorizon))
.thenReturn(makeGetShardIteratorResonse(iteratorLatest))
.thenReturn(makeGetShardIteratorResonse(iteratorAtTimestamp));
.thenReturn(makeGetShardIteratorResponse(iteratorHorizon))
.thenReturn(makeGetShardIteratorResponse(iteratorLatest))
.thenReturn(makeGetShardIteratorResponse(iteratorAtTimestamp));
kinesisDataFetcher.advanceIteratorTo(ShardIteratorType.TRIM_HORIZON.toString(), INITIAL_POSITION_TRIM_HORIZON);
assertEquals(iteratorHorizon, kinesisDataFetcher.getNextIterator());
@ -261,7 +260,7 @@ public class KinesisDataFetcherTest {
// Set up proxy mock methods
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(nextIterator));
.thenReturn(makeGetShardIteratorResponse(nextIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future);
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(
new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build()));
@ -302,7 +301,6 @@ public class KinesisDataFetcherTest {
// Call records of dataFetcher which will throw an exception
getRecordsRetrievalStrategy.getRecords(MAX_RECORDS);
}
@Test
@ -318,7 +316,7 @@ public class KinesisDataFetcherTest {
final CompletableFuture<GetRecordsResponse> future = mock(CompletableFuture.class);
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(nextIterator));
.thenReturn(makeGetShardIteratorResponse(nextIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future);
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(
new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build()));
@ -331,8 +329,7 @@ public class KinesisDataFetcherTest {
assertEquals(expectedRecordsRequest.shardIterator(), recordsCaptor.getValue().shardIterator());
}
private CompletableFuture<GetRecordsResponse> makeGetRecordsResponse(String nextIterator, List<Record> records)
throws InterruptedException, ExecutionException {
private CompletableFuture<GetRecordsResponse> makeGetRecordsResponse(String nextIterator, List<Record> records) {
List<ChildShard> childShards = new ArrayList<>();
if(nextIterator == null) {
childShards = createChildShards();
@ -368,7 +365,6 @@ public class KinesisDataFetcherTest {
final String initialIterator = "InitialIterator";
final String nextIterator1 = "NextIteratorOne";
final String nextIterator2 = "NextIteratorTwo";
final String nextIterator3 = "NextIteratorThree";
final CompletableFuture<GetRecordsResponse> nonAdvancingResult1 = makeGetRecordsResponse(initialIterator, null);
final CompletableFuture<GetRecordsResponse> nonAdvancingResult2 = makeGetRecordsResponse(nextIterator1, null);
final CompletableFuture<GetRecordsResponse> finalNonAdvancingResult = makeGetRecordsResponse(nextIterator2,
@ -378,7 +374,7 @@ public class KinesisDataFetcherTest {
final CompletableFuture<GetRecordsResponse> finalAdvancingResult = makeGetRecordsResponse(null, null);
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(initialIterator));
.thenReturn(makeGetShardIteratorResponse(initialIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(nonAdvancingResult1, advancingResult1,
nonAdvancingResult2, advancingResult2, finalNonAdvancingResult, finalAdvancingResult);
@ -397,8 +393,6 @@ public class KinesisDataFetcherTest {
assertAdvanced(finalAdvancingResult.get(), nextIterator2, null);
verify(kinesisClient, times(6)).getRecords(any(GetRecordsRequest.class));
reset(kinesisClient);
DataFetcherResult terminal = kinesisDataFetcher.getRecords();
@ -444,7 +438,7 @@ public class KinesisDataFetcherTest {
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture())).
thenReturn(makeGetShardIteratorResonse(iterator));
thenReturn(makeGetShardIteratorResponse(iterator));
kinesisDataFetcher.initialize(sequenceNumber, INITIAL_POSITION_LATEST);
kinesisDataFetcher.restartIterator();
@ -547,10 +541,9 @@ public class KinesisDataFetcherTest {
} else if (iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString())) {
expectedIteratorRequest = expectedIteratorRequest.toBuilder().startingSequenceNumber(seqNo).build();
}
final GetRecordsRequest expectedRecordsRequest = makeGetRecordsRequest(iterator);
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(iterator));
.thenReturn(makeGetShardIteratorResponse(iterator));
when(kinesisClient.getRecords(recordsCaptor.capture()))
.thenReturn(makeGetRecordsResponse(null, expectedRecords));