Overriding the DataFetcher to return a custom GetRecordsResponseAdapter so that customers can have custom logic to send data to KinesisClientRecord (#1479)

This commit is contained in:
Abhi Gupta 2025-05-30 11:57:03 +05:30 committed by GitHub
parent 1ce6123a78
commit 37ae2f86be
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 262 additions and 134 deletions

View file

@ -14,8 +14,6 @@
*/
package software.amazon.kinesis.retrieval;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
/**
* Represents the result from the DataFetcher, and allows the receiver to accept a result
*/
@ -25,7 +23,7 @@ public interface DataFetcherResult {
*
* @return The result of the request, this can be null if the request failed.
*/
GetRecordsResponse getResult();
GetRecordsResponseAdapter getResult();
/**
* Accepts the result, and advances the shard iterator. A result from the data fetcher must be accepted before any
@ -33,7 +31,7 @@ public interface DataFetcherResult {
*
* @return the result of the request, this can be null if the request failed.
*/
GetRecordsResponse accept();
GetRecordsResponseAdapter accept();
/**
* Indicates whether this result is at the end of the shard or not

View file

@ -0,0 +1,55 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import java.util.List;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@KinesisClientInternalApi
public interface GetRecordsResponseAdapter {
/**
* Returns the list of records retrieved from GetRecords.
* @return list of {@link KinesisClientRecord}
*/
List<KinesisClientRecord> records();
/**
* The number of milliseconds the response is from the tip of the stream.
* @return long
*/
Long millisBehindLatest();
/**
* Returns the list of child shards of the shard that was retrieved from GetRecords.
* @return list of {@link ChildShard}
*/
List<ChildShard> childShards();
/**
* Returns the next shard iterator to be used to retrieve next set of records.
* @return String
*/
String nextShardIterator();
/**
* Returns the request id of the GetRecords operation.
* @return String containing the request id
*/
String requestId();
}

View file

@ -16,7 +16,6 @@ package software.amazon.kinesis.retrieval;
import java.util.Optional;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.retrieval.polling.DataFetcher;
import software.amazon.kinesis.retrieval.polling.KinesisDataFetcher;
@ -34,7 +33,7 @@ public interface GetRecordsRetrievalStrategy {
* @throws IllegalStateException
* if the strategy has been shutdown.
*/
GetRecordsResponse getRecords(int maxRecords);
GetRecordsResponseAdapter getRecords(int maxRecords);
/**
* Releases any resources used by the strategy. Once the strategy is shutdown it is no longer safe to call

View file

@ -46,6 +46,27 @@ public class KinesisClientRecord {
private final boolean aggregated;
private final Schema schema;
protected KinesisClientRecord(
String sequenceNumber,
Instant approximateArrivalTimestamp,
ByteBuffer data,
String partitionKey,
EncryptionType encryptionType,
long subSequenceNumber,
String explicitHashKey,
boolean aggregated,
Schema schema) {
this.sequenceNumber = sequenceNumber;
this.approximateArrivalTimestamp = approximateArrivalTimestamp;
this.data = data;
this.partitionKey = partitionKey;
this.encryptionType = encryptionType;
this.subSequenceNumber = subSequenceNumber;
this.explicitHashKey = explicitHashKey;
this.aggregated = aggregated;
this.schema = schema;
}
public static KinesisClientRecord fromRecord(Record record) {
return KinesisClientRecord.builder()
.sequenceNumber(record.sequenceNumber())

View file

@ -0,0 +1,60 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import java.util.List;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@RequiredArgsConstructor
@EqualsAndHashCode
@KinesisClientInternalApi
public class KinesisGetRecordsResponseAdapter implements GetRecordsResponseAdapter {
private final GetRecordsResponse getRecordsResponse;
@Override
public List<KinesisClientRecord> records() {
return getRecordsResponse.records().stream()
.map(KinesisClientRecord::fromRecord)
.collect(Collectors.toList());
}
@Override
public Long millisBehindLatest() {
return getRecordsResponse.millisBehindLatest();
}
@Override
public List<ChildShard> childShards() {
return getRecordsResponse.childShards();
}
@Override
public String nextShardIterator() {
return getRecordsResponse.nextShardIterator();
}
@Override
public String requestId() {
return getRecordsResponse.responseMetadata().requestId();
}
}

View file

@ -32,9 +32,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
/**
@ -87,11 +87,11 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
}
@Override
public GetRecordsResponse getRecords(final int maxRecords) {
public GetRecordsResponseAdapter getRecords(final int maxRecords) {
if (executorService.isShutdown()) {
throw new IllegalStateException("Strategy has been shutdown");
}
GetRecordsResponse result = null;
GetRecordsResponseAdapter result = null;
CompletionService<DataFetcherResult> completionService = completionServiceSupplier.get();
Set<Future<DataFetcherResult>> futures = new HashSet<>();
Callable<DataFetcherResult> retrieverCall = createRetrieverCallable();

View file

@ -17,14 +17,13 @@ package software.amazon.kinesis.retrieval.polling;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
@ -59,13 +58,11 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
}
public ProcessRecordsInput getNextResult() {
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
final RequestDetails getRecordsRequestDetails = new RequestDetails(
getRecordsResult.responseMetadata().requestId(), Instant.now().toString());
GetRecordsResponseAdapter getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
final RequestDetails getRecordsRequestDetails =
new RequestDetails(getRecordsResult.requestId(), Instant.now().toString());
setLastSuccessfulRequestDetails(getRecordsRequestDetails);
List<KinesisClientRecord> records = getRecordsResult.records().stream()
.map(KinesisClientRecord::fromRecord)
.collect(Collectors.toList());
List<KinesisClientRecord> records = getRecordsResult.records();
return ProcessRecordsInput.builder()
.records(records)
.millisBehindLatest(getRecordsResult.millisBehindLatest())

View file

@ -15,13 +15,6 @@
package software.amazon.kinesis.retrieval.polling;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.retrieval.DataFetcherResult;
@ -76,39 +69,6 @@ public interface DataFetcher {
void resetIterator(
String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream);
/**
* Retrieves the response based on the request.
*
* @param request the current get records request used to receive a response.
* @return GetRecordsResponse response for getRecords
*/
GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws Exception;
/**
* Retrieves the next get records request based on the current iterator.
*
* @param nextIterator specify the iterator to get the next record request
* @return {@link GetRecordsRequest}
*/
GetRecordsRequest getGetRecordsRequest(String nextIterator);
/**
* Gets the next iterator based on the request.
*
* @param request used to obtain the next shard iterator
* @return next iterator string
*/
String getNextIterator(GetShardIteratorRequest request)
throws ExecutionException, InterruptedException, TimeoutException;
/**
* Gets the next set of records based on the iterator.
*
* @param nextIterator specified shard iterator for getting the next set of records
* @return {@link GetRecordsResponse}
*/
GetRecordsResponse getRecords(@NonNull String nextIterator);
/**
* Get the current account and stream information.
*

View file

@ -46,8 +46,10 @@ import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -163,17 +165,17 @@ public class KinesisDataFetcher implements DataFetcher {
final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() {
// CHECKSTYLE.ON: MemberName
@Override
public GetRecordsResponse getResult() {
return GetRecordsResponse.builder()
public GetRecordsResponseAdapter getResult() {
return new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.millisBehindLatest(null)
.records(Collections.emptyList())
.nextShardIterator(null)
.childShards(Collections.emptyList())
.build();
.build());
}
@Override
public GetRecordsResponse accept() {
public GetRecordsResponseAdapter accept() {
isShardEndReached = true;
return getResult();
}
@ -187,15 +189,15 @@ public class KinesisDataFetcher implements DataFetcher {
@Data
class AdvancingResult implements DataFetcherResult {
final GetRecordsResponse result;
final GetRecordsResponseAdapter result;
@Override
public GetRecordsResponse getResult() {
public GetRecordsResponseAdapter getResult() {
return result;
}
@Override
public GetRecordsResponse accept() {
public GetRecordsResponseAdapter accept() {
nextIterator = result.nextShardIterator();
if (result.records() != null && !result.records().isEmpty()) {
lastKnownSequenceNumber = Iterables.getLast(result.records()).sequenceNumber();
@ -331,8 +333,13 @@ public class KinesisDataFetcher implements DataFetcher {
this.initialPositionInStream = initialPositionInStream;
}
@Override
public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request)
/**
* Retrieves the response based on the request.
*
* @param request the current get records request used to receive a response.
* @return GetRecordsResponse response for getRecords
*/
private GetRecordsResponseAdapter getGetRecordsResponse(GetRecordsRequest request)
throws ExecutionException, InterruptedException, TimeoutException {
final GetRecordsResponse response =
FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), maxFutureWait);
@ -342,11 +349,16 @@ public class KinesisDataFetcher implements DataFetcher {
+ ". childShards: " + response.childShards()
+ ". Will retry GetRecords with the same nextIterator.");
}
return response;
return new KinesisGetRecordsResponseAdapter(response);
}
@Override
public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
/**
* Gets the next set of records based on the iterator.
*
* @param nextIterator specified shard iterator for getting the next set of records
* @return {@link GetRecordsResponseAdapter}
*/
private GetRecordsRequest getGetRecordsRequest(String nextIterator) {
GetRecordsRequest.Builder builder = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(nextIterator)
.limit(maxRecords);
@ -354,16 +366,26 @@ public class KinesisDataFetcher implements DataFetcher {
return builder.build();
}
@Override
public String getNextIterator(GetShardIteratorRequest request)
/**
* Gets the next iterator based on the request.
*
* @param request used to obtain the next shard iterator
* @return next iterator string
*/
private String getNextIterator(GetShardIteratorRequest request)
throws ExecutionException, InterruptedException, TimeoutException {
final GetShardIteratorResponse result =
FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait);
return result.shardIterator();
}
@Override
public GetRecordsResponse getRecords(@NonNull final String nextIterator) {
/**
* Gets the next set of records based on the iterator.
*
* @param nextIterator specified shard iterator for getting the next set of records
* @return {@link GetRecordsResponse}
*/
private GetRecordsResponseAdapter getRecords(@NonNull final String nextIterator) {
GetRecordsRequest request = getGetRecordsRequest(nextIterator);
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
@ -372,7 +394,7 @@ public class KinesisDataFetcher implements DataFetcher {
boolean success = false;
long startTime = System.currentTimeMillis();
try {
final GetRecordsResponse response = getGetRecordsResponse(request);
final GetRecordsResponseAdapter response = getGetRecordsResponse(request);
success = true;
return response;
} catch (ExecutionException e) {

View file

@ -24,7 +24,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import lombok.AccessLevel;
@ -41,7 +40,6 @@ import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@ -55,6 +53,7 @@ import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
@ -534,12 +533,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
try {
sleepBeforeNextCall();
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
GetRecordsResponseAdapter getRecordsResult =
getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now();
final List<KinesisClientRecord> records = getRecordsResult.records().stream()
.map(KinesisClientRecord::fromRecord)
.collect(Collectors.toList());
final List<KinesisClientRecord> records = getRecordsResult.records();
ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
.records(records)
.millisBehindLatest(getRecordsResult.millisBehindLatest())

View file

@ -16,8 +16,8 @@ package software.amazon.kinesis.retrieval.polling;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
/**
@ -31,7 +31,7 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev
private final DataFetcher dataFetcher;
@Override
public GetRecordsResponse getRecords(final int maxRecords) {
public GetRecordsResponseAdapter getRecords(final int maxRecords) {
return dataFetcher.getRecords().accept();
}

View file

@ -38,6 +38,8 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@ -71,7 +73,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
private KinesisAsyncClient kinesisClient;
private CompletionService<DataFetcherResult> completionService;
private GetRecordsResponse getRecordsResponse;
private GetRecordsResponseAdapter getRecordsResponse;
private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy;
private KinesisDataFetcher dataFetcher;
@ -97,7 +99,8 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
completionService = spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(
dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
getRecordsResponse = GetRecordsResponse.builder().build();
getRecordsResponse = new KinesisGetRecordsResponseAdapter(
GetRecordsResponse.builder().build());
when(completionServiceSupplier.get()).thenReturn(completionService);
when(result.accept()).thenReturn(getRecordsResponse);
@ -106,7 +109,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Test
public void oneRequestMultithreadTest() {
when(result.accept()).thenReturn(null);
GetRecordsResponse getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
GetRecordsResponseAdapter getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords();
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
assertNull(getRecordsResult);
@ -117,7 +120,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
ExecutorCompletionService<DataFetcherResult> completionService1 =
spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService1);
GetRecordsResponse getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
GetRecordsResponseAdapter getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords();
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
assertThat(getRecordsResult, equalTo(getRecordsResponse));
@ -127,7 +130,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService2);
getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
assertThat(getRecordsResult, nullValue(GetRecordsResponse.class));
assertThat(getRecordsResult, nullValue(GetRecordsResponseAdapter.class));
}
@Test(expected = ExpiredIteratorException.class)

View file

@ -30,6 +30,8 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@ -73,11 +75,12 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock
private DataFetcherResult dataFetcherResult;
private GetRecordsResponse expectedResponses;
private GetRecordsResponseAdapter expectedResponses;
@Before
public void before() {
expectedResponses = GetRecordsResponse.builder().build();
expectedResponses = new KinesisGetRecordsResponseAdapter(
GetRecordsResponse.builder().build());
when(completionServiceSupplier.get()).thenReturn(completionService);
when(dataFetcherResult.accept()).thenReturn(expectedResponses);
@ -93,7 +96,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(completionService.poll(anyLong(), any())).thenReturn(successfulFuture);
when(successfulFuture.get()).thenReturn(dataFetcherResult);
GetRecordsResponse result = strategy.getRecords(10);
GetRecordsResponseAdapter result = strategy.getRecords(10);
verify(executorService).isShutdown();
verify(completionService).submit(any());
@ -116,7 +119,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
when(blockedFuture.cancel(anyBoolean())).thenReturn(true);
GetRecordsResponse actualResults = strategy.getRecords(10);
GetRecordsResponseAdapter actualResults = strategy.getRecords(10);
verify(completionService, times(2)).submit(any());
verify(completionService, times(2)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
@ -156,7 +159,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
when(blockedFuture.cancel(anyBoolean())).thenReturn(true);
GetRecordsResponse actualResult = strategy.getRecords(10);
GetRecordsResponseAdapter actualResult = strategy.getRecords(10);
verify(completionService, times(3)).submit(any());
verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));

View file

@ -55,7 +55,9 @@ import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import static org.hamcrest.CoreMatchers.isA;
@ -433,7 +435,7 @@ public class KinesisDataFetcherTest {
assertTrue(terminal.isShardEnd());
assertNotNull(terminal.getResult());
final GetRecordsResponse terminalResult = terminal.getResult();
final GetRecordsResponseAdapter terminalResult = terminal.getResult();
assertNotNull(terminalResult.records());
assertEquals(0, terminalResult.records().size());
assertNull(terminalResult.nextShardIterator());
@ -540,12 +542,13 @@ public class KinesisDataFetcherTest {
private DataFetcherResult assertAdvanced(
GetRecordsResponse expectedResult, String previousValue, String nextValue) {
DataFetcherResult acceptResult = kinesisDataFetcher.getRecords();
assertEquals(expectedResult, acceptResult.getResult());
KinesisGetRecordsResponseAdapter expectedResultAdapter = new KinesisGetRecordsResponseAdapter(expectedResult);
assertEquals(expectedResultAdapter, acceptResult.getResult());
assertEquals(previousValue, kinesisDataFetcher.getNextIterator());
assertFalse(kinesisDataFetcher.isShardEndReached());
assertEquals(expectedResult, acceptResult.accept());
assertEquals(expectedResultAdapter, acceptResult.accept());
assertEquals(nextValue, kinesisDataFetcher.getNextIterator());
if (nextValue == null) {
assertTrue(kinesisDataFetcher.isShardEndReached());
@ -557,7 +560,8 @@ public class KinesisDataFetcherTest {
private DataFetcherResult assertNoAdvance(final GetRecordsResponse expectedResult, final String previousValue) {
assertEquals(previousValue, kinesisDataFetcher.getNextIterator());
DataFetcherResult noAcceptResult = kinesisDataFetcher.getRecords();
assertEquals(expectedResult, noAcceptResult.getResult());
KinesisGetRecordsResponseAdapter expectedResultAdapter = new KinesisGetRecordsResponseAdapter(expectedResult);
assertEquals(expectedResultAdapter, noAcceptResult.getResult());
assertEquals(previousValue, kinesisDataFetcher.getNextIterator());

View file

@ -46,7 +46,9 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -305,11 +307,12 @@ public class PrefetchRecordsPublisherIntegrationTest {
@Override
public DataFetcherResult getRecords() {
GetRecordsResponse getRecordsResult = GetRecordsResponse.builder()
.records(new ArrayList<>(records))
.nextShardIterator(nextShardIterator)
.millisBehindLatest(1000L)
.build();
GetRecordsResponseAdapter getRecordsResult =
new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(new ArrayList<>(records))
.nextShardIterator(nextShardIterator)
.millisBehindLatest(1000L)
.build());
return new AdvancingResult(getRecordsResult);
}

View file

@ -66,8 +66,10 @@ import software.amazon.kinesis.leases.ShardObjectHelper;
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
@ -136,7 +138,7 @@ public class PrefetchRecordsPublisherTest {
private ExecutorService executorService;
private LinkedBlockingQueue<PrefetchRecordsPublisher.PrefetchRecordsRetrieved> spyQueue;
private PrefetchRecordsPublisher getRecordsCache;
private GetRecordsResponse getRecordsResponse;
private GetRecordsResponseAdapter getRecordsResponse;
private Record record;
@Before
@ -147,11 +149,11 @@ public class PrefetchRecordsPublisherTest {
getRecordsCache = createPrefetchRecordsPublisher(0L);
spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue());
records = spy(new ArrayList<>());
getRecordsResponse = GetRecordsResponse.builder()
getRecordsResponse = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(records)
.nextShardIterator(NEXT_SHARD_ITERATOR)
.childShards(Collections.emptyList())
.build();
.build());
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse);
}
@ -283,8 +285,8 @@ public class PrefetchRecordsPublisherTest {
public void testGetRecordsWithInvalidResponse() {
record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build();
GetRecordsResponse response =
GetRecordsResponse.builder().records(records).build();
GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(
GetRecordsResponse.builder().records(records).build());
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(response);
when(dataFetcher.isShardEndReached()).thenReturn(false);
@ -319,10 +321,10 @@ public class PrefetchRecordsPublisherTest {
childShards.add(leftChild);
childShards.add(rightChild);
GetRecordsResponse response = GetRecordsResponse.builder()
GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(records)
.childShards(childShards)
.build();
.build());
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(response);
when(dataFetcher.isShardEndReached()).thenReturn(true);
@ -417,13 +419,13 @@ public class PrefetchRecordsPublisherTest {
@Test(expected = IllegalStateException.class)
public void testRequestRecordsOnSubscriptionAfterShutdown() {
GetRecordsResponse response = GetRecordsResponse.builder()
GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(Record.builder()
.data(SdkBytes.fromByteArray(new byte[] {1, 2, 3}))
.sequenceNumber("123")
.build())
.nextShardIterator(NEXT_SHARD_ITERATOR)
.build();
.build());
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition);
@ -482,11 +484,11 @@ public class PrefetchRecordsPublisherTest {
@Test
public void testRetryableRetrievalExceptionContinues() {
GetRecordsResponse response = GetRecordsResponse.builder()
GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.millisBehindLatest(100L)
.records(Collections.emptyList())
.nextShardIterator(NEXT_SHARD_ITERATOR)
.build();
.build());
when(getRecordsRetrievalStrategy.getRecords(anyInt()))
.thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout")))
.thenReturn(response);
@ -526,13 +528,14 @@ public class PrefetchRecordsPublisherTest {
//
final int[] sequenceNumberInResponse = {0};
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenAnswer(i -> GetRecordsResponse.builder()
.records(Record.builder()
.data(SdkBytes.fromByteArray(new byte[] {1, 2, 3}))
.sequenceNumber(++sequenceNumberInResponse[0] + "")
.build())
.nextShardIterator(NEXT_SHARD_ITERATOR)
.build());
when(getRecordsRetrievalStrategy.getRecords(anyInt()))
.thenAnswer(i -> new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(Record.builder()
.data(SdkBytes.fromByteArray(new byte[] {1, 2, 3}))
.sequenceNumber(++sequenceNumberInResponse[0] + "")
.build())
.nextShardIterator(NEXT_SHARD_ITERATOR)
.build()));
getRecordsCache.start(sequenceNumber, initialPosition);
@ -627,13 +630,13 @@ public class PrefetchRecordsPublisherTest {
//
// This test is to verify that the data consumption is not stuck in the case of an failed event delivery
// to the subscriber.
GetRecordsResponse response = GetRecordsResponse.builder()
GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(Record.builder()
.data(SdkBytes.fromByteArray(new byte[] {1, 2, 3}))
.sequenceNumber("123")
.build())
.nextShardIterator(NEXT_SHARD_ITERATOR)
.build();
.build());
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition);
@ -710,7 +713,7 @@ public class PrefetchRecordsPublisherTest {
@Test
public void testResetClearsRemainingData() {
List<GetRecordsResponse> responses = Stream.iterate(0, i -> i + 1)
List<GetRecordsResponseAdapter> responses = Stream.iterate(0, i -> i + 1)
.limit(10)
.map(i -> {
Record record = Record.builder()
@ -720,10 +723,10 @@ public class PrefetchRecordsPublisherTest {
.approximateArrivalTimestamp(Instant.now())
.build();
String nextIterator = "shard-iter-" + (i + 1);
return GetRecordsResponse.builder()
return new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(record)
.nextShardIterator(nextIterator)
.build();
.build());
})
.collect(Collectors.toList());
@ -778,7 +781,8 @@ public class PrefetchRecordsPublisherTest {
try {
// return a valid response to cause `lastSuccessfulCall` to initialize
when(getRecordsRetrievalStrategy.getRecords(anyInt()))
.thenReturn(GetRecordsResponse.builder().build());
.thenReturn(new KinesisGetRecordsResponseAdapter(
GetRecordsResponse.builder().build()));
blockUntilRecordsAvailable();
} catch (RuntimeException re) {
Assert.fail("first call should succeed");
@ -803,7 +807,8 @@ public class PrefetchRecordsPublisherTest {
public void testProvisionedThroughputExceededExceptionReporter() {
when(getRecordsRetrievalStrategy.getRecords(anyInt()))
.thenThrow(ProvisionedThroughputExceededException.builder().build())
.thenReturn(GetRecordsResponse.builder().build());
.thenReturn(new KinesisGetRecordsResponseAdapter(
GetRecordsResponse.builder().build()));
getRecordsCache.start(sequenceNumber, initialPosition);
@ -822,20 +827,20 @@ public class PrefetchRecordsPublisherTest {
return getRecordsCache.getPublisherSession().evictPublishedRecordAndUpdateDemand("shardId");
}
private static class RetrieverAnswer implements Answer<GetRecordsResponse> {
private static class RetrieverAnswer implements Answer<GetRecordsResponseAdapter> {
private final List<GetRecordsResponse> responses;
private Iterator<GetRecordsResponse> iterator;
private final List<GetRecordsResponseAdapter> responses;
private Iterator<GetRecordsResponseAdapter> iterator;
public RetrieverAnswer(List<GetRecordsResponse> responses) {
public RetrieverAnswer(List<GetRecordsResponseAdapter> responses) {
this.responses = responses;
this.iterator = responses.iterator();
}
public void resetIteratorTo(String nextIterator) {
Iterator<GetRecordsResponse> newIterator = responses.iterator();
Iterator<GetRecordsResponseAdapter> newIterator = responses.iterator();
while (newIterator.hasNext()) {
GetRecordsResponse current = newIterator.next();
GetRecordsResponseAdapter current = newIterator.next();
if (StringUtils.equals(nextIterator, current.nextShardIterator())) {
if (!newIterator.hasNext()) {
iterator = responses.iterator();
@ -849,8 +854,8 @@ public class PrefetchRecordsPublisherTest {
}
@Override
public GetRecordsResponse answer(InvocationOnMock invocation) {
GetRecordsResponse response = iterator.next();
public GetRecordsResponseAdapter answer(InvocationOnMock invocation) {
GetRecordsResponseAdapter response = iterator.next();
if (!iterator.hasNext()) {
iterator = responses.iterator();
}