From 37ae2f86be4b5e7f419c14ac35f893d04dfa0851 Mon Sep 17 00:00:00 2001 From: Abhi Gupta <112846463+gguptp@users.noreply.github.com> Date: Fri, 30 May 2025 11:57:03 +0530 Subject: [PATCH] Overriding the DataFetcher to return a custom GetRecordsResponseAdapter so that customers can have custom logic to send data to KinesisClientRecord (#1479) --- .../kinesis/retrieval/DataFetcherResult.java | 6 +- .../retrieval/GetRecordsResponseAdapter.java | 55 ++++++++++++++ .../GetRecordsRetrievalStrategy.java | 3 +- .../retrieval/KinesisClientRecord.java | 21 ++++++ .../KinesisGetRecordsResponseAdapter.java | 60 ++++++++++++++++ ...ynchronousGetRecordsRetrievalStrategy.java | 6 +- .../polling/BlockingRecordsPublisher.java | 13 ++-- .../retrieval/polling/DataFetcher.java | 40 ----------- .../retrieval/polling/KinesisDataFetcher.java | 56 ++++++++++----- .../polling/PrefetchRecordsPublisher.java | 10 ++- ...ynchronousGetRecordsRetrievalStrategy.java | 4 +- ...cordsRetrievalStrategyIntegrationTest.java | 13 ++-- ...ronousGetRecordsRetrievalStrategyTest.java | 13 ++-- .../polling/KinesisDataFetcherTest.java | 12 ++-- ...efetchRecordsPublisherIntegrationTest.java | 13 ++-- .../polling/PrefetchRecordsPublisherTest.java | 71 ++++++++++--------- 16 files changed, 262 insertions(+), 134 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsResponseAdapter.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisGetRecordsResponseAdapter.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherResult.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherResult.java index 82e9da36..074caae6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherResult.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/DataFetcherResult.java @@ -14,8 +14,6 @@ */ package software.amazon.kinesis.retrieval; -import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; - /** * Represents the result from the DataFetcher, and allows the receiver to accept a result */ @@ -25,7 +23,7 @@ public interface DataFetcherResult { * * @return The result of the request, this can be null if the request failed. */ - GetRecordsResponse getResult(); + GetRecordsResponseAdapter getResult(); /** * Accepts the result, and advances the shard iterator. A result from the data fetcher must be accepted before any @@ -33,7 +31,7 @@ public interface DataFetcherResult { * * @return the result of the request, this can be null if the request failed. */ - GetRecordsResponse accept(); + GetRecordsResponseAdapter accept(); /** * Indicates whether this result is at the end of the shard or not diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsResponseAdapter.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsResponseAdapter.java new file mode 100644 index 00000000..698a49b7 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsResponseAdapter.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval; + +import java.util.List; + +import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +@KinesisClientInternalApi +public interface GetRecordsResponseAdapter { + + /** + * Returns the list of records retrieved from GetRecords. + * @return list of {@link KinesisClientRecord} + */ + List records(); + + /** + * The number of milliseconds the response is from the tip of the stream. + * @return long + */ + Long millisBehindLatest(); + + /** + * Returns the list of child shards of the shard that was retrieved from GetRecords. + * @return list of {@link ChildShard} + */ + List childShards(); + + /** + * Returns the next shard iterator to be used to retrieve next set of records. + * @return String + */ + String nextShardIterator(); + + /** + * Returns the request id of the GetRecords operation. + * @return String containing the request id + */ + String requestId(); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java index ca0cfbe8..6befd066 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/GetRecordsRetrievalStrategy.java @@ -16,7 +16,6 @@ package software.amazon.kinesis.retrieval; import java.util.Optional; -import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.kinesis.retrieval.polling.DataFetcher; import software.amazon.kinesis.retrieval.polling.KinesisDataFetcher; @@ -34,7 +33,7 @@ public interface GetRecordsRetrievalStrategy { * @throws IllegalStateException * if the strategy has been shutdown. */ - GetRecordsResponse getRecords(int maxRecords); + GetRecordsResponseAdapter getRecords(int maxRecords); /** * Releases any resources used by the strategy. Once the strategy is shutdown it is no longer safe to call diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java index 5e8018f9..71c63bf2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisClientRecord.java @@ -46,6 +46,27 @@ public class KinesisClientRecord { private final boolean aggregated; private final Schema schema; + protected KinesisClientRecord( + String sequenceNumber, + Instant approximateArrivalTimestamp, + ByteBuffer data, + String partitionKey, + EncryptionType encryptionType, + long subSequenceNumber, + String explicitHashKey, + boolean aggregated, + Schema schema) { + this.sequenceNumber = sequenceNumber; + this.approximateArrivalTimestamp = approximateArrivalTimestamp; + this.data = data; + this.partitionKey = partitionKey; + this.encryptionType = encryptionType; + this.subSequenceNumber = subSequenceNumber; + this.explicitHashKey = explicitHashKey; + this.aggregated = aggregated; + this.schema = schema; + } + public static KinesisClientRecord fromRecord(Record record) { return KinesisClientRecord.builder() .sequenceNumber(record.sequenceNumber()) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisGetRecordsResponseAdapter.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisGetRecordsResponseAdapter.java new file mode 100644 index 00000000..458b9bcb --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/KinesisGetRecordsResponseAdapter.java @@ -0,0 +1,60 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval; + +import java.util.List; +import java.util.stream.Collectors; + +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +@RequiredArgsConstructor +@EqualsAndHashCode +@KinesisClientInternalApi +public class KinesisGetRecordsResponseAdapter implements GetRecordsResponseAdapter { + + private final GetRecordsResponse getRecordsResponse; + + @Override + public List records() { + return getRecordsResponse.records().stream() + .map(KinesisClientRecord::fromRecord) + .collect(Collectors.toList()); + } + + @Override + public Long millisBehindLatest() { + return getRecordsResponse.millisBehindLatest(); + } + + @Override + public List childShards() { + return getRecordsResponse.childShards(); + } + + @Override + public String nextShardIterator() { + return getRecordsResponse.nextShardIterator(); + } + + @Override + public String requestId() { + return getRecordsResponse.responseMetadata().requestId(); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategy.java index 1501bb19..92bb64d9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategy.java @@ -32,9 +32,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; -import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.retrieval.DataFetcherResult; +import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; /** @@ -87,11 +87,11 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie } @Override - public GetRecordsResponse getRecords(final int maxRecords) { + public GetRecordsResponseAdapter getRecords(final int maxRecords) { if (executorService.isShutdown()) { throw new IllegalStateException("Strategy has been shutdown"); } - GetRecordsResponse result = null; + GetRecordsResponseAdapter result = null; CompletionService completionService = completionServiceSupplier.get(); Set> futures = new HashSet<>(); Callable retrieverCall = createRetrieverCallable(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java index 80d4ae61..599d3a39 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/BlockingRecordsPublisher.java @@ -17,14 +17,13 @@ package software.amazon.kinesis.retrieval.polling; import java.time.Instant; import java.util.List; -import java.util.stream.Collectors; import org.reactivestreams.Subscriber; -import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; +import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsPublisher; @@ -59,13 +58,11 @@ public class BlockingRecordsPublisher implements RecordsPublisher { } public ProcessRecordsInput getNextResult() { - GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); - final RequestDetails getRecordsRequestDetails = new RequestDetails( - getRecordsResult.responseMetadata().requestId(), Instant.now().toString()); + GetRecordsResponseAdapter getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + final RequestDetails getRecordsRequestDetails = + new RequestDetails(getRecordsResult.requestId(), Instant.now().toString()); setLastSuccessfulRequestDetails(getRecordsRequestDetails); - List records = getRecordsResult.records().stream() - .map(KinesisClientRecord::fromRecord) - .collect(Collectors.toList()); + List records = getRecordsResult.records(); return ProcessRecordsInput.builder() .records(records) .millisBehindLatest(getRecordsResult.millisBehindLatest()) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java index ac71b4c7..47eb318d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/DataFetcher.java @@ -15,13 +15,6 @@ package software.amazon.kinesis.retrieval.polling; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - -import lombok.NonNull; -import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; -import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.retrieval.DataFetcherResult; @@ -76,39 +69,6 @@ public interface DataFetcher { void resetIterator( String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream); - /** - * Retrieves the response based on the request. - * - * @param request the current get records request used to receive a response. - * @return GetRecordsResponse response for getRecords - */ - GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws Exception; - - /** - * Retrieves the next get records request based on the current iterator. - * - * @param nextIterator specify the iterator to get the next record request - * @return {@link GetRecordsRequest} - */ - GetRecordsRequest getGetRecordsRequest(String nextIterator); - - /** - * Gets the next iterator based on the request. - * - * @param request used to obtain the next shard iterator - * @return next iterator string - */ - String getNextIterator(GetShardIteratorRequest request) - throws ExecutionException, InterruptedException, TimeoutException; - - /** - * Gets the next set of records based on the iterator. - * - * @param nextIterator specified shard iterator for getting the next set of records - * @return {@link GetRecordsResponse} - */ - GetRecordsResponse getRecords(@NonNull String nextIterator); - /** * Get the current account and stream information. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 85260e49..e03f5d59 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -46,8 +46,10 @@ import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; import software.amazon.kinesis.retrieval.DataFetcherResult; +import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.IteratorBuilder; import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; +import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -163,17 +165,17 @@ public class KinesisDataFetcher implements DataFetcher { final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() { // CHECKSTYLE.ON: MemberName @Override - public GetRecordsResponse getResult() { - return GetRecordsResponse.builder() + public GetRecordsResponseAdapter getResult() { + return new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder() .millisBehindLatest(null) .records(Collections.emptyList()) .nextShardIterator(null) .childShards(Collections.emptyList()) - .build(); + .build()); } @Override - public GetRecordsResponse accept() { + public GetRecordsResponseAdapter accept() { isShardEndReached = true; return getResult(); } @@ -187,15 +189,15 @@ public class KinesisDataFetcher implements DataFetcher { @Data class AdvancingResult implements DataFetcherResult { - final GetRecordsResponse result; + final GetRecordsResponseAdapter result; @Override - public GetRecordsResponse getResult() { + public GetRecordsResponseAdapter getResult() { return result; } @Override - public GetRecordsResponse accept() { + public GetRecordsResponseAdapter accept() { nextIterator = result.nextShardIterator(); if (result.records() != null && !result.records().isEmpty()) { lastKnownSequenceNumber = Iterables.getLast(result.records()).sequenceNumber(); @@ -331,8 +333,13 @@ public class KinesisDataFetcher implements DataFetcher { this.initialPositionInStream = initialPositionInStream; } - @Override - public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) + /** + * Retrieves the response based on the request. + * + * @param request the current get records request used to receive a response. + * @return GetRecordsResponse response for getRecords + */ + private GetRecordsResponseAdapter getGetRecordsResponse(GetRecordsRequest request) throws ExecutionException, InterruptedException, TimeoutException { final GetRecordsResponse response = FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), maxFutureWait); @@ -342,11 +349,16 @@ public class KinesisDataFetcher implements DataFetcher { + ". childShards: " + response.childShards() + ". Will retry GetRecords with the same nextIterator."); } - return response; + return new KinesisGetRecordsResponseAdapter(response); } - @Override - public GetRecordsRequest getGetRecordsRequest(String nextIterator) { + /** + * Gets the next set of records based on the iterator. + * + * @param nextIterator specified shard iterator for getting the next set of records + * @return {@link GetRecordsResponseAdapter} + */ + private GetRecordsRequest getGetRecordsRequest(String nextIterator) { GetRecordsRequest.Builder builder = KinesisRequestsBuilder.getRecordsRequestBuilder() .shardIterator(nextIterator) .limit(maxRecords); @@ -354,16 +366,26 @@ public class KinesisDataFetcher implements DataFetcher { return builder.build(); } - @Override - public String getNextIterator(GetShardIteratorRequest request) + /** + * Gets the next iterator based on the request. + * + * @param request used to obtain the next shard iterator + * @return next iterator string + */ + private String getNextIterator(GetShardIteratorRequest request) throws ExecutionException, InterruptedException, TimeoutException { final GetShardIteratorResponse result = FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait); return result.shardIterator(); } - @Override - public GetRecordsResponse getRecords(@NonNull final String nextIterator) { + /** + * Gets the next set of records based on the iterator. + * + * @param nextIterator specified shard iterator for getting the next set of records + * @return {@link GetRecordsResponse} + */ + private GetRecordsResponseAdapter getRecords(@NonNull final String nextIterator) { GetRecordsRequest request = getGetRecordsRequest(nextIterator); final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION); @@ -372,7 +394,7 @@ public class KinesisDataFetcher implements DataFetcher { boolean success = false; long startTime = System.currentTimeMillis(); try { - final GetRecordsResponse response = getGetRecordsResponse(request); + final GetRecordsResponseAdapter response = getGetRecordsResponse(request); success = true; return response; } catch (ExecutionException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 32d76770..10a0480d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -24,7 +24,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import lombok.AccessLevel; @@ -41,7 +40,6 @@ import org.reactivestreams.Subscription; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; -import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; import software.amazon.kinesis.annotations.KinesisClientInternalApi; @@ -55,6 +53,7 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory; import software.amazon.kinesis.retrieval.BatchUniqueIdentifier; +import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.RecordsDeliveryAck; @@ -534,12 +533,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { if (publisherSession.prefetchCounters().shouldGetNewRecords()) { try { sleepBeforeNextCall(); - GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + GetRecordsResponseAdapter getRecordsResult = + getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); lastSuccessfulCall = Instant.now(); - final List records = getRecordsResult.records().stream() - .map(KinesisClientRecord::fromRecord) - .collect(Collectors.toList()); + final List records = getRecordsResult.records(); ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder() .records(records) .millisBehindLatest(getRecordsResult.millisBehindLatest()) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java index cdf03fac..d0ca473b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousGetRecordsRetrievalStrategy.java @@ -16,8 +16,8 @@ package software.amazon.kinesis.retrieval.polling; import lombok.Data; import lombok.NonNull; -import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; /** @@ -31,7 +31,7 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev private final DataFetcher dataFetcher; @Override - public GetRecordsResponse getRecords(final int maxRecords) { + public GetRecordsResponseAdapter getRecords(final int maxRecords) { return dataFetcher.getRecords().accept(); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java index 2528f158..2e8b05be 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java @@ -38,6 +38,8 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.DataFetcherResult; +import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; +import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -71,7 +73,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { private KinesisAsyncClient kinesisClient; private CompletionService completionService; - private GetRecordsResponse getRecordsResponse; + private GetRecordsResponseAdapter getRecordsResponse; private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy; private KinesisDataFetcher dataFetcher; @@ -97,7 +99,8 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { completionService = spy(new ExecutorCompletionService(executorService)); getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy( dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001"); - getRecordsResponse = GetRecordsResponse.builder().build(); + getRecordsResponse = new KinesisGetRecordsResponseAdapter( + GetRecordsResponse.builder().build()); when(completionServiceSupplier.get()).thenReturn(completionService); when(result.accept()).thenReturn(getRecordsResponse); @@ -106,7 +109,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { @Test public void oneRequestMultithreadTest() { when(result.accept()).thenReturn(null); - GetRecordsResponse getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); + GetRecordsResponseAdapter getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(); verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any()); assertNull(getRecordsResult); @@ -117,7 +120,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { ExecutorCompletionService completionService1 = spy(new ExecutorCompletionService(executorService)); when(completionServiceSupplier.get()).thenReturn(completionService1); - GetRecordsResponse getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); + GetRecordsResponseAdapter getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(); verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any()); assertThat(getRecordsResult, equalTo(getRecordsResponse)); @@ -127,7 +130,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { spy(new ExecutorCompletionService(executorService)); when(completionServiceSupplier.get()).thenReturn(completionService2); getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); - assertThat(getRecordsResult, nullValue(GetRecordsResponse.class)); + assertThat(getRecordsResult, nullValue(GetRecordsResponseAdapter.class)); } @Test(expected = ExpiredIteratorException.class) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java index faa727a7..ade114a3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/AsynchronousGetRecordsRetrievalStrategyTest.java @@ -30,6 +30,8 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.kinesis.retrieval.DataFetcherResult; +import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; +import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -73,11 +75,12 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { @Mock private DataFetcherResult dataFetcherResult; - private GetRecordsResponse expectedResponses; + private GetRecordsResponseAdapter expectedResponses; @Before public void before() { - expectedResponses = GetRecordsResponse.builder().build(); + expectedResponses = new KinesisGetRecordsResponseAdapter( + GetRecordsResponse.builder().build()); when(completionServiceSupplier.get()).thenReturn(completionService); when(dataFetcherResult.accept()).thenReturn(expectedResponses); @@ -93,7 +96,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { when(completionService.poll(anyLong(), any())).thenReturn(successfulFuture); when(successfulFuture.get()).thenReturn(dataFetcherResult); - GetRecordsResponse result = strategy.getRecords(10); + GetRecordsResponseAdapter result = strategy.getRecords(10); verify(executorService).isShutdown(); verify(completionService).submit(any()); @@ -116,7 +119,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { when(successfulFuture.cancel(anyBoolean())).thenReturn(false); when(blockedFuture.cancel(anyBoolean())).thenReturn(true); - GetRecordsResponse actualResults = strategy.getRecords(10); + GetRecordsResponseAdapter actualResults = strategy.getRecords(10); verify(completionService, times(2)).submit(any()); verify(completionService, times(2)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS)); @@ -156,7 +159,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { when(successfulFuture.cancel(anyBoolean())).thenReturn(false); when(blockedFuture.cancel(anyBoolean())).thenReturn(true); - GetRecordsResponse actualResult = strategy.getRecords(10); + GetRecordsResponseAdapter actualResult = strategy.getRecords(10); verify(completionService, times(3)).submit(any()); verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java index 57221b61..44ad6949 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java @@ -55,7 +55,9 @@ import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.retrieval.DataFetcherResult; +import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.RetryableRetrievalException; import static org.hamcrest.CoreMatchers.isA; @@ -433,7 +435,7 @@ public class KinesisDataFetcherTest { assertTrue(terminal.isShardEnd()); assertNotNull(terminal.getResult()); - final GetRecordsResponse terminalResult = terminal.getResult(); + final GetRecordsResponseAdapter terminalResult = terminal.getResult(); assertNotNull(terminalResult.records()); assertEquals(0, terminalResult.records().size()); assertNull(terminalResult.nextShardIterator()); @@ -540,12 +542,13 @@ public class KinesisDataFetcherTest { private DataFetcherResult assertAdvanced( GetRecordsResponse expectedResult, String previousValue, String nextValue) { DataFetcherResult acceptResult = kinesisDataFetcher.getRecords(); - assertEquals(expectedResult, acceptResult.getResult()); + KinesisGetRecordsResponseAdapter expectedResultAdapter = new KinesisGetRecordsResponseAdapter(expectedResult); + assertEquals(expectedResultAdapter, acceptResult.getResult()); assertEquals(previousValue, kinesisDataFetcher.getNextIterator()); assertFalse(kinesisDataFetcher.isShardEndReached()); - assertEquals(expectedResult, acceptResult.accept()); + assertEquals(expectedResultAdapter, acceptResult.accept()); assertEquals(nextValue, kinesisDataFetcher.getNextIterator()); if (nextValue == null) { assertTrue(kinesisDataFetcher.isShardEndReached()); @@ -557,7 +560,8 @@ public class KinesisDataFetcherTest { private DataFetcherResult assertNoAdvance(final GetRecordsResponse expectedResult, final String previousValue) { assertEquals(previousValue, kinesisDataFetcher.getNextIterator()); DataFetcherResult noAcceptResult = kinesisDataFetcher.getRecords(); - assertEquals(expectedResult, noAcceptResult.getResult()); + KinesisGetRecordsResponseAdapter expectedResultAdapter = new KinesisGetRecordsResponseAdapter(expectedResult); + assertEquals(expectedResultAdapter, noAcceptResult.getResult()); assertEquals(previousValue, kinesisDataFetcher.getNextIterator()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index 291531b0..4dda0dc1 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -46,7 +46,9 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.DataFetcherResult; +import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.ThrottlingReporter; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -305,11 +307,12 @@ public class PrefetchRecordsPublisherIntegrationTest { @Override public DataFetcherResult getRecords() { - GetRecordsResponse getRecordsResult = GetRecordsResponse.builder() - .records(new ArrayList<>(records)) - .nextShardIterator(nextShardIterator) - .millisBehindLatest(1000L) - .build(); + GetRecordsResponseAdapter getRecordsResult = + new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder() + .records(new ArrayList<>(records)) + .nextShardIterator(nextShardIterator) + .millisBehindLatest(1000L) + .build()); return new AdvancingResult(getRecordsResult); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 50ad888b..cb72b957 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -66,8 +66,10 @@ import software.amazon.kinesis.leases.ShardObjectHelper; import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.metrics.NullMetricsFactory; +import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.RetryableRetrievalException; @@ -136,7 +138,7 @@ public class PrefetchRecordsPublisherTest { private ExecutorService executorService; private LinkedBlockingQueue spyQueue; private PrefetchRecordsPublisher getRecordsCache; - private GetRecordsResponse getRecordsResponse; + private GetRecordsResponseAdapter getRecordsResponse; private Record record; @Before @@ -147,11 +149,11 @@ public class PrefetchRecordsPublisherTest { getRecordsCache = createPrefetchRecordsPublisher(0L); spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue()); records = spy(new ArrayList<>()); - getRecordsResponse = GetRecordsResponse.builder() + getRecordsResponse = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder() .records(records) .nextShardIterator(NEXT_SHARD_ITERATOR) .childShards(Collections.emptyList()) - .build(); + .build()); when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse); } @@ -283,8 +285,8 @@ public class PrefetchRecordsPublisherTest { public void testGetRecordsWithInvalidResponse() { record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build(); - GetRecordsResponse response = - GetRecordsResponse.builder().records(records).build(); + GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter( + GetRecordsResponse.builder().records(records).build()); when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(response); when(dataFetcher.isShardEndReached()).thenReturn(false); @@ -319,10 +321,10 @@ public class PrefetchRecordsPublisherTest { childShards.add(leftChild); childShards.add(rightChild); - GetRecordsResponse response = GetRecordsResponse.builder() + GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder() .records(records) .childShards(childShards) - .build(); + .build()); when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(response); when(dataFetcher.isShardEndReached()).thenReturn(true); @@ -417,13 +419,13 @@ public class PrefetchRecordsPublisherTest { @Test(expected = IllegalStateException.class) public void testRequestRecordsOnSubscriptionAfterShutdown() { - GetRecordsResponse response = GetRecordsResponse.builder() + GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder() .records(Record.builder() .data(SdkBytes.fromByteArray(new byte[] {1, 2, 3})) .sequenceNumber("123") .build()) .nextShardIterator(NEXT_SHARD_ITERATOR) - .build(); + .build()); when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response); getRecordsCache.start(sequenceNumber, initialPosition); @@ -482,11 +484,11 @@ public class PrefetchRecordsPublisherTest { @Test public void testRetryableRetrievalExceptionContinues() { - GetRecordsResponse response = GetRecordsResponse.builder() + GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder() .millisBehindLatest(100L) .records(Collections.emptyList()) .nextShardIterator(NEXT_SHARD_ITERATOR) - .build(); + .build()); when(getRecordsRetrievalStrategy.getRecords(anyInt())) .thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout"))) .thenReturn(response); @@ -526,13 +528,14 @@ public class PrefetchRecordsPublisherTest { // final int[] sequenceNumberInResponse = {0}; - when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenAnswer(i -> GetRecordsResponse.builder() - .records(Record.builder() - .data(SdkBytes.fromByteArray(new byte[] {1, 2, 3})) - .sequenceNumber(++sequenceNumberInResponse[0] + "") - .build()) - .nextShardIterator(NEXT_SHARD_ITERATOR) - .build()); + when(getRecordsRetrievalStrategy.getRecords(anyInt())) + .thenAnswer(i -> new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder() + .records(Record.builder() + .data(SdkBytes.fromByteArray(new byte[] {1, 2, 3})) + .sequenceNumber(++sequenceNumberInResponse[0] + "") + .build()) + .nextShardIterator(NEXT_SHARD_ITERATOR) + .build())); getRecordsCache.start(sequenceNumber, initialPosition); @@ -627,13 +630,13 @@ public class PrefetchRecordsPublisherTest { // // This test is to verify that the data consumption is not stuck in the case of an failed event delivery // to the subscriber. - GetRecordsResponse response = GetRecordsResponse.builder() + GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder() .records(Record.builder() .data(SdkBytes.fromByteArray(new byte[] {1, 2, 3})) .sequenceNumber("123") .build()) .nextShardIterator(NEXT_SHARD_ITERATOR) - .build(); + .build()); when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response); getRecordsCache.start(sequenceNumber, initialPosition); @@ -710,7 +713,7 @@ public class PrefetchRecordsPublisherTest { @Test public void testResetClearsRemainingData() { - List responses = Stream.iterate(0, i -> i + 1) + List responses = Stream.iterate(0, i -> i + 1) .limit(10) .map(i -> { Record record = Record.builder() @@ -720,10 +723,10 @@ public class PrefetchRecordsPublisherTest { .approximateArrivalTimestamp(Instant.now()) .build(); String nextIterator = "shard-iter-" + (i + 1); - return GetRecordsResponse.builder() + return new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder() .records(record) .nextShardIterator(nextIterator) - .build(); + .build()); }) .collect(Collectors.toList()); @@ -778,7 +781,8 @@ public class PrefetchRecordsPublisherTest { try { // return a valid response to cause `lastSuccessfulCall` to initialize when(getRecordsRetrievalStrategy.getRecords(anyInt())) - .thenReturn(GetRecordsResponse.builder().build()); + .thenReturn(new KinesisGetRecordsResponseAdapter( + GetRecordsResponse.builder().build())); blockUntilRecordsAvailable(); } catch (RuntimeException re) { Assert.fail("first call should succeed"); @@ -803,7 +807,8 @@ public class PrefetchRecordsPublisherTest { public void testProvisionedThroughputExceededExceptionReporter() { when(getRecordsRetrievalStrategy.getRecords(anyInt())) .thenThrow(ProvisionedThroughputExceededException.builder().build()) - .thenReturn(GetRecordsResponse.builder().build()); + .thenReturn(new KinesisGetRecordsResponseAdapter( + GetRecordsResponse.builder().build())); getRecordsCache.start(sequenceNumber, initialPosition); @@ -822,20 +827,20 @@ public class PrefetchRecordsPublisherTest { return getRecordsCache.getPublisherSession().evictPublishedRecordAndUpdateDemand("shardId"); } - private static class RetrieverAnswer implements Answer { + private static class RetrieverAnswer implements Answer { - private final List responses; - private Iterator iterator; + private final List responses; + private Iterator iterator; - public RetrieverAnswer(List responses) { + public RetrieverAnswer(List responses) { this.responses = responses; this.iterator = responses.iterator(); } public void resetIteratorTo(String nextIterator) { - Iterator newIterator = responses.iterator(); + Iterator newIterator = responses.iterator(); while (newIterator.hasNext()) { - GetRecordsResponse current = newIterator.next(); + GetRecordsResponseAdapter current = newIterator.next(); if (StringUtils.equals(nextIterator, current.nextShardIterator())) { if (!newIterator.hasNext()) { iterator = responses.iterator(); @@ -849,8 +854,8 @@ public class PrefetchRecordsPublisherTest { } @Override - public GetRecordsResponse answer(InvocationOnMock invocation) { - GetRecordsResponse response = iterator.next(); + public GetRecordsResponseAdapter answer(InvocationOnMock invocation) { + GetRecordsResponseAdapter response = iterator.next(); if (!iterator.hasNext()) { iterator = responses.iterator(); }