From 4d94efac8f416a019547501fe58a1e06ccca1d0d Mon Sep 17 00:00:00 2001
From: stair <123031771+stair-aws@users.noreply.github.com>
Date: Mon, 13 Feb 2023 13:16:28 -0500
Subject: [PATCH] Optimization: 9~15% improvement in `KinesisDataFetcher`
wall-time after (#1034)
converting `AWSExceptionManger` to a static variable.
---
.../kinesis/leases/KinesisShardDetector.java | 26 +++++++++----
.../retrieval/AWSExceptionManager.java | 4 +-
.../retrieval/polling/KinesisDataFetcher.java | 22 +++++------
.../polling/KinesisDataFetcherTest.java | 37 ++++++++-----------
4 files changed, 47 insertions(+), 42 deletions(-)
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
index 189ba18b..0c3de1bd 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
@@ -64,6 +64,22 @@ import software.amazon.kinesis.retrieval.AWSExceptionManager;
@KinesisClientInternalApi
public class KinesisShardDetector implements ShardDetector {
+ /**
+ * Reusable {@link AWSExceptionManager}.
+ *
+ * N.B. This instance is mutable, but thread-safe for read-only use.
+ *
+ */
+ private static final AWSExceptionManager AWS_EXCEPTION_MANAGER;
+
+ static {
+ AWS_EXCEPTION_MANAGER = new AWSExceptionManager();
+ AWS_EXCEPTION_MANAGER.add(KinesisException.class, t -> t);
+ AWS_EXCEPTION_MANAGER.add(LimitExceededException.class, t -> t);
+ AWS_EXCEPTION_MANAGER.add(ResourceInUseException.class, t -> t);
+ AWS_EXCEPTION_MANAGER.add(ResourceNotFoundException.class, t -> t);
+ }
+
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull @Getter
@@ -78,7 +94,7 @@ public class KinesisShardDetector implements ShardDetector {
private volatile Map cachedShardMap = null;
private volatile Instant lastCacheUpdateTime;
@Getter(AccessLevel.PACKAGE)
- private AtomicInteger cacheMisses = new AtomicInteger(0);
+ private final AtomicInteger cacheMisses = new AtomicInteger(0);
@Deprecated
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
@@ -186,12 +202,6 @@ public class KinesisShardDetector implements ShardDetector {
}
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
- final AWSExceptionManager exceptionManager = new AWSExceptionManager();
- exceptionManager.add(ResourceNotFoundException.class, t -> t);
- exceptionManager.add(LimitExceededException.class, t -> t);
- exceptionManager.add(ResourceInUseException.class, t -> t);
- exceptionManager.add(KinesisException.class, t -> t);
-
ListShardsRequest.Builder builder = KinesisRequestsBuilder.listShardsRequestBuilder();
if (StringUtils.isEmpty(nextToken)) {
builder = builder.streamName(streamIdentifier.streamName()).shardFilter(shardFilter);
@@ -211,7 +221,7 @@ public class KinesisShardDetector implements ShardDetector {
try {
result = getListShardsResponse(request);
} catch (ExecutionException e) {
- throw exceptionManager.apply(e.getCause());
+ throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: check if this is the correct behavior for Interrupted Exception
log.debug("Interrupted exception caught, shutdown initiated, returning null");
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AWSExceptionManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AWSExceptionManager.java
index 65d5d0c0..8081b946 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AWSExceptionManager.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AWSExceptionManager.java
@@ -26,7 +26,9 @@ import lombok.experimental.Accessors;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
/**
- *
+ * Traverses a {@code Throwable} class inheritance in search of a mapping
+ * function which will convert that throwable into a {@code RuntimeException}.
+ * If no mapping function is found, the default function will be applied.
*/
@KinesisClientInternalApi
public class AWSExceptionManager {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java
index 8d36ea8a..d17828e9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java
@@ -28,14 +28,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
-import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@@ -48,7 +46,6 @@ import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.DataFetcherResult;
-import software.amazon.kinesis.retrieval.DataRetrievalUtil;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
@@ -66,6 +63,14 @@ public class KinesisDataFetcher implements DataFetcher {
private static final String METRICS_PREFIX = "KinesisDataFetcher";
private static final String OPERATION = "ProcessTask";
+ /**
+ * Reusable {@link AWSExceptionManager}.
+ *
+ * N.B. This instance is mutable, but thread-safe for read-only use.
+ *
+ */
+ private static final AWSExceptionManager AWS_EXCEPTION_MANAGER = createExceptionManager();
+
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull @Getter
@@ -91,8 +96,6 @@ public class KinesisDataFetcher implements DataFetcher {
/**
* Note: This method has package level access for testing purposes.
- *
- * @return nextIterator
*/
@Getter(AccessLevel.PACKAGE)
private String nextIterator;
@@ -233,8 +236,6 @@ public class KinesisDataFetcher implements DataFetcher {
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
}
- final AWSExceptionManager exceptionManager = createExceptionManager();
-
GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId);
GetShardIteratorRequest request;
@@ -256,7 +257,7 @@ public class KinesisDataFetcher implements DataFetcher {
nextIterator = getNextIterator(request);
success = true;
} catch (ExecutionException e) {
- throw exceptionManager.apply(e.getCause());
+ throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: Check behavior
throw new RuntimeException(e);
@@ -328,7 +329,6 @@ public class KinesisDataFetcher implements DataFetcher {
@Override
public GetRecordsResponse getRecords(@NonNull final String nextIterator) {
- final AWSExceptionManager exceptionManager = createExceptionManager();
GetRecordsRequest request = getGetRecordsRequest(nextIterator);
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
@@ -341,7 +341,7 @@ public class KinesisDataFetcher implements DataFetcher {
success = true;
return response;
} catch (ExecutionException e) {
- throw exceptionManager.apply(e.getCause());
+ throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: Check behavior
log.debug("{} : Interrupt called on method, shutdown initiated", streamAndShardId);
@@ -355,7 +355,7 @@ public class KinesisDataFetcher implements DataFetcher {
}
}
- private AWSExceptionManager createExceptionManager() {
+ private static AWSExceptionManager createExceptionManager() {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java
index d75f701d..2e09f34a 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -154,14 +153,13 @@ public class KinesisDataFetcherTest {
testInitializeAndFetch("foo", null, INITIAL_POSITION_LATEST);
}
- private CompletableFuture makeGetShardIteratorResonse(String shardIterator)
- throws InterruptedException, ExecutionException {
+ private CompletableFuture makeGetShardIteratorResponse(String shardIterator) {
return CompletableFuture
.completedFuture(GetShardIteratorResponse.builder().shardIterator(shardIterator).build());
}
@Test
- public void testadvanceIteratorTo() throws KinesisClientLibException, InterruptedException, ExecutionException {
+ public void testAdvanceIteratorTo() throws KinesisClientLibException {
final Checkpointer checkpoint = mock(Checkpointer.class);
final String iteratorA = "foo";
final String iteratorB = "bar";
@@ -172,8 +170,9 @@ public class KinesisDataFetcherTest {
.forClass(GetShardIteratorRequest.class);
when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture()))
- .thenReturn(makeGetShardIteratorResonse(iteratorA)).thenReturn(makeGetShardIteratorResonse(iteratorA))
- .thenReturn(makeGetShardIteratorResonse(iteratorB));
+ .thenReturn(makeGetShardIteratorResponse(iteratorA))
+ .thenReturn(makeGetShardIteratorResponse(iteratorA))
+ .thenReturn(makeGetShardIteratorResponse(iteratorB));
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqA));
kinesisDataFetcher.initialize(seqA, null);
@@ -203,7 +202,7 @@ public class KinesisDataFetcherTest {
}
@Test
- public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() throws InterruptedException, ExecutionException {
+ public void testAdvanceIteratorToTrimHorizonLatestAndAtTimestamp(){
final ArgumentCaptor requestCaptor = ArgumentCaptor
.forClass(GetShardIteratorRequest.class);
final String iteratorHorizon = "TRIM_HORIZON";
@@ -218,9 +217,9 @@ public class KinesisDataFetcherTest {
tsReq.toBuilder().timestamp(INITIAL_POSITION_AT_TIMESTAMP.getTimestamp().toInstant()).build());
when(kinesisClient.getShardIterator(requestCaptor.capture()))
- .thenReturn(makeGetShardIteratorResonse(iteratorHorizon))
- .thenReturn(makeGetShardIteratorResonse(iteratorLatest))
- .thenReturn(makeGetShardIteratorResonse(iteratorAtTimestamp));
+ .thenReturn(makeGetShardIteratorResponse(iteratorHorizon))
+ .thenReturn(makeGetShardIteratorResponse(iteratorLatest))
+ .thenReturn(makeGetShardIteratorResponse(iteratorAtTimestamp));
kinesisDataFetcher.advanceIteratorTo(ShardIteratorType.TRIM_HORIZON.toString(), INITIAL_POSITION_TRIM_HORIZON);
assertEquals(iteratorHorizon, kinesisDataFetcher.getNextIterator());
@@ -261,7 +260,7 @@ public class KinesisDataFetcherTest {
// Set up proxy mock methods
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
- .thenReturn(makeGetShardIteratorResonse(nextIterator));
+ .thenReturn(makeGetShardIteratorResponse(nextIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future);
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(
new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build()));
@@ -302,7 +301,6 @@ public class KinesisDataFetcherTest {
// Call records of dataFetcher which will throw an exception
getRecordsRetrievalStrategy.getRecords(MAX_RECORDS);
-
}
@Test
@@ -318,7 +316,7 @@ public class KinesisDataFetcherTest {
final CompletableFuture future = mock(CompletableFuture.class);
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
- .thenReturn(makeGetShardIteratorResonse(nextIterator));
+ .thenReturn(makeGetShardIteratorResponse(nextIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future);
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(
new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build()));
@@ -331,8 +329,7 @@ public class KinesisDataFetcherTest {
assertEquals(expectedRecordsRequest.shardIterator(), recordsCaptor.getValue().shardIterator());
}
- private CompletableFuture makeGetRecordsResponse(String nextIterator, List records)
- throws InterruptedException, ExecutionException {
+ private CompletableFuture makeGetRecordsResponse(String nextIterator, List records) {
List childShards = new ArrayList<>();
if(nextIterator == null) {
childShards = createChildShards();
@@ -368,7 +365,6 @@ public class KinesisDataFetcherTest {
final String initialIterator = "InitialIterator";
final String nextIterator1 = "NextIteratorOne";
final String nextIterator2 = "NextIteratorTwo";
- final String nextIterator3 = "NextIteratorThree";
final CompletableFuture nonAdvancingResult1 = makeGetRecordsResponse(initialIterator, null);
final CompletableFuture nonAdvancingResult2 = makeGetRecordsResponse(nextIterator1, null);
final CompletableFuture finalNonAdvancingResult = makeGetRecordsResponse(nextIterator2,
@@ -378,7 +374,7 @@ public class KinesisDataFetcherTest {
final CompletableFuture finalAdvancingResult = makeGetRecordsResponse(null, null);
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
- .thenReturn(makeGetShardIteratorResonse(initialIterator));
+ .thenReturn(makeGetShardIteratorResponse(initialIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(nonAdvancingResult1, advancingResult1,
nonAdvancingResult2, advancingResult2, finalNonAdvancingResult, finalAdvancingResult);
@@ -397,8 +393,6 @@ public class KinesisDataFetcherTest {
assertAdvanced(finalAdvancingResult.get(), nextIterator2, null);
verify(kinesisClient, times(6)).getRecords(any(GetRecordsRequest.class));
-
-
reset(kinesisClient);
DataFetcherResult terminal = kinesisDataFetcher.getRecords();
@@ -444,7 +438,7 @@ public class KinesisDataFetcherTest {
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture())).
- thenReturn(makeGetShardIteratorResonse(iterator));
+ thenReturn(makeGetShardIteratorResponse(iterator));
kinesisDataFetcher.initialize(sequenceNumber, INITIAL_POSITION_LATEST);
kinesisDataFetcher.restartIterator();
@@ -547,10 +541,9 @@ public class KinesisDataFetcherTest {
} else if (iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString())) {
expectedIteratorRequest = expectedIteratorRequest.toBuilder().startingSequenceNumber(seqNo).build();
}
- final GetRecordsRequest expectedRecordsRequest = makeGetRecordsRequest(iterator);
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
- .thenReturn(makeGetShardIteratorResonse(iterator));
+ .thenReturn(makeGetShardIteratorResponse(iterator));
when(kinesisClient.getRecords(recordsCaptor.capture()))
.thenReturn(makeGetRecordsResponse(null, expectedRecords));