diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java
index eb62a98e..7759b506 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsFetcherFactory.java
@@ -15,6 +15,7 @@
package software.amazon.kinesis.retrieval;
import software.amazon.kinesis.metrics.MetricsFactory;
+import software.amazon.kinesis.retrieval.polling.SleepTimeController;
/**
* This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard.
@@ -27,6 +28,7 @@ public interface RecordsFetcherFactory {
* @param shardId ShardId of the shard that the fetcher will retrieve records for
* @param metricsFactory MetricsFactory used to create metricScope
* @param maxRecords Max number of records to be returned in a single get call
+ * @param sleepTimeController A controller to control the sleep time between get calls.
*
* @return RecordsPublisher used to get records from Kinesis.
*/
@@ -34,7 +36,8 @@ public interface RecordsFetcherFactory {
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
String shardId,
MetricsFactory metricsFactory,
- int maxRecords);
+ int maxRecords,
+ SleepTimeController sleepTimeController);
/**
* Sets the maximum number of ProcessRecordsInput objects the RecordsPublisher can hold, before further requests are
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeController.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeController.java
new file mode 100644
index 00000000..8f68bc56
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeController.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.retrieval.polling;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+
+@KinesisClientInternalApi
+public class KinesisSleepTimeController implements SleepTimeController {
+ @Override
+ public long getSleepTimeMillis(SleepTimeControllerConfig sleepTimeControllerConfig) {
+ Instant lastSuccessfulCall = sleepTimeControllerConfig.lastSuccessfulCall();
+ long idleMillisBetweenCalls = sleepTimeControllerConfig.idleMillisBetweenCalls();
+ if (lastSuccessfulCall == null) {
+ return idleMillisBetweenCalls;
+ }
+ long timeSinceLastCall =
+ Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
+ if (timeSinceLastCall < idleMillisBetweenCalls) {
+ return idleMillisBetweenCalls - timeSinceLastCall;
+ }
+ return 0;
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java
index efd8b9dd..69f1a2a9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PollingConfig.java
@@ -131,6 +131,15 @@ public class PollingConfig implements RetrievalSpecificConfig {
*/
private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();
+ /**
+ * The SleepTimeController used to control the sleep time between getRecords calls.
+ *
+ *
+ * Default value: {@link KinesisSleepTimeController}
+ *
+ */
+ private SleepTimeController sleepTimeController = new KinesisSleepTimeController();
+
/**
* @Deprecated Use {@link PollingConfig#idleTimeBetweenReadsInMillis} instead
*/
@@ -185,7 +194,8 @@ public class PollingConfig implements RetrievalSpecificConfig {
recordsFetcherFactory,
maxRecords(),
kinesisRequestTimeout,
- dataFetcherProvider);
+ dataFetcherProvider,
+ sleepTimeController);
}
@Override
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java
index 10a0480d..9ae54fc7 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java
@@ -15,7 +15,6 @@
package software.amazon.kinesis.retrieval.polling;
-import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
@@ -92,6 +91,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final MetricsFactory metricsFactory;
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
+ private Integer lastGetRecordsReturnedRecordsCount;
+ private Long lastMillisBehindLatest = null;
private boolean isFirstGetCallTry = true;
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
private boolean started = false;
@@ -110,6 +111,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private Instant lastEventDeliveryTime = Instant.EPOCH;
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
private final ThrottlingReporter throttlingReporter;
+ private final SleepTimeController sleepTimeController;
@Data
@Accessors(fluent = true)
@@ -235,7 +237,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
@NonNull final String operation,
@NonNull final String shardId,
final ThrottlingReporter throttlingReporter,
- final long awaitTerminationTimeoutMillis) {
+ final long awaitTerminationTimeoutMillis,
+ final SleepTimeController sleepTimeController) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall;
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
@@ -255,6 +258,51 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier();
this.streamAndShardId = this.streamId.serialize() + ":" + shardId;
this.awaitTerminationTimeoutMillis = awaitTerminationTimeoutMillis;
+ this.sleepTimeController = sleepTimeController;
+ }
+
+ /**
+ * Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
+ * LinkedBlockingQueue.
+ *
+ * @see PrefetchRecordsPublisher
+ *
+ * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before
+ * blocking
+ * @param maxByteSize Max byte size of the queue before blocking next get records call
+ * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects
+ * @param maxRecordsPerCall Max records to be returned per call
+ * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call
+ * @param executorService Executor service for the cache
+ * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call
+ */
+ public PrefetchRecordsPublisher(
+ final int maxPendingProcessRecordsInput,
+ final int maxByteSize,
+ final int maxRecordsCount,
+ final int maxRecordsPerCall,
+ final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
+ final ExecutorService executorService,
+ final long idleMillisBetweenCalls,
+ final MetricsFactory metricsFactory,
+ final String operation,
+ final String shardId,
+ final ThrottlingReporter throttlingReporter,
+ final SleepTimeController sleepTimeController) {
+ this(
+ maxPendingProcessRecordsInput,
+ maxByteSize,
+ maxRecordsCount,
+ maxRecordsPerCall,
+ getRecordsRetrievalStrategy,
+ executorService,
+ idleMillisBetweenCalls,
+ metricsFactory,
+ operation,
+ shardId,
+ throttlingReporter,
+ DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS,
+ sleepTimeController);
}
/**
@@ -296,7 +344,53 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
operation,
shardId,
throttlingReporter,
- DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS);
+ DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS,
+ new KinesisSleepTimeController());
+ }
+
+ /**
+ * Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
+ * LinkedBlockingQueue.
+ *
+ * @see PrefetchRecordsPublisher
+ *
+ * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before
+ * blocking
+ * @param maxByteSize Max byte size of the queue before blocking next get records call
+ * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects
+ * @param maxRecordsPerCall Max records to be returned per call
+ * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call
+ * @param executorService Executor service for the cache
+ * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call
+ * @param awaitTerminationTimeoutMillis maximum time to wait for graceful shutdown of executorService
+ */
+ public PrefetchRecordsPublisher(
+ final int maxPendingProcessRecordsInput,
+ final int maxByteSize,
+ final int maxRecordsCount,
+ final int maxRecordsPerCall,
+ @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
+ @NonNull final ExecutorService executorService,
+ final long idleMillisBetweenCalls,
+ @NonNull final MetricsFactory metricsFactory,
+ @NonNull final String operation,
+ @NonNull final String shardId,
+ final ThrottlingReporter throttlingReporter,
+ final long awaitTerminationTimeoutMillis) {
+ this(
+ maxPendingProcessRecordsInput,
+ maxByteSize,
+ maxRecordsCount,
+ maxRecordsPerCall,
+ getRecordsRetrievalStrategy,
+ executorService,
+ idleMillisBetweenCalls,
+ metricsFactory,
+ operation,
+ shardId,
+ throttlingReporter,
+ awaitTerminationTimeoutMillis,
+ new KinesisSleepTimeController());
}
@Override
@@ -536,6 +630,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
GetRecordsResponseAdapter getRecordsResult =
getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now();
+ lastMillisBehindLatest = getRecordsResult.millisBehindLatest();
+ lastGetRecordsReturnedRecordsCount =
+ getRecordsResult.records().size();
final List records = getRecordsResult.records();
ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
@@ -626,19 +723,22 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
isFirstGetCallTry = false;
return;
}
- // Add a sleep if lastSuccessfulCall is still null but this is not the first try to avoid retry storm
- if (lastSuccessfulCall == null) {
- Thread.sleep(idleMillisBetweenCalls);
- return;
- }
- long timeSinceLastCall =
- Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
- if (timeSinceLastCall < idleMillisBetweenCalls) {
- Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
+
+ SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
+ .lastSuccessfulCall(lastSuccessfulCall)
+ .idleMillisBetweenCalls(idleMillisBetweenCalls)
+ .lastGetRecordsReturnedRecordsCount(lastGetRecordsReturnedRecordsCount)
+ .lastMillisBehindLatest(lastMillisBehindLatest)
+ .build();
+ long sleepTimeMillis = sleepTimeController.getSleepTimeMillis(sleepTimeControllerConfig);
+ if (sleepTimeMillis > 0) {
+ Thread.sleep(sleepTimeMillis);
}
// avoid immediate-retry storms
lastSuccessfulCall = null;
+ lastGetRecordsReturnedRecordsCount = null;
+ lastMillisBehindLatest = null;
}
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java
index b53f1576..3d5373c2 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SimpleRecordsFetcherFactory.java
@@ -41,7 +41,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
String shardId,
MetricsFactory metricsFactory,
- int maxRecords) {
+ int maxRecords,
+ SleepTimeController sleepTimeController) {
return new PrefetchRecordsPublisher(
maxPendingProcessRecordsInput,
@@ -59,7 +60,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
metricsFactory,
"ProcessTask",
shardId,
- new ThrottlingReporter(maxConsecutiveThrottles, shardId));
+ new ThrottlingReporter(maxConsecutiveThrottles, shardId),
+ sleepTimeController);
}
@Override
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeController.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeController.java
new file mode 100644
index 00000000..3c3838d8
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeController.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.retrieval.polling;
+
+import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+
+@KinesisClientInternalApi
+public interface SleepTimeController {
+
+ /**
+ * Calculates the sleep time in milliseconds before the next GetRecords call.
+ *
+ * @param sleepTimeControllerConfig contains the last successful call time and the idle time between calls.
+ * @return the sleep time in milliseconds.
+ */
+ long getSleepTimeMillis(SleepTimeControllerConfig sleepTimeControllerConfig);
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeControllerConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeControllerConfig.java
new file mode 100644
index 00000000..fcdefa48
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SleepTimeControllerConfig.java
@@ -0,0 +1,24 @@
+package software.amazon.kinesis.retrieval.polling;
+
+import java.time.Instant;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.Accessors;
+import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+
+/**
+ * Configuration for the {@link SleepTimeController}.
+ */
+@KinesisClientInternalApi
+@EqualsAndHashCode
+@Data
+@Accessors(fluent = true)
+@Builder
+public class SleepTimeControllerConfig {
+ private Instant lastSuccessfulCall;
+ private long idleMillisBetweenCalls;
+ private Integer lastGetRecordsReturnedRecordsCount;
+ private Long lastMillisBehindLatest;
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java
index 509e261f..8e6a8ce1 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java
@@ -53,6 +53,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
private final Duration kinesisRequestTimeout;
private final Function dataFetcherProvider;
+ private final SleepTimeController sleepTimeController;
public SynchronousBlockingRetrievalFactory(
String streamName,
@@ -60,7 +61,8 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
RecordsFetcherFactory recordsFetcherFactory,
int maxRecords,
Duration kinesisRequestTimeout,
- Function dataFetcherProvider) {
+ Function dataFetcherProvider,
+ SleepTimeController sleepTimeController) {
this.streamName = streamName;
this.kinesisClient = kinesisClient;
this.recordsFetcherFactory = recordsFetcherFactory;
@@ -68,6 +70,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
this.kinesisRequestTimeout = kinesisRequestTimeout;
this.dataFetcherProvider =
dataFetcherProvider == null ? defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider;
+ this.sleepTimeController = sleepTimeController;
}
private static Function defaultDataFetcherProvider(
@@ -96,6 +99,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
createGetRecordsRetrievalStrategy(shardInfo, streamConfig.streamIdentifier(), metricsFactory),
shardInfo.shardId(),
metricsFactory,
- maxRecords);
+ maxRecords,
+ getSleepTimeController());
}
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeControllerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeControllerTest.java
new file mode 100644
index 00000000..a443928f
--- /dev/null
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisSleepTimeControllerTest.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.kinesis.retrieval.polling;
+
+import java.time.Instant;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.MockitoAnnotations;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisSleepTimeControllerTest {
+
+ private KinesisSleepTimeController controller;
+ private final long idleMillisBetweenCalls = 1000L;
+ private final Integer recordCount = 10;
+ private final Long millisBehindLatest = 5000L;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ controller = new KinesisSleepTimeController();
+ }
+
+ @Test
+ public void testGetSleepTimeMillisWithNullLastSuccessfulCall() {
+ // When lastSuccessfulCall is null, it should return the idleMillisBetweenCalls
+ SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
+ .lastSuccessfulCall(null)
+ .idleMillisBetweenCalls(idleMillisBetweenCalls)
+ .lastGetRecordsReturnedRecordsCount(recordCount)
+ .lastMillisBehindLatest(millisBehindLatest)
+ .build();
+ long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
+
+ assertEquals(
+ "When lastSuccessfulCall is null, should return idleMillisBetweenCalls",
+ idleMillisBetweenCalls,
+ sleepTime);
+ }
+
+ @Test
+ public void testGetSleepTimeMillisWhenTimeSinceLastCallLessThanIdleTime() {
+ // Create a lastSuccessfulCall time that's recent (less than idleMillisBetweenCalls ago)
+ Instant now = Instant.now();
+ Instant lastSuccessfulCall = now.minusMillis(500); // 500ms ago
+
+ SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
+ .lastSuccessfulCall(lastSuccessfulCall)
+ .idleMillisBetweenCalls(idleMillisBetweenCalls)
+ .lastGetRecordsReturnedRecordsCount(recordCount)
+ .lastMillisBehindLatest(millisBehindLatest)
+ .build();
+ long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
+
+ // Should return the remaining time to wait (idleMillisBetweenCalls - timeSinceLastCall)
+ assertTrue(
+ "Sleep time should be positive when time since last call is less than idle time",
+ sleepTime > 0 && sleepTime <= idleMillisBetweenCalls);
+
+ // The exact value will vary slightly due to execution time, but should be close to 500ms
+ long expectedApproxSleepTime = idleMillisBetweenCalls - 500;
+ assertTrue(
+ "Sleep time should be approximately " + expectedApproxSleepTime + "ms",
+ Math.abs(sleepTime - expectedApproxSleepTime) < 100); // Allow for small timing variations
+ }
+
+ @Test
+ public void testGetSleepTimeMillisWhenTimeSinceLastCallEqualsOrExceedsIdleTime() {
+ // Create a lastSuccessfulCall time that's exactly idleMillisBetweenCalls ago
+ Instant now = Instant.now();
+ Instant lastSuccessfulCall = now.minusMillis(idleMillisBetweenCalls);
+
+ SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
+ .lastSuccessfulCall(lastSuccessfulCall)
+ .idleMillisBetweenCalls(idleMillisBetweenCalls)
+ .lastGetRecordsReturnedRecordsCount(recordCount)
+ .lastMillisBehindLatest(millisBehindLatest)
+ .build();
+ long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
+
+ // Should return 0 as we've waited the full idle time
+ assertEquals("Sleep time should be 0 when time since last call equals idle time", 0L, sleepTime);
+
+ // Test with time exceeding idle time
+ lastSuccessfulCall = now.minusMillis(idleMillisBetweenCalls + 500);
+ sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
+ .lastSuccessfulCall(lastSuccessfulCall)
+ .idleMillisBetweenCalls(idleMillisBetweenCalls)
+ .lastGetRecordsReturnedRecordsCount(recordCount)
+ .lastMillisBehindLatest(millisBehindLatest)
+ .build();
+ sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
+
+ assertEquals("Sleep time should be 0 when time since last call exceeds idle time", 0L, sleepTime);
+ }
+
+ @Test
+ public void testGetSleepTimeMillisWithFutureLastSuccessfulCall() {
+ // Test with a lastSuccessfulCall in the future (should handle this case gracefully)
+ Instant now = Instant.now();
+ Instant futureCall = now.plusMillis(500); // 500ms in the future
+
+ SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
+ .lastSuccessfulCall(futureCall)
+ .idleMillisBetweenCalls(idleMillisBetweenCalls)
+ .lastGetRecordsReturnedRecordsCount(recordCount)
+ .lastMillisBehindLatest(millisBehindLatest)
+ .build();
+ long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
+
+ // The implementation uses Duration.abs(), so it should treat this the same as 500ms in the past
+ assertTrue(
+ "Sleep time should be positive when last call time is in the future",
+ sleepTime > 0 && sleepTime <= idleMillisBetweenCalls);
+
+ long expectedApproxSleepTime = idleMillisBetweenCalls - 500;
+ assertTrue(
+ "Sleep time should be approximately " + expectedApproxSleepTime + "ms",
+ Math.abs(sleepTime - expectedApproxSleepTime) < 100); // Allow for small timing variations
+ }
+
+ @Test
+ public void testGetSleepTimeMillisWithDifferentIdleTimes() {
+ // Test with different idle times
+ Instant now = Instant.now();
+ Instant lastSuccessfulCall = now.minusMillis(300);
+
+ // Test with shorter idle time
+ long shorterIdleTime = 500L;
+ SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
+ .lastSuccessfulCall(lastSuccessfulCall)
+ .idleMillisBetweenCalls(shorterIdleTime)
+ .lastGetRecordsReturnedRecordsCount(recordCount)
+ .lastMillisBehindLatest(millisBehindLatest)
+ .build();
+ long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
+ assertEquals(
+ "Sleep time should be approximately 200ms with 500ms idle time and 300ms elapsed",
+ 200L,
+ sleepTime,
+ 100);
+
+ // Test with longer idle time
+ long longerIdleTime = 2000L;
+ sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
+ .lastSuccessfulCall(lastSuccessfulCall)
+ .idleMillisBetweenCalls(longerIdleTime)
+ .lastGetRecordsReturnedRecordsCount(recordCount)
+ .lastMillisBehindLatest(millisBehindLatest)
+ .build();
+ sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
+ assertEquals(
+ "Sleep time should be approximately 1700ms with 2000ms idle time and 300ms elapsed",
+ 1700L,
+ sleepTime,
+ 100);
+ }
+
+ @Test
+ public void testGetSleepTimeMillisIgnoresRecordCountAndMillisBehindLatest() {
+ // Verify that the implementation ignores lastGetRecordsReturnedRecordsCount and millisBehindLatest parameters
+ Instant now = Instant.now();
+ Instant lastSuccessfulCall = now.minusMillis(500);
+
+ SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
+ .lastSuccessfulCall(lastSuccessfulCall)
+ .idleMillisBetweenCalls(idleMillisBetweenCalls)
+ .lastGetRecordsReturnedRecordsCount(0)
+ .lastMillisBehindLatest(0L)
+ .build();
+ long sleepTime1 = controller.getSleepTimeMillis(sleepTimeControllerConfig);
+ sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
+ .lastSuccessfulCall(lastSuccessfulCall)
+ .idleMillisBetweenCalls(idleMillisBetweenCalls)
+ .lastGetRecordsReturnedRecordsCount(100)
+ .lastMillisBehindLatest(10000L)
+ .build();
+ long sleepTime2 = controller.getSleepTimeMillis(sleepTimeControllerConfig);
+
+ // Both calls should return approximately the same sleep time since these parameters are ignored
+ assertTrue(
+ "Sleep time should be the same regardless of record count and millisBehindLatest",
+ Math.abs(sleepTime1 - sleepTime2) < 10); // Allow for minimal timing variations
+ }
+}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java
index ad4a6046..3d0f4742 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/RecordsFetcherFactoryTest.java
@@ -55,16 +55,16 @@ public class RecordsFetcherFactoryTest {
@Ignore
// TODO: remove test no longer holds true
public void createDefaultRecordsFetcherTest() {
- RecordsPublisher recordsCache =
- recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, metricsFactory, 1);
+ RecordsPublisher recordsCache = recordsFetcherFactory.createRecordsFetcher(
+ getRecordsRetrievalStrategy, shardId, metricsFactory, 1, new KinesisSleepTimeController());
assertThat(recordsCache, instanceOf(BlockingRecordsPublisher.class));
}
@Test
public void createPrefetchRecordsFetcherTest() {
recordsFetcherFactory.dataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
- RecordsPublisher recordsCache =
- recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, metricsFactory, 1);
+ RecordsPublisher recordsCache = recordsFetcherFactory.createRecordsFetcher(
+ getRecordsRetrievalStrategy, shardId, metricsFactory, 1, new KinesisSleepTimeController());
assertThat(recordsCache, instanceOf(PrefetchRecordsPublisher.class));
}
}