Merge 9338cce2c1 into 37ae2f86be
This commit is contained in:
commit
c62d972f0f
10 changed files with 438 additions and 22 deletions
|
|
@ -15,6 +15,7 @@
|
|||
package software.amazon.kinesis.retrieval;
|
||||
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
import software.amazon.kinesis.retrieval.polling.SleepTimeController;
|
||||
|
||||
/**
|
||||
* This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard.
|
||||
|
|
@ -27,6 +28,7 @@ public interface RecordsFetcherFactory {
|
|||
* @param shardId ShardId of the shard that the fetcher will retrieve records for
|
||||
* @param metricsFactory MetricsFactory used to create metricScope
|
||||
* @param maxRecords Max number of records to be returned in a single get call
|
||||
* @param sleepTimeController A controller to control the sleep time between get calls.
|
||||
*
|
||||
* @return RecordsPublisher used to get records from Kinesis.
|
||||
*/
|
||||
|
|
@ -34,7 +36,8 @@ public interface RecordsFetcherFactory {
|
|||
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
|
||||
String shardId,
|
||||
MetricsFactory metricsFactory,
|
||||
int maxRecords);
|
||||
int maxRecords,
|
||||
SleepTimeController sleepTimeController);
|
||||
|
||||
/**
|
||||
* Sets the maximum number of ProcessRecordsInput objects the RecordsPublisher can hold, before further requests are
|
||||
|
|
|
|||
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright 2019 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.retrieval.polling;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
|
||||
@KinesisClientInternalApi
|
||||
public class KinesisSleepTimeController implements SleepTimeController {
|
||||
@Override
|
||||
public long getSleepTimeMillis(SleepTimeControllerConfig sleepTimeControllerConfig) {
|
||||
Instant lastSuccessfulCall = sleepTimeControllerConfig.lastSuccessfulCall();
|
||||
long idleMillisBetweenCalls = sleepTimeControllerConfig.idleMillisBetweenCalls();
|
||||
if (lastSuccessfulCall == null) {
|
||||
return idleMillisBetweenCalls;
|
||||
}
|
||||
long timeSinceLastCall =
|
||||
Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
|
||||
if (timeSinceLastCall < idleMillisBetweenCalls) {
|
||||
return idleMillisBetweenCalls - timeSinceLastCall;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
@ -131,6 +131,15 @@ public class PollingConfig implements RetrievalSpecificConfig {
|
|||
*/
|
||||
private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();
|
||||
|
||||
/**
|
||||
* The SleepTimeController used to control the sleep time between getRecords calls.
|
||||
*
|
||||
* <p>
|
||||
* Default value: {@link KinesisSleepTimeController}
|
||||
* </p>
|
||||
*/
|
||||
private SleepTimeController sleepTimeController = new KinesisSleepTimeController();
|
||||
|
||||
/**
|
||||
* @Deprecated Use {@link PollingConfig#idleTimeBetweenReadsInMillis} instead
|
||||
*/
|
||||
|
|
@ -185,7 +194,8 @@ public class PollingConfig implements RetrievalSpecificConfig {
|
|||
recordsFetcherFactory,
|
||||
maxRecords(),
|
||||
kinesisRequestTimeout,
|
||||
dataFetcherProvider);
|
||||
dataFetcherProvider,
|
||||
sleepTimeController);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@
|
|||
|
||||
package software.amazon.kinesis.retrieval.polling;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
|
@ -92,6 +91,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
private final MetricsFactory metricsFactory;
|
||||
private final long idleMillisBetweenCalls;
|
||||
private Instant lastSuccessfulCall;
|
||||
private Integer lastGetRecordsReturnedRecordsCount;
|
||||
private Long lastMillisBehindLatest = null;
|
||||
private boolean isFirstGetCallTry = true;
|
||||
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
|
||||
private boolean started = false;
|
||||
|
|
@ -110,6 +111,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
private Instant lastEventDeliveryTime = Instant.EPOCH;
|
||||
private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
|
||||
private final ThrottlingReporter throttlingReporter;
|
||||
private final SleepTimeController sleepTimeController;
|
||||
|
||||
@Data
|
||||
@Accessors(fluent = true)
|
||||
|
|
@ -235,7 +237,8 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
@NonNull final String operation,
|
||||
@NonNull final String shardId,
|
||||
final ThrottlingReporter throttlingReporter,
|
||||
final long awaitTerminationTimeoutMillis) {
|
||||
final long awaitTerminationTimeoutMillis,
|
||||
final SleepTimeController sleepTimeController) {
|
||||
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
|
||||
this.maxRecordsPerCall = maxRecordsPerCall;
|
||||
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
|
||||
|
|
@ -255,6 +258,51 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier();
|
||||
this.streamAndShardId = this.streamId.serialize() + ":" + shardId;
|
||||
this.awaitTerminationTimeoutMillis = awaitTerminationTimeoutMillis;
|
||||
this.sleepTimeController = sleepTimeController;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
|
||||
* LinkedBlockingQueue.
|
||||
*
|
||||
* @see PrefetchRecordsPublisher
|
||||
*
|
||||
* @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before
|
||||
* blocking
|
||||
* @param maxByteSize Max byte size of the queue before blocking next get records call
|
||||
* @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects
|
||||
* @param maxRecordsPerCall Max records to be returned per call
|
||||
* @param getRecordsRetrievalStrategy Retrieval strategy for the get records call
|
||||
* @param executorService Executor service for the cache
|
||||
* @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call
|
||||
*/
|
||||
public PrefetchRecordsPublisher(
|
||||
final int maxPendingProcessRecordsInput,
|
||||
final int maxByteSize,
|
||||
final int maxRecordsCount,
|
||||
final int maxRecordsPerCall,
|
||||
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
|
||||
final ExecutorService executorService,
|
||||
final long idleMillisBetweenCalls,
|
||||
final MetricsFactory metricsFactory,
|
||||
final String operation,
|
||||
final String shardId,
|
||||
final ThrottlingReporter throttlingReporter,
|
||||
final SleepTimeController sleepTimeController) {
|
||||
this(
|
||||
maxPendingProcessRecordsInput,
|
||||
maxByteSize,
|
||||
maxRecordsCount,
|
||||
maxRecordsPerCall,
|
||||
getRecordsRetrievalStrategy,
|
||||
executorService,
|
||||
idleMillisBetweenCalls,
|
||||
metricsFactory,
|
||||
operation,
|
||||
shardId,
|
||||
throttlingReporter,
|
||||
DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS,
|
||||
sleepTimeController);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -296,7 +344,53 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
operation,
|
||||
shardId,
|
||||
throttlingReporter,
|
||||
DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS);
|
||||
DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS,
|
||||
new KinesisSleepTimeController());
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
|
||||
* LinkedBlockingQueue.
|
||||
*
|
||||
* @see PrefetchRecordsPublisher
|
||||
*
|
||||
* @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before
|
||||
* blocking
|
||||
* @param maxByteSize Max byte size of the queue before blocking next get records call
|
||||
* @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects
|
||||
* @param maxRecordsPerCall Max records to be returned per call
|
||||
* @param getRecordsRetrievalStrategy Retrieval strategy for the get records call
|
||||
* @param executorService Executor service for the cache
|
||||
* @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call
|
||||
* @param awaitTerminationTimeoutMillis maximum time to wait for graceful shutdown of executorService
|
||||
*/
|
||||
public PrefetchRecordsPublisher(
|
||||
final int maxPendingProcessRecordsInput,
|
||||
final int maxByteSize,
|
||||
final int maxRecordsCount,
|
||||
final int maxRecordsPerCall,
|
||||
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
|
||||
@NonNull final ExecutorService executorService,
|
||||
final long idleMillisBetweenCalls,
|
||||
@NonNull final MetricsFactory metricsFactory,
|
||||
@NonNull final String operation,
|
||||
@NonNull final String shardId,
|
||||
final ThrottlingReporter throttlingReporter,
|
||||
final long awaitTerminationTimeoutMillis) {
|
||||
this(
|
||||
maxPendingProcessRecordsInput,
|
||||
maxByteSize,
|
||||
maxRecordsCount,
|
||||
maxRecordsPerCall,
|
||||
getRecordsRetrievalStrategy,
|
||||
executorService,
|
||||
idleMillisBetweenCalls,
|
||||
metricsFactory,
|
||||
operation,
|
||||
shardId,
|
||||
throttlingReporter,
|
||||
awaitTerminationTimeoutMillis,
|
||||
new KinesisSleepTimeController());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -536,6 +630,9 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
GetRecordsResponseAdapter getRecordsResult =
|
||||
getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
|
||||
lastSuccessfulCall = Instant.now();
|
||||
lastMillisBehindLatest = getRecordsResult.millisBehindLatest();
|
||||
lastGetRecordsReturnedRecordsCount =
|
||||
getRecordsResult.records().size();
|
||||
|
||||
final List<KinesisClientRecord> records = getRecordsResult.records();
|
||||
ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
|
||||
|
|
@ -626,19 +723,22 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
|
|||
isFirstGetCallTry = false;
|
||||
return;
|
||||
}
|
||||
// Add a sleep if lastSuccessfulCall is still null but this is not the first try to avoid retry storm
|
||||
if (lastSuccessfulCall == null) {
|
||||
Thread.sleep(idleMillisBetweenCalls);
|
||||
return;
|
||||
}
|
||||
long timeSinceLastCall =
|
||||
Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
|
||||
if (timeSinceLastCall < idleMillisBetweenCalls) {
|
||||
Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
|
||||
|
||||
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
|
||||
.lastSuccessfulCall(lastSuccessfulCall)
|
||||
.idleMillisBetweenCalls(idleMillisBetweenCalls)
|
||||
.lastGetRecordsReturnedRecordsCount(lastGetRecordsReturnedRecordsCount)
|
||||
.lastMillisBehindLatest(lastMillisBehindLatest)
|
||||
.build();
|
||||
long sleepTimeMillis = sleepTimeController.getSleepTimeMillis(sleepTimeControllerConfig);
|
||||
if (sleepTimeMillis > 0) {
|
||||
Thread.sleep(sleepTimeMillis);
|
||||
}
|
||||
|
||||
// avoid immediate-retry storms
|
||||
lastSuccessfulCall = null;
|
||||
lastGetRecordsReturnedRecordsCount = null;
|
||||
lastMillisBehindLatest = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -41,7 +41,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
|
|||
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
|
||||
String shardId,
|
||||
MetricsFactory metricsFactory,
|
||||
int maxRecords) {
|
||||
int maxRecords,
|
||||
SleepTimeController sleepTimeController) {
|
||||
|
||||
return new PrefetchRecordsPublisher(
|
||||
maxPendingProcessRecordsInput,
|
||||
|
|
@ -59,7 +60,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
|
|||
metricsFactory,
|
||||
"ProcessTask",
|
||||
shardId,
|
||||
new ThrottlingReporter(maxConsecutiveThrottles, shardId));
|
||||
new ThrottlingReporter(maxConsecutiveThrottles, shardId),
|
||||
sleepTimeController);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright 2019 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.retrieval.polling;
|
||||
|
||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
|
||||
@KinesisClientInternalApi
|
||||
public interface SleepTimeController {
|
||||
|
||||
/**
|
||||
* Calculates the sleep time in milliseconds before the next GetRecords call.
|
||||
*
|
||||
* @param sleepTimeControllerConfig contains the last successful call time and the idle time between calls.
|
||||
* @return the sleep time in milliseconds.
|
||||
*/
|
||||
long getSleepTimeMillis(SleepTimeControllerConfig sleepTimeControllerConfig);
|
||||
}
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
package software.amazon.kinesis.retrieval.polling;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.experimental.Accessors;
|
||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
|
||||
/**
|
||||
* Configuration for the {@link SleepTimeController}.
|
||||
*/
|
||||
@KinesisClientInternalApi
|
||||
@EqualsAndHashCode
|
||||
@Data
|
||||
@Accessors(fluent = true)
|
||||
@Builder
|
||||
public class SleepTimeControllerConfig {
|
||||
private Instant lastSuccessfulCall;
|
||||
private long idleMillisBetweenCalls;
|
||||
private Integer lastGetRecordsReturnedRecordsCount;
|
||||
private Long lastMillisBehindLatest;
|
||||
}
|
||||
|
|
@ -53,6 +53,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
|||
private final Duration kinesisRequestTimeout;
|
||||
|
||||
private final Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
|
||||
private final SleepTimeController sleepTimeController;
|
||||
|
||||
public SynchronousBlockingRetrievalFactory(
|
||||
String streamName,
|
||||
|
|
@ -60,7 +61,8 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
|||
RecordsFetcherFactory recordsFetcherFactory,
|
||||
int maxRecords,
|
||||
Duration kinesisRequestTimeout,
|
||||
Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider) {
|
||||
Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider,
|
||||
SleepTimeController sleepTimeController) {
|
||||
this.streamName = streamName;
|
||||
this.kinesisClient = kinesisClient;
|
||||
this.recordsFetcherFactory = recordsFetcherFactory;
|
||||
|
|
@ -68,6 +70,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
|||
this.kinesisRequestTimeout = kinesisRequestTimeout;
|
||||
this.dataFetcherProvider =
|
||||
dataFetcherProvider == null ? defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider;
|
||||
this.sleepTimeController = sleepTimeController;
|
||||
}
|
||||
|
||||
private static Function<DataFetcherProviderConfig, DataFetcher> defaultDataFetcherProvider(
|
||||
|
|
@ -96,6 +99,7 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
|
|||
createGetRecordsRetrievalStrategy(shardInfo, streamConfig.streamIdentifier(), metricsFactory),
|
||||
shardInfo.shardId(),
|
||||
metricsFactory,
|
||||
maxRecords);
|
||||
maxRecords,
|
||||
getSleepTimeController());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,204 @@
|
|||
/*
|
||||
* Copyright 2019 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.retrieval.polling;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class KinesisSleepTimeControllerTest {
|
||||
|
||||
private KinesisSleepTimeController controller;
|
||||
private final long idleMillisBetweenCalls = 1000L;
|
||||
private final Integer recordCount = 10;
|
||||
private final Long millisBehindLatest = 5000L;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
controller = new KinesisSleepTimeController();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSleepTimeMillisWithNullLastSuccessfulCall() {
|
||||
// When lastSuccessfulCall is null, it should return the idleMillisBetweenCalls
|
||||
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
|
||||
.lastSuccessfulCall(null)
|
||||
.idleMillisBetweenCalls(idleMillisBetweenCalls)
|
||||
.lastGetRecordsReturnedRecordsCount(recordCount)
|
||||
.lastMillisBehindLatest(millisBehindLatest)
|
||||
.build();
|
||||
long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
|
||||
|
||||
assertEquals(
|
||||
"When lastSuccessfulCall is null, should return idleMillisBetweenCalls",
|
||||
idleMillisBetweenCalls,
|
||||
sleepTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSleepTimeMillisWhenTimeSinceLastCallLessThanIdleTime() {
|
||||
// Create a lastSuccessfulCall time that's recent (less than idleMillisBetweenCalls ago)
|
||||
Instant now = Instant.now();
|
||||
Instant lastSuccessfulCall = now.minusMillis(500); // 500ms ago
|
||||
|
||||
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
|
||||
.lastSuccessfulCall(lastSuccessfulCall)
|
||||
.idleMillisBetweenCalls(idleMillisBetweenCalls)
|
||||
.lastGetRecordsReturnedRecordsCount(recordCount)
|
||||
.lastMillisBehindLatest(millisBehindLatest)
|
||||
.build();
|
||||
long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
|
||||
|
||||
// Should return the remaining time to wait (idleMillisBetweenCalls - timeSinceLastCall)
|
||||
assertTrue(
|
||||
"Sleep time should be positive when time since last call is less than idle time",
|
||||
sleepTime > 0 && sleepTime <= idleMillisBetweenCalls);
|
||||
|
||||
// The exact value will vary slightly due to execution time, but should be close to 500ms
|
||||
long expectedApproxSleepTime = idleMillisBetweenCalls - 500;
|
||||
assertTrue(
|
||||
"Sleep time should be approximately " + expectedApproxSleepTime + "ms",
|
||||
Math.abs(sleepTime - expectedApproxSleepTime) < 100); // Allow for small timing variations
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSleepTimeMillisWhenTimeSinceLastCallEqualsOrExceedsIdleTime() {
|
||||
// Create a lastSuccessfulCall time that's exactly idleMillisBetweenCalls ago
|
||||
Instant now = Instant.now();
|
||||
Instant lastSuccessfulCall = now.minusMillis(idleMillisBetweenCalls);
|
||||
|
||||
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
|
||||
.lastSuccessfulCall(lastSuccessfulCall)
|
||||
.idleMillisBetweenCalls(idleMillisBetweenCalls)
|
||||
.lastGetRecordsReturnedRecordsCount(recordCount)
|
||||
.lastMillisBehindLatest(millisBehindLatest)
|
||||
.build();
|
||||
long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
|
||||
|
||||
// Should return 0 as we've waited the full idle time
|
||||
assertEquals("Sleep time should be 0 when time since last call equals idle time", 0L, sleepTime);
|
||||
|
||||
// Test with time exceeding idle time
|
||||
lastSuccessfulCall = now.minusMillis(idleMillisBetweenCalls + 500);
|
||||
sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
|
||||
.lastSuccessfulCall(lastSuccessfulCall)
|
||||
.idleMillisBetweenCalls(idleMillisBetweenCalls)
|
||||
.lastGetRecordsReturnedRecordsCount(recordCount)
|
||||
.lastMillisBehindLatest(millisBehindLatest)
|
||||
.build();
|
||||
sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
|
||||
|
||||
assertEquals("Sleep time should be 0 when time since last call exceeds idle time", 0L, sleepTime);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSleepTimeMillisWithFutureLastSuccessfulCall() {
|
||||
// Test with a lastSuccessfulCall in the future (should handle this case gracefully)
|
||||
Instant now = Instant.now();
|
||||
Instant futureCall = now.plusMillis(500); // 500ms in the future
|
||||
|
||||
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
|
||||
.lastSuccessfulCall(futureCall)
|
||||
.idleMillisBetweenCalls(idleMillisBetweenCalls)
|
||||
.lastGetRecordsReturnedRecordsCount(recordCount)
|
||||
.lastMillisBehindLatest(millisBehindLatest)
|
||||
.build();
|
||||
long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
|
||||
|
||||
// The implementation uses Duration.abs(), so it should treat this the same as 500ms in the past
|
||||
assertTrue(
|
||||
"Sleep time should be positive when last call time is in the future",
|
||||
sleepTime > 0 && sleepTime <= idleMillisBetweenCalls);
|
||||
|
||||
long expectedApproxSleepTime = idleMillisBetweenCalls - 500;
|
||||
assertTrue(
|
||||
"Sleep time should be approximately " + expectedApproxSleepTime + "ms",
|
||||
Math.abs(sleepTime - expectedApproxSleepTime) < 100); // Allow for small timing variations
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSleepTimeMillisWithDifferentIdleTimes() {
|
||||
// Test with different idle times
|
||||
Instant now = Instant.now();
|
||||
Instant lastSuccessfulCall = now.minusMillis(300);
|
||||
|
||||
// Test with shorter idle time
|
||||
long shorterIdleTime = 500L;
|
||||
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
|
||||
.lastSuccessfulCall(lastSuccessfulCall)
|
||||
.idleMillisBetweenCalls(shorterIdleTime)
|
||||
.lastGetRecordsReturnedRecordsCount(recordCount)
|
||||
.lastMillisBehindLatest(millisBehindLatest)
|
||||
.build();
|
||||
long sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
|
||||
assertEquals(
|
||||
"Sleep time should be approximately 200ms with 500ms idle time and 300ms elapsed",
|
||||
200L,
|
||||
sleepTime,
|
||||
100);
|
||||
|
||||
// Test with longer idle time
|
||||
long longerIdleTime = 2000L;
|
||||
sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
|
||||
.lastSuccessfulCall(lastSuccessfulCall)
|
||||
.idleMillisBetweenCalls(longerIdleTime)
|
||||
.lastGetRecordsReturnedRecordsCount(recordCount)
|
||||
.lastMillisBehindLatest(millisBehindLatest)
|
||||
.build();
|
||||
sleepTime = controller.getSleepTimeMillis(sleepTimeControllerConfig);
|
||||
assertEquals(
|
||||
"Sleep time should be approximately 1700ms with 2000ms idle time and 300ms elapsed",
|
||||
1700L,
|
||||
sleepTime,
|
||||
100);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSleepTimeMillisIgnoresRecordCountAndMillisBehindLatest() {
|
||||
// Verify that the implementation ignores lastGetRecordsReturnedRecordsCount and millisBehindLatest parameters
|
||||
Instant now = Instant.now();
|
||||
Instant lastSuccessfulCall = now.minusMillis(500);
|
||||
|
||||
SleepTimeControllerConfig sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
|
||||
.lastSuccessfulCall(lastSuccessfulCall)
|
||||
.idleMillisBetweenCalls(idleMillisBetweenCalls)
|
||||
.lastGetRecordsReturnedRecordsCount(0)
|
||||
.lastMillisBehindLatest(0L)
|
||||
.build();
|
||||
long sleepTime1 = controller.getSleepTimeMillis(sleepTimeControllerConfig);
|
||||
sleepTimeControllerConfig = SleepTimeControllerConfig.builder()
|
||||
.lastSuccessfulCall(lastSuccessfulCall)
|
||||
.idleMillisBetweenCalls(idleMillisBetweenCalls)
|
||||
.lastGetRecordsReturnedRecordsCount(100)
|
||||
.lastMillisBehindLatest(10000L)
|
||||
.build();
|
||||
long sleepTime2 = controller.getSleepTimeMillis(sleepTimeControllerConfig);
|
||||
|
||||
// Both calls should return approximately the same sleep time since these parameters are ignored
|
||||
assertTrue(
|
||||
"Sleep time should be the same regardless of record count and millisBehindLatest",
|
||||
Math.abs(sleepTime1 - sleepTime2) < 10); // Allow for minimal timing variations
|
||||
}
|
||||
}
|
||||
|
|
@ -55,16 +55,16 @@ public class RecordsFetcherFactoryTest {
|
|||
@Ignore
|
||||
// TODO: remove test no longer holds true
|
||||
public void createDefaultRecordsFetcherTest() {
|
||||
RecordsPublisher recordsCache =
|
||||
recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, metricsFactory, 1);
|
||||
RecordsPublisher recordsCache = recordsFetcherFactory.createRecordsFetcher(
|
||||
getRecordsRetrievalStrategy, shardId, metricsFactory, 1, new KinesisSleepTimeController());
|
||||
assertThat(recordsCache, instanceOf(BlockingRecordsPublisher.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createPrefetchRecordsFetcherTest() {
|
||||
recordsFetcherFactory.dataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
|
||||
RecordsPublisher recordsCache =
|
||||
recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, metricsFactory, 1);
|
||||
RecordsPublisher recordsCache = recordsFetcherFactory.createRecordsFetcher(
|
||||
getRecordsRetrievalStrategy, shardId, metricsFactory, 1, new KinesisSleepTimeController());
|
||||
assertThat(recordsCache, instanceOf(PrefetchRecordsPublisher.class));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue