From 9338cce2c153d6750ec2a93160ea2d35637de509 Mon Sep 17 00:00:00 2001 From: gguptp Date: Fri, 30 May 2025 12:06:43 +0530 Subject: [PATCH] Having a way to control sleep time in recordsfetcher --- .../retrieval/RecordsFetcherFactory.java | 5 +- .../polling/KinesisSleepTimeController.java | 39 ++++ .../retrieval/polling/PollingConfig.java | 12 +- .../polling/PrefetchRecordsPublisher.java | 124 +++++++++-- .../polling/SimpleRecordsFetcherFactory.java | 6 +- .../polling/SleepTimeController.java | 30 +++ .../polling/SleepTimeControllerConfig.java | 24 +++ .../SynchronousBlockingRetrievalFactory.java | 8 +- .../KinesisSleepTimeControllerTest.java | 204 ++++++++++++++++++ .../polling/RecordsFetcherFactoryTest.java | 8 +- 10 files changed, 438 insertions(+), 22 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeController.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeController.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeControllerConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeControllerTest.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java index eb62a98e..7759b506 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java @@ -15,6 +15,7 @@ package software.amazon.kinesis.retrieval; import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.retrieval.polling.SleepTimeController; /** * This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard. @@ -27,6 +28,7 @@ public interface RecordsFetcherFactory { * @param shardId ShardId of the shard that the fetcher will retrieve records for * @param metricsFactory MetricsFactory used to create metricScope * @param maxRecords Max number of records to be returned in a single get call + * @param sleepTimeController A controller to control the sleep time between get calls. * * @return RecordsPublisher used to get records from Kinesis. */ @@ -34,7 +36,8 @@ public interface RecordsFetcherFactory { GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, MetricsFactory metricsFactory, - int maxRecords); + int maxRecords, + SleepTimeController sleepTimeController); /** * Sets the maximum number of ProcessRecordsInput objects the RecordsPublisher can hold, before further requests are diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeController.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeController.java new file mode 100644 index 00000000..8f68bc56 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeController.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval.polling; + +import java.time.Duration; +import java.time.Instant; + +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +@KinesisClientInternalApi +public class KinesisSleepTimeController implements SleepTimeController { + @Override + public long getSleepTimeMillis(SleepTimeControllerConfig sleepTimeControllerConfig) { + Instant lastSuccessfulCall = sleepTimeControllerConfig.lastSuccessfulCall(); + long idleMillisBetweenCalls = sleepTimeControllerConfig.idleMillisBetweenCalls(); + if (lastSuccessfulCall == null) { + return idleMillisBetweenCalls; + } + long timeSinceLastCall = + Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis(); + if (timeSinceLastCall < idleMillisBetweenCalls) { + return idleMillisBetweenCalls - timeSinceLastCall; + } + return 0; + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java index efd8b9dd..69f1a2a9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java @@ -131,6 +131,15 @@ public class PollingConfig implements RetrievalSpecificConfig { */ private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(); + /** + * The SleepTimeController used to control the sleep time between getRecords calls. + * + *

+ * Default value: {@link KinesisSleepTimeController} + *

+ */ + private SleepTimeController sleepTimeController = new KinesisSleepTimeController(); + /** * @Deprecated Use {@link PollingConfig#idleTimeBetweenReadsInMillis} instead */ @@ -185,7 +194,8 @@ public class PollingConfig implements RetrievalSpecificConfig { recordsFetcherFactory, maxRecords(), kinesisRequestTimeout, - dataFetcherProvider); + dataFetcherProvider, + sleepTimeController); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 10a0480d..9ae54fc7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -15,7 +15,6 @@ package software.amazon.kinesis.retrieval.polling; -import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.UUID; @@ -92,6 +91,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final MetricsFactory metricsFactory; private final long idleMillisBetweenCalls; private Instant lastSuccessfulCall; + private Integer lastGetRecordsReturnedRecordsCount; + private Long lastMillisBehindLatest = null; private boolean isFirstGetCallTry = true; private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; private boolean started = false; @@ -110,6 +111,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private Instant lastEventDeliveryTime = Instant.EPOCH; private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); private final ThrottlingReporter throttlingReporter; + private final SleepTimeController sleepTimeController; @Data @Accessors(fluent = true) @@ -235,7 +237,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { @NonNull final String operation, @NonNull final String shardId, final ThrottlingReporter throttlingReporter, - final long awaitTerminationTimeoutMillis) { + final long awaitTerminationTimeoutMillis, + final SleepTimeController sleepTimeController) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; @@ -255,6 +258,51 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier(); this.streamAndShardId = this.streamId.serialize() + ":" + shardId; this.awaitTerminationTimeoutMillis = awaitTerminationTimeoutMillis; + this.sleepTimeController = sleepTimeController; + } + + /** + * Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a + * LinkedBlockingQueue. + * + * @see PrefetchRecordsPublisher + * + * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before + * blocking + * @param maxByteSize Max byte size of the queue before blocking next get records call + * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects + * @param maxRecordsPerCall Max records to be returned per call + * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call + * @param executorService Executor service for the cache + * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + */ + public PrefetchRecordsPublisher( + final int maxPendingProcessRecordsInput, + final int maxByteSize, + final int maxRecordsCount, + final int maxRecordsPerCall, + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, + final ExecutorService executorService, + final long idleMillisBetweenCalls, + final MetricsFactory metricsFactory, + final String operation, + final String shardId, + final ThrottlingReporter throttlingReporter, + final SleepTimeController sleepTimeController) { + this( + maxPendingProcessRecordsInput, + maxByteSize, + maxRecordsCount, + maxRecordsPerCall, + getRecordsRetrievalStrategy, + executorService, + idleMillisBetweenCalls, + metricsFactory, + operation, + shardId, + throttlingReporter, + DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS, + sleepTimeController); } /** @@ -296,7 +344,53 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { operation, shardId, throttlingReporter, - DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS); + DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS, + new KinesisSleepTimeController()); + } + + /** + * Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a + * LinkedBlockingQueue. + * + * @see PrefetchRecordsPublisher + * + * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before + * blocking + * @param maxByteSize Max byte size of the queue before blocking next get records call + * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects + * @param maxRecordsPerCall Max records to be returned per call + * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call + * @param executorService Executor service for the cache + * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + * @param awaitTerminationTimeoutMillis maximum time to wait for graceful shutdown of executorService + */ + public PrefetchRecordsPublisher( + final int maxPendingProcessRecordsInput, + final int maxByteSize, + final int maxRecordsCount, + final int maxRecordsPerCall, + @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, + @NonNull final ExecutorService executorService, + final long idleMillisBetweenCalls, + @NonNull final MetricsFactory metricsFactory, + @NonNull final String operation, + @NonNull final String shardId, + final ThrottlingReporter throttlingReporter, + final long awaitTerminationTimeoutMillis) { + this( + maxPendingProcessRecordsInput, + maxByteSize, + maxRecordsCount, + maxRecordsPerCall, + getRecordsRetrievalStrategy, + executorService, + idleMillisBetweenCalls, + metricsFactory, + operation, + shardId, + throttlingReporter, + awaitTerminationTimeoutMillis, + new KinesisSleepTimeController()); } @Override @@ -536,6 +630,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { GetRecordsResponseAdapter getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); lastSuccessfulCall = Instant.now(); + lastMillisBehindLatest = getRecordsResult.millisBehindLatest(); + lastGetRecordsReturnedRecordsCount = + getRecordsResult.records().size(); final List records = getRecordsResult.records(); ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder() @@ -626,19 +723,22 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { isFirstGetCallTry = false; return; } - // Add a sleep if lastSuccessfulCall is still null but this is not the first try to avoid retry storm - if (lastSuccessfulCall == null) { - Thread.sleep(idleMillisBetweenCalls); - return; - } - long timeSinceLastCall = - Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis(); - if (timeSinceLastCall < idleMillisBetweenCalls) { - Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall); + + SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder() + .lastSuccessfulCall(lastSuccessfulCall) + .idleMillisBetweenCalls(idleMillisBetweenCalls) + .lastGetRecordsReturnedRecordsCount(lastGetRecordsReturnedRecordsCount) + .lastMillisBehindLatest(lastMillisBehindLatest) + .build(); + long sleepTimeMillis = sleepTimeController.getSleepTimeMillis(sleepTimeControllerConfig); + if (sleepTimeMillis > 0) { + Thread.sleep(sleepTimeMillis); } // avoid immediate-retry storms lastSuccessfulCall = null; + lastGetRecordsReturnedRecordsCount = null; + lastMillisBehindLatest = null; } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java index b53f1576..3d5373c2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java @@ -41,7 +41,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, MetricsFactory metricsFactory, - int maxRecords) { + int maxRecords, + SleepTimeController sleepTimeController) { return new PrefetchRecordsPublisher( maxPendingProcessRecordsInput, @@ -59,7 +60,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { metricsFactory, "ProcessTask", shardId, - new ThrottlingReporter(maxConsecutiveThrottles, shardId)); + new ThrottlingReporter(maxConsecutiveThrottles, shardId), + sleepTimeController); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeController.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeController.java new file mode 100644 index 00000000..3c3838d8 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeController.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval.polling; + +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +@KinesisClientInternalApi +public interface SleepTimeController { + + /** + * Calculates the sleep time in milliseconds before the next GetRecords call. + * + * @param sleepTimeControllerConfig contains the last successful call time and the idle time between calls. + * @return the sleep time in milliseconds. + */ + long getSleepTimeMillis(SleepTimeControllerConfig sleepTimeControllerConfig); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeControllerConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeControllerConfig.java new file mode 100644 index 00000000..fcdefa48 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeControllerConfig.java @@ -0,0 +1,24 @@ +package software.amazon.kinesis.retrieval.polling; + +import java.time.Instant; + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +/** + * Configuration for the {@link SleepTimeController}. + */ +@KinesisClientInternalApi +@EqualsAndHashCode +@Data +@Accessors(fluent = true) +@Builder +public class SleepTimeControllerConfig { + private Instant lastSuccessfulCall; + private long idleMillisBetweenCalls; + private Integer lastGetRecordsReturnedRecordsCount; + private Long lastMillisBehindLatest; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 509e261f..8e6a8ce1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -53,6 +53,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { private final Duration kinesisRequestTimeout; private final Function dataFetcherProvider; + private final SleepTimeController sleepTimeController; public SynchronousBlockingRetrievalFactory( String streamName, @@ -60,7 +61,8 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { RecordsFetcherFactory recordsFetcherFactory, int maxRecords, Duration kinesisRequestTimeout, - Function dataFetcherProvider) { + Function dataFetcherProvider, + SleepTimeController sleepTimeController) { this.streamName = streamName; this.kinesisClient = kinesisClient; this.recordsFetcherFactory = recordsFetcherFactory; @@ -68,6 +70,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { this.kinesisRequestTimeout = kinesisRequestTimeout; this.dataFetcherProvider = dataFetcherProvider == null ? defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider; + this.sleepTimeController = sleepTimeController; } private static Function defaultDataFetcherProvider( @@ -96,6 +99,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { createGetRecordsRetrievalStrategy(shardInfo, streamConfig.streamIdentifier(), metricsFactory), shardInfo.shardId(), metricsFactory, - maxRecords); + maxRecords, + getSleepTimeController()); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeControllerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeControllerTest.java new file mode 100644 index 00000000..a443928f --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeControllerTest.java @@ -0,0 +1,204 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.retrieval.polling; + +import java.time.Instant; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(MockitoJUnitRunner.class) +public class KinesisSleepTimeControllerTest { + + private KinesisSleepTimeController controller; + private final long idleMillisBetweenCalls = 1000L; + private final Integer recordCount = 10; + private final Long millisBehindLatest = 5000L; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + controller = new KinesisSleepTimeController(); + } + + @Test + public void testGetSleepTimeMillisWithNullLastSuccessfulCall() { + // When lastSuccessfulCall is null, it should return the idleMillisBetweenCalls + SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder() + .lastSuccessfulCall(null) + .idleMillisBetweenCalls(idleMillisBetweenCalls) + .lastGetRecordsReturnedRecordsCount(recordCount) + .lastMillisBehindLatest(millisBehindLatest) + .build(); + long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig); + + assertEquals( + "When lastSuccessfulCall is null, should return idleMillisBetweenCalls", + idleMillisBetweenCalls, + sleepTime); + } + + @Test + public void testGetSleepTimeMillisWhenTimeSinceLastCallLessThanIdleTime() { + // Create a lastSuccessfulCall time that's recent (less than idleMillisBetweenCalls ago) + Instant now = Instant.now(); + Instant lastSuccessfulCall = now.minusMillis(500); // 500ms ago + + SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder() + .lastSuccessfulCall(lastSuccessfulCall) + .idleMillisBetweenCalls(idleMillisBetweenCalls) + .lastGetRecordsReturnedRecordsCount(recordCount) + .lastMillisBehindLatest(millisBehindLatest) + .build(); + long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig); + + // Should return the remaining time to wait (idleMillisBetweenCalls - timeSinceLastCall) + assertTrue( + "Sleep time should be positive when time since last call is less than idle time", + sleepTime > 0 && sleepTime <= idleMillisBetweenCalls); + + // The exact value will vary slightly due to execution time, but should be close to 500ms + long expectedApproxSleepTime = idleMillisBetweenCalls - 500; + assertTrue( + "Sleep time should be approximately " + expectedApproxSleepTime + "ms", + Math.abs(sleepTime - expectedApproxSleepTime) < 100); // Allow for small timing variations + } + + @Test + public void testGetSleepTimeMillisWhenTimeSinceLastCallEqualsOrExceedsIdleTime() { + // Create a lastSuccessfulCall time that's exactly idleMillisBetweenCalls ago + Instant now = Instant.now(); + Instant lastSuccessfulCall = now.minusMillis(idleMillisBetweenCalls); + + SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder() + .lastSuccessfulCall(lastSuccessfulCall) + .idleMillisBetweenCalls(idleMillisBetweenCalls) + .lastGetRecordsReturnedRecordsCount(recordCount) + .lastMillisBehindLatest(millisBehindLatest) + .build(); + long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig); + + // Should return 0 as we've waited the full idle time + assertEquals("Sleep time should be 0 when time since last call equals idle time", 0L, sleepTime); + + // Test with time exceeding idle time + lastSuccessfulCall = now.minusMillis(idleMillisBetweenCalls + 500); + sleepTimeControllerConfig = SleepTimeControllerConfig.builder() + .lastSuccessfulCall(lastSuccessfulCall) + .idleMillisBetweenCalls(idleMillisBetweenCalls) + .lastGetRecordsReturnedRecordsCount(recordCount) + .lastMillisBehindLatest(millisBehindLatest) + .build(); + sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig); + + assertEquals("Sleep time should be 0 when time since last call exceeds idle time", 0L, sleepTime); + } + + @Test + public void testGetSleepTimeMillisWithFutureLastSuccessfulCall() { + // Test with a lastSuccessfulCall in the future (should handle this case gracefully) + Instant now = Instant.now(); + Instant futureCall = now.plusMillis(500); // 500ms in the future + + SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder() + .lastSuccessfulCall(futureCall) + .idleMillisBetweenCalls(idleMillisBetweenCalls) + .lastGetRecordsReturnedRecordsCount(recordCount) + .lastMillisBehindLatest(millisBehindLatest) + .build(); + long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig); + + // The implementation uses Duration.abs(), so it should treat this the same as 500ms in the past + assertTrue( + "Sleep time should be positive when last call time is in the future", + sleepTime > 0 && sleepTime <= idleMillisBetweenCalls); + + long expectedApproxSleepTime = idleMillisBetweenCalls - 500; + assertTrue( + "Sleep time should be approximately " + expectedApproxSleepTime + "ms", + Math.abs(sleepTime - expectedApproxSleepTime) < 100); // Allow for small timing variations + } + + @Test + public void testGetSleepTimeMillisWithDifferentIdleTimes() { + // Test with different idle times + Instant now = Instant.now(); + Instant lastSuccessfulCall = now.minusMillis(300); + + // Test with shorter idle time + long shorterIdleTime = 500L; + SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder() + .lastSuccessfulCall(lastSuccessfulCall) + .idleMillisBetweenCalls(shorterIdleTime) + .lastGetRecordsReturnedRecordsCount(recordCount) + .lastMillisBehindLatest(millisBehindLatest) + .build(); + long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig); + assertEquals( + "Sleep time should be approximately 200ms with 500ms idle time and 300ms elapsed", + 200L, + sleepTime, + 100); + + // Test with longer idle time + long longerIdleTime = 2000L; + sleepTimeControllerConfig = SleepTimeControllerConfig.builder() + .lastSuccessfulCall(lastSuccessfulCall) + .idleMillisBetweenCalls(longerIdleTime) + .lastGetRecordsReturnedRecordsCount(recordCount) + .lastMillisBehindLatest(millisBehindLatest) + .build(); + sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig); + assertEquals( + "Sleep time should be approximately 1700ms with 2000ms idle time and 300ms elapsed", + 1700L, + sleepTime, + 100); + } + + @Test + public void testGetSleepTimeMillisIgnoresRecordCountAndMillisBehindLatest() { + // Verify that the implementation ignores lastGetRecordsReturnedRecordsCount and millisBehindLatest parameters + Instant now = Instant.now(); + Instant lastSuccessfulCall = now.minusMillis(500); + + SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder() + .lastSuccessfulCall(lastSuccessfulCall) + .idleMillisBetweenCalls(idleMillisBetweenCalls) + .lastGetRecordsReturnedRecordsCount(0) + .lastMillisBehindLatest(0L) + .build(); + long sleepTime1 = controller.getSleepTimeMillis(sleepTimeControllerConfig); + sleepTimeControllerConfig = SleepTimeControllerConfig.builder() + .lastSuccessfulCall(lastSuccessfulCall) + .idleMillisBetweenCalls(idleMillisBetweenCalls) + .lastGetRecordsReturnedRecordsCount(100) + .lastMillisBehindLatest(10000L) + .build(); + long sleepTime2 = controller.getSleepTimeMillis(sleepTimeControllerConfig); + + // Both calls should return approximately the same sleep time since these parameters are ignored + assertTrue( + "Sleep time should be the same regardless of record count and millisBehindLatest", + Math.abs(sleepTime1 - sleepTime2) < 10); // Allow for minimal timing variations + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java index ad4a6046..3d0f4742 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java @@ -55,16 +55,16 @@ public class RecordsFetcherFactoryTest { @Ignore // TODO: remove test no longer holds true public void createDefaultRecordsFetcherTest() { - RecordsPublisher recordsCache = - recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, metricsFactory, 1); + RecordsPublisher recordsCache = recordsFetcherFactory.createRecordsFetcher( + getRecordsRetrievalStrategy, shardId, metricsFactory, 1, new KinesisSleepTimeController()); assertThat(recordsCache, instanceOf(BlockingRecordsPublisher.class)); } @Test public void createPrefetchRecordsFetcherTest() { recordsFetcherFactory.dataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); - RecordsPublisher recordsCache = - recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, metricsFactory, 1); + RecordsPublisher recordsCache = recordsFetcherFactory.createRecordsFetcher( + getRecordsRetrievalStrategy, shardId, metricsFactory, 1, new KinesisSleepTimeController()); assertThat(recordsCache, instanceOf(PrefetchRecordsPublisher.class)); } }