Having a way to control sleep time in recordsfetcher

This commit is contained in:
gguptp 2025-05-30 12:06:43 +05:30
parent 37ae2f86be
commit 9338cce2c1
10 changed files with 438 additions and 22 deletions

View file

@ -15,6 +15,7 @@
package software.amazon.kinesis.retrieval;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.polling.SleepTimeController;
/**
* This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard.
@ -27,6 +28,7 @@ public interface RecordsFetcherFactory {
* @param shardId ShardId of the shard that the fetcher will retrieve records for
* @param metricsFactory MetricsFactory used to create metricScope
* @param maxRecords Max number of records to be returned in a single get call
* @param sleepTimeController A controller to control the sleep time between get calls.
*
* @return RecordsPublisher used to get records from Kinesis.
*/
@ -34,7 +36,8 @@ public interface RecordsFetcherFactory {
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
String shardId,
MetricsFactory metricsFactory,
int maxRecords);
int maxRecords,
SleepTimeController sleepTimeController);
/**
* Sets the maximum number of ProcessRecordsInput objects the RecordsPublisher can hold, before further requests are

View file

@ -0,0 +1,39 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.time.Instant;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@KinesisClientInternalApi
public class KinesisSleepTimeController implements SleepTimeController {
@Override
public long getSleepTimeMillis(SleepTimeControllerConfig sleepTimeControllerConfig) {
Instant lastSuccessfulCall = sleepTimeControllerConfig.lastSuccessfulCall();
long idleMillisBetweenCalls = sleepTimeControllerConfig.idleMillisBetweenCalls();
if (lastSuccessfulCall == null) {
return idleMillisBetweenCalls;
}
long timeSinceLastCall =
Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
if (timeSinceLastCall < idleMillisBetweenCalls) {
return idleMillisBetweenCalls - timeSinceLastCall;
}
return 0;
}
}

View file

@ -131,6 +131,15 @@ public class PollingConfig implements RetrievalSpecificConfig {
*/
private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();
/**
* The SleepTimeController used to control the sleep time between getRecords calls.
*
* <p>
* Default value: {@link KinesisSleepTimeController}
* </p>
*/
private SleepTimeController sleepTimeController = new KinesisSleepTimeController();
/**
* @Deprecated Use {@link PollingConfig#idleTimeBetweenReadsInMillis} instead
*/
@ -185,7 +194,8 @@ public class PollingConfig implements RetrievalSpecificConfig {
recordsFetcherFactory,
maxRecords(),
kinesisRequestTimeout,
dataFetcherProvider);
dataFetcherProvider,
sleepTimeController);
}
@Override

View file

@ -15,7 +15,6 @@
package software.amazon.kinesis.retrieval.polling;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
@ -92,6 +91,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final MetricsFactory metricsFactory;
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
private Integer lastGetRecordsReturnedRecordsCount;
private Long lastMillisBehindLatest = null;
private boolean isFirstGetCallTry = true;
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
private boolean started = false;
@ -110,6 +111,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private Instant lastEventDeliveryTime = Instant.EPOCH;
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
private final ThrottlingReporter throttlingReporter;
private final SleepTimeController sleepTimeController;
@Data
@Accessors(fluent = true)
@ -235,7 +237,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
@NonNull final String operation,
@NonNull final String shardId,
final ThrottlingReporter throttlingReporter,
final long awaitTerminationTimeoutMillis) {
final long awaitTerminationTimeoutMillis,
final SleepTimeController sleepTimeController) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall;
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
@ -255,6 +258,51 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier();
this.streamAndShardId = this.streamId.serialize() + ":" + shardId;
this.awaitTerminationTimeoutMillis = awaitTerminationTimeoutMillis;
this.sleepTimeController = sleepTimeController;
}
/**
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
* LinkedBlockingQueue.
*
* @see PrefetchRecordsPublisher
*
* @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before
* blocking
* @param maxByteSize Max byte size of the queue before blocking next get records call
* @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects
* @param maxRecordsPerCall Max records to be returned per call
* @param getRecordsRetrievalStrategy Retrieval strategy for the get records call
* @param executorService Executor service for the cache
* @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call
*/
public PrefetchRecordsPublisher(
final int maxPendingProcessRecordsInput,
final int maxByteSize,
final int maxRecordsCount,
final int maxRecordsPerCall,
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
final ExecutorService executorService,
final long idleMillisBetweenCalls,
final MetricsFactory metricsFactory,
final String operation,
final String shardId,
final ThrottlingReporter throttlingReporter,
final SleepTimeController sleepTimeController) {
this(
maxPendingProcessRecordsInput,
maxByteSize,
maxRecordsCount,
maxRecordsPerCall,
getRecordsRetrievalStrategy,
executorService,
idleMillisBetweenCalls,
metricsFactory,
operation,
shardId,
throttlingReporter,
DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS,
sleepTimeController);
}
/**
@ -296,7 +344,53 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
operation,
shardId,
throttlingReporter,
DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS);
DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS,
new KinesisSleepTimeController());
}
/**
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
* LinkedBlockingQueue.
*
* @see PrefetchRecordsPublisher
*
* @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before
* blocking
* @param maxByteSize Max byte size of the queue before blocking next get records call
* @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects
* @param maxRecordsPerCall Max records to be returned per call
* @param getRecordsRetrievalStrategy Retrieval strategy for the get records call
* @param executorService Executor service for the cache
* @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call
* @param awaitTerminationTimeoutMillis maximum time to wait for graceful shutdown of executorService
*/
public PrefetchRecordsPublisher(
final int maxPendingProcessRecordsInput,
final int maxByteSize,
final int maxRecordsCount,
final int maxRecordsPerCall,
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
@NonNull final ExecutorService executorService,
final long idleMillisBetweenCalls,
@NonNull final MetricsFactory metricsFactory,
@NonNull final String operation,
@NonNull final String shardId,
final ThrottlingReporter throttlingReporter,
final long awaitTerminationTimeoutMillis) {
this(
maxPendingProcessRecordsInput,
maxByteSize,
maxRecordsCount,
maxRecordsPerCall,
getRecordsRetrievalStrategy,
executorService,
idleMillisBetweenCalls,
metricsFactory,
operation,
shardId,
throttlingReporter,
awaitTerminationTimeoutMillis,
new KinesisSleepTimeController());
}
@Override
@ -536,6 +630,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
GetRecordsResponseAdapter getRecordsResult =
getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now();
lastMillisBehindLatest = getRecordsResult.millisBehindLatest();
lastGetRecordsReturnedRecordsCount =
getRecordsResult.records().size();
final List<KinesisClientRecord> records = getRecordsResult.records();
ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
@ -626,19 +723,22 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
isFirstGetCallTry = false;
return;
}
// Add a sleep if lastSuccessfulCall is still null but this is not the first try to avoid retry storm
if (lastSuccessfulCall == null) {
Thread.sleep(idleMillisBetweenCalls);
return;
}
long timeSinceLastCall =
Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
if (timeSinceLastCall < idleMillisBetweenCalls) {
Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
.lastSuccessfulCall(lastSuccessfulCall)
.idleMillisBetweenCalls(idleMillisBetweenCalls)
.lastGetRecordsReturnedRecordsCount(lastGetRecordsReturnedRecordsCount)
.lastMillisBehindLatest(lastMillisBehindLatest)
.build();
long sleepTimeMillis = sleepTimeController.getSleepTimeMillis(sleepTimeControllerConfig);
if (sleepTimeMillis > 0) {
Thread.sleep(sleepTimeMillis);
}
// avoid immediate-retry storms
lastSuccessfulCall = null;
lastGetRecordsReturnedRecordsCount = null;
lastMillisBehindLatest = null;
}
}

View file

@ -41,7 +41,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
String shardId,
MetricsFactory metricsFactory,
int maxRecords) {
int maxRecords,
SleepTimeController sleepTimeController) {
return new PrefetchRecordsPublisher(
maxPendingProcessRecordsInput,
@ -59,7 +60,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
metricsFactory,
"ProcessTask",
shardId,
new ThrottlingReporter(maxConsecutiveThrottles, shardId));
new ThrottlingReporter(maxConsecutiveThrottles, shardId),
sleepTimeController);
}
@Override

View file

@ -0,0 +1,30 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval.polling;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@KinesisClientInternalApi
public interface SleepTimeController {
/**
* Calculates the sleep time in milliseconds before the next GetRecords call.
*
* @param sleepTimeControllerConfig contains the last successful call time and the idle time between calls.
* @return the sleep time in milliseconds.
*/
long getSleepTimeMillis(SleepTimeControllerConfig sleepTimeControllerConfig);
}

View file

@ -0,0 +1,24 @@
package software.amazon.kinesis.retrieval.polling;
import java.time.Instant;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
/**
* Configuration for the {@link SleepTimeController}.
*/
@KinesisClientInternalApi
@EqualsAndHashCode
@Data
@Accessors(fluent = true)
@Builder
public class SleepTimeControllerConfig {
private Instant lastSuccessfulCall;
private long idleMillisBetweenCalls;
private Integer lastGetRecordsReturnedRecordsCount;
private Long lastMillisBehindLatest;
}

View file

@ -53,6 +53,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
private final Duration kinesisRequestTimeout;
private final Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
private final SleepTimeController sleepTimeController;
public SynchronousBlockingRetrievalFactory(
String streamName,
@ -60,7 +61,8 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
RecordsFetcherFactory recordsFetcherFactory,
int maxRecords,
Duration kinesisRequestTimeout,
Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider) {
Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider,
SleepTimeController sleepTimeController) {
this.streamName = streamName;
this.kinesisClient = kinesisClient;
this.recordsFetcherFactory = recordsFetcherFactory;
@ -68,6 +70,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
this.kinesisRequestTimeout = kinesisRequestTimeout;
this.dataFetcherProvider =
dataFetcherProvider == null ? defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider;
this.sleepTimeController = sleepTimeController;
}
private static Function<DataFetcherProviderConfig, DataFetcher> defaultDataFetcherProvider(
@ -96,6 +99,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
createGetRecordsRetrievalStrategy(shardInfo, streamConfig.streamIdentifier(), metricsFactory),
shardInfo.shardId(),
metricsFactory,
maxRecords);
maxRecords,
getSleepTimeController());
}
}

View file

@ -0,0 +1,204 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval.polling;
import java.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(MockitoJUnitRunner.class)
public class KinesisSleepTimeControllerTest {
private KinesisSleepTimeController controller;
private final long idleMillisBetweenCalls = 1000L;
private final Integer recordCount = 10;
private final Long millisBehindLatest = 5000L;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
controller = new KinesisSleepTimeController();
}
@Test
public void testGetSleepTimeMillisWithNullLastSuccessfulCall() {
// When lastSuccessfulCall is null, it should return the idleMillisBetweenCalls
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
.lastSuccessfulCall(null)
.idleMillisBetweenCalls(idleMillisBetweenCalls)
.lastGetRecordsReturnedRecordsCount(recordCount)
.lastMillisBehindLatest(millisBehindLatest)
.build();
long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
assertEquals(
"When lastSuccessfulCall is null, should return idleMillisBetweenCalls",
idleMillisBetweenCalls,
sleepTime);
}
@Test
public void testGetSleepTimeMillisWhenTimeSinceLastCallLessThanIdleTime() {
// Create a lastSuccessfulCall time that's recent (less than idleMillisBetweenCalls ago)
Instant now = Instant.now();
Instant lastSuccessfulCall = now.minusMillis(500); // 500ms ago
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
.lastSuccessfulCall(lastSuccessfulCall)
.idleMillisBetweenCalls(idleMillisBetweenCalls)
.lastGetRecordsReturnedRecordsCount(recordCount)
.lastMillisBehindLatest(millisBehindLatest)
.build();
long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
// Should return the remaining time to wait (idleMillisBetweenCalls - timeSinceLastCall)
assertTrue(
"Sleep time should be positive when time since last call is less than idle time",
sleepTime > 0 && sleepTime <= idleMillisBetweenCalls);
// The exact value will vary slightly due to execution time, but should be close to 500ms
long expectedApproxSleepTime = idleMillisBetweenCalls - 500;
assertTrue(
"Sleep time should be approximately " + expectedApproxSleepTime + "ms",
Math.abs(sleepTime - expectedApproxSleepTime) < 100); // Allow for small timing variations
}
@Test
public void testGetSleepTimeMillisWhenTimeSinceLastCallEqualsOrExceedsIdleTime() {
// Create a lastSuccessfulCall time that's exactly idleMillisBetweenCalls ago
Instant now = Instant.now();
Instant lastSuccessfulCall = now.minusMillis(idleMillisBetweenCalls);
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
.lastSuccessfulCall(lastSuccessfulCall)
.idleMillisBetweenCalls(idleMillisBetweenCalls)
.lastGetRecordsReturnedRecordsCount(recordCount)
.lastMillisBehindLatest(millisBehindLatest)
.build();
long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
// Should return 0 as we've waited the full idle time
assertEquals("Sleep time should be 0 when time since last call equals idle time", 0L, sleepTime);
// Test with time exceeding idle time
lastSuccessfulCall = now.minusMillis(idleMillisBetweenCalls + 500);
sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
.lastSuccessfulCall(lastSuccessfulCall)
.idleMillisBetweenCalls(idleMillisBetweenCalls)
.lastGetRecordsReturnedRecordsCount(recordCount)
.lastMillisBehindLatest(millisBehindLatest)
.build();
sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
assertEquals("Sleep time should be 0 when time since last call exceeds idle time", 0L, sleepTime);
}
@Test
public void testGetSleepTimeMillisWithFutureLastSuccessfulCall() {
// Test with a lastSuccessfulCall in the future (should handle this case gracefully)
Instant now = Instant.now();
Instant futureCall = now.plusMillis(500); // 500ms in the future
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
.lastSuccessfulCall(futureCall)
.idleMillisBetweenCalls(idleMillisBetweenCalls)
.lastGetRecordsReturnedRecordsCount(recordCount)
.lastMillisBehindLatest(millisBehindLatest)
.build();
long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
// The implementation uses Duration.abs(), so it should treat this the same as 500ms in the past
assertTrue(
"Sleep time should be positive when last call time is in the future",
sleepTime > 0 && sleepTime <= idleMillisBetweenCalls);
long expectedApproxSleepTime = idleMillisBetweenCalls - 500;
assertTrue(
"Sleep time should be approximately " + expectedApproxSleepTime + "ms",
Math.abs(sleepTime - expectedApproxSleepTime) < 100); // Allow for small timing variations
}
@Test
public void testGetSleepTimeMillisWithDifferentIdleTimes() {
// Test with different idle times
Instant now = Instant.now();
Instant lastSuccessfulCall = now.minusMillis(300);
// Test with shorter idle time
long shorterIdleTime = 500L;
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
.lastSuccessfulCall(lastSuccessfulCall)
.idleMillisBetweenCalls(shorterIdleTime)
.lastGetRecordsReturnedRecordsCount(recordCount)
.lastMillisBehindLatest(millisBehindLatest)
.build();
long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
assertEquals(
"Sleep time should be approximately 200ms with 500ms idle time and 300ms elapsed",
200L,
sleepTime,
100);
// Test with longer idle time
long longerIdleTime = 2000L;
sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
.lastSuccessfulCall(lastSuccessfulCall)
.idleMillisBetweenCalls(longerIdleTime)
.lastGetRecordsReturnedRecordsCount(recordCount)
.lastMillisBehindLatest(millisBehindLatest)
.build();
sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
assertEquals(
"Sleep time should be approximately 1700ms with 2000ms idle time and 300ms elapsed",
1700L,
sleepTime,
100);
}
@Test
public void testGetSleepTimeMillisIgnoresRecordCountAndMillisBehindLatest() {
// Verify that the implementation ignores lastGetRecordsReturnedRecordsCount and millisBehindLatest parameters
Instant now = Instant.now();
Instant lastSuccessfulCall = now.minusMillis(500);
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
.lastSuccessfulCall(lastSuccessfulCall)
.idleMillisBetweenCalls(idleMillisBetweenCalls)
.lastGetRecordsReturnedRecordsCount(0)
.lastMillisBehindLatest(0L)
.build();
long sleepTime1 = controller.getSleepTimeMillis(sleepTimeControllerConfig);
sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
.lastSuccessfulCall(lastSuccessfulCall)
.idleMillisBetweenCalls(idleMillisBetweenCalls)
.lastGetRecordsReturnedRecordsCount(100)
.lastMillisBehindLatest(10000L)
.build();
long sleepTime2 = controller.getSleepTimeMillis(sleepTimeControllerConfig);
// Both calls should return approximately the same sleep time since these parameters are ignored
assertTrue(
"Sleep time should be the same regardless of record count and millisBehindLatest",
Math.abs(sleepTime1 - sleepTime2) < 10); // Allow for minimal timing variations
}
}

View file

@ -55,16 +55,16 @@ public class RecordsFetcherFactoryTest {
@Ignore
// TODO: remove test no longer holds true
public void createDefaultRecordsFetcherTest() {
RecordsPublisher recordsCache =
recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, metricsFactory, 1);
RecordsPublisher recordsCache = recordsFetcherFactory.createRecordsFetcher(
getRecordsRetrievalStrategy, shardId, metricsFactory, 1, new KinesisSleepTimeController());
assertThat(recordsCache, instanceOf(BlockingRecordsPublisher.class));
}
@Test
public void createPrefetchRecordsFetcherTest() {
recordsFetcherFactory.dataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
RecordsPublisher recordsCache =
recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, metricsFactory, 1);
RecordsPublisher recordsCache = recordsFetcherFactory.createRecordsFetcher(
getRecordsRetrievalStrategy, shardId, metricsFactory, 1, new KinesisSleepTimeController());
assertThat(recordsCache, instanceOf(PrefetchRecordsPublisher.class));
}
}