diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 41bab25b..ca7884d9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -182,6 +182,11 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; + /** + * The amount of time to sleep in between 2 get calls from the data fetcher. + */ + public static final long DEFAULT_IDLE_MILLIS_BETWEEN_CALLS = 1500L; + private String applicationName; private String tableName; private String streamName; @@ -234,6 +239,9 @@ public class KinesisClientLibConfiguration { @Getter private RecordsFetcherFactory recordsFetcherFactory; + + @Getter + private long idleMillisBetweenCalls; /** * Constructor. @@ -270,15 +278,32 @@ public class KinesisClientLibConfiguration { AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider, String workerId) { - this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, - dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, - DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, - DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, - DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, - new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), - DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, - DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, - DEFAULT_SHUTDOWN_GRACE_MILLIS); + this(applicationName, + streamName, + null, + null, + DEFAULT_INITIAL_POSITION_IN_STREAM, + kinesisCredentialsProvider, + dynamoDBCredentialsProvider, + cloudWatchCredentialsProvider, + DEFAULT_FAILOVER_TIME_MILLIS, + workerId, + DEFAULT_MAX_RECORDS, + DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, + DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, + DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, + DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, + DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, + new ClientConfiguration(), + new ClientConfiguration(), + new ClientConfiguration(), + DEFAULT_TASK_BACKOFF_TIME_MILLIS, + DEFAULT_METRICS_BUFFER_TIME_MILLIS, + DEFAULT_METRICS_MAX_QUEUE_SIZE, + DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, + null, + DEFAULT_SHUTDOWN_GRACE_MILLIS, + DEFAULT_IDLE_MILLIS_BETWEEN_CALLS); } /** @@ -318,29 +343,30 @@ public class KinesisClientLibConfiguration { // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES public KinesisClientLibConfiguration(String applicationName, - String streamName, - String kinesisEndpoint, - InitialPositionInStream initialPositionInStream, - AWSCredentialsProvider kinesisCredentialsProvider, - AWSCredentialsProvider dynamoDBCredentialsProvider, - AWSCredentialsProvider cloudWatchCredentialsProvider, - long failoverTimeMillis, - String workerId, - int maxRecords, - long idleTimeBetweenReadsInMillis, - boolean callProcessRecordsEvenForEmptyRecordList, - long parentShardPollIntervalMillis, - long shardSyncIntervalMillis, - boolean cleanupTerminatedShardsBeforeExpiry, - ClientConfiguration kinesisClientConfig, - ClientConfiguration dynamoDBClientConfig, - ClientConfiguration cloudWatchClientConfig, - long taskBackoffTimeMillis, - long metricsBufferTimeMillis, - int metricsMaxQueueSize, - boolean validateSequenceNumberBeforeCheckpointing, - String regionName, - long shutdownGraceMillis) { + String streamName, + String kinesisEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + long shutdownGraceMillis, + long idleMillisBetweenCalls) { this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, @@ -348,7 +374,7 @@ public class KinesisClientLibConfiguration { shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, - validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); + validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, idleMillisBetweenCalls); } /** @@ -388,30 +414,31 @@ public class KinesisClientLibConfiguration { // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES public KinesisClientLibConfiguration(String applicationName, - String streamName, - String kinesisEndpoint, - String dynamoDBEndpoint, - InitialPositionInStream initialPositionInStream, - AWSCredentialsProvider kinesisCredentialsProvider, - AWSCredentialsProvider dynamoDBCredentialsProvider, - AWSCredentialsProvider cloudWatchCredentialsProvider, - long failoverTimeMillis, - String workerId, - int maxRecords, - long idleTimeBetweenReadsInMillis, - boolean callProcessRecordsEvenForEmptyRecordList, - long parentShardPollIntervalMillis, - long shardSyncIntervalMillis, - boolean cleanupTerminatedShardsBeforeExpiry, - ClientConfiguration kinesisClientConfig, - ClientConfiguration dynamoDBClientConfig, - ClientConfiguration cloudWatchClientConfig, - long taskBackoffTimeMillis, - long metricsBufferTimeMillis, - int metricsMaxQueueSize, - boolean validateSequenceNumberBeforeCheckpointing, - String regionName, - long shutdownGraceMillis) { + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + long shutdownGraceMillis, + long idleMillisBetweenCalls) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -422,6 +449,7 @@ public class KinesisClientLibConfiguration { checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); + checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); checkIsRegionNameValid(regionName); this.applicationName = applicationName; this.tableName = applicationName; @@ -459,6 +487,7 @@ public class KinesisClientLibConfiguration { this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); + this.idleMillisBetweenCalls = idleMillisBetweenCalls; } /** @@ -1322,4 +1351,14 @@ public class KinesisClientLibConfiguration { this.shutdownGraceMillis = shutdownGraceMillis; return this; } + + /** + * @param idleMillisBetweenCalls Idle time between 2 getcalls from the data fetcher. + * @return + */ + public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) { + checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); + this.idleMillisBetweenCalls = idleMillisBetweenCalls; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index 5e84b1cf..efec0e3f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -14,6 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.time.Duration; +import java.time.Instant; import java.util.Collections; import java.util.Date; @@ -40,16 +42,18 @@ class KinesisDataFetcher { private final String shardId; private boolean isShardEndReached; private boolean isInitialized; + private Instant lastResponseTime; + private long idleMillisBetweenCalls; /** * * @param kinesisProxy Kinesis proxy * @param shardInfo The shardInfo object. */ - public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) { + public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo, KinesisClientLibConfiguration configuration) { this.shardId = shardInfo.getShardId(); - this.kinesisProxy = - new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId); + this.kinesisProxy = new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId); + this.idleMillisBetweenCalls = configuration.getIdleMillisBetweenCalls(); } /** @@ -66,7 +70,9 @@ class KinesisDataFetcher { GetRecordsResult response = null; if (nextIterator != null) { try { + sleepBeforeNextCall(); response = kinesisProxy.get(nextIterator, maxRecords); + lastResponseTime = Instant.now(); nextIterator = response.getNextShardIterator(); } catch (ResourceNotFoundException e) { LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId); @@ -182,6 +188,19 @@ class KinesisDataFetcher { } return iterator; } + + private void sleepBeforeNextCall() { + if (lastResponseTime != null) { + long timeDiff = Duration.between(lastResponseTime, Instant.now()).abs().toMillis(); + if (timeDiff < idleMillisBetweenCalls) { + try { + Thread.sleep(idleMillisBetweenCalls - timeDiff); + } catch (InterruptedException e) { + LOG.info("Thread interrupted, shutdown possibly called."); + } + } + } + } /** * @return the shardEndReached diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 8315fb73..c849f7f2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -123,7 +123,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { getRecordsResultQueue.put(processRecordsInput); prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { - log.info("Thread was interrupted, indicating shutdown was called on the cache", e); + log.info("Thread was interrupted, indicating shutdown was called on the cache"); } } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 75b5f474..de49dada 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -112,9 +112,20 @@ class ShardConsumer { long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, KinesisClientLibConfiguration config) { - this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, - parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, - backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty(), config); + this(shardInfo, + streamConfig, + checkpoint, + recordProcessor, + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional.empty(), + Optional.empty(), + config); } /** @@ -166,7 +177,7 @@ class ShardConsumer { metricsFactory, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), + new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config), retryGetRecordsInSeconds, maxGetRecordsThreadPool, config diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java index e8380805..bf916ae5 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java @@ -63,6 +63,9 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { private ShardInfo mockShardInfo; @Mock private Supplier> completionServiceSupplier; + + @Mock + private KinesisClientLibConfiguration configuration; private CompletionService completionService; @@ -88,8 +91,10 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { rejectedExecutionHandler)); completionService = spy(new ExecutorCompletionService(executorService)); when(completionServiceSupplier.get()).thenReturn(completionService); - getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001"); + getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy( + dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001"); result = null; + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); } @Test @@ -149,7 +154,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { private class KinesisDataFetcherForTests extends KinesisDataFetcher { public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo) { - super(kinesisProxy, shardInfo); + super(kinesisProxy, shardInfo, configuration); } @Override diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index cfa8be10..1b67412f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -85,6 +85,7 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_INT, skipCheckpointValidationValue, null, + TEST_VALUE_LONG, TEST_VALUE_LONG); } @@ -95,7 +96,8 @@ public class KinesisClientLibConfigurationTest { // Try each argument at one time. KinesisClientLibConfiguration config = null; long[] longValues = - { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG }; + { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, + TEST_VALUE_LONG, TEST_VALUE_LONG }; for (int i = 0; i < PARAMETER_COUNT; i++) { longValues[i] = INVALID_LONG; try { @@ -124,7 +126,8 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_INT, skipCheckpointValidationValue, null, - longValues[6]); + longValues[6], + longValues[7]); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); } @@ -159,6 +162,7 @@ public class KinesisClientLibConfigurationTest { intValues[1], skipCheckpointValidationValue, null, + TEST_VALUE_LONG, TEST_VALUE_LONG); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); @@ -300,30 +304,31 @@ public class KinesisClientLibConfigurationTest { Mockito.mock(AWSCredentialsProvider.class); try { new KinesisClientLibConfiguration(TEST_STRING, - TEST_STRING, - TEST_STRING, - TEST_STRING, - null, - null, - null, - null, - TEST_VALUE_LONG, - TEST_STRING, - 3, - TEST_VALUE_LONG, - false, - TEST_VALUE_LONG, - TEST_VALUE_LONG, - true, - new ClientConfiguration(), - new ClientConfiguration(), - new ClientConfiguration(), - TEST_VALUE_LONG, - TEST_VALUE_LONG, - 1, - skipCheckpointValidationValue, - "abcd", - TEST_VALUE_LONG); + TEST_STRING, + TEST_STRING, + TEST_STRING, + null, + null, + null, + null, + TEST_VALUE_LONG, + TEST_STRING, + 3, + TEST_VALUE_LONG, + false, + TEST_VALUE_LONG, + TEST_VALUE_LONG, + true, + new ClientConfiguration(), + new ClientConfiguration(), + new ClientConfiguration(), + TEST_VALUE_LONG, + TEST_VALUE_LONG, + 1, + skipCheckpointValidationValue, + "abcd", + TEST_VALUE_LONG, + TEST_VALUE_LONG); Assert.fail("No expected Exception is thrown."); } catch(IllegalArgumentException e) { System.out.println(e.getMessage()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index 7e8937a6..d74a0b16 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -39,10 +39,14 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; /** * Unit tests for KinesisDataFetcher. */ +@RunWith(MockitoJUnitRunner.class) public class KinesisDataFetcherTest { private static final int MAX_RECORDS = 1; @@ -55,6 +59,9 @@ public class KinesisDataFetcherTest { InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000)); + + @Mock + private KinesisClientLibConfiguration configuration; /** * @throws java.lang.Exception @@ -115,8 +122,9 @@ public class KinesisDataFetcherTest { public void testadvanceIteratorTo() throws KinesisClientLibException { IKinesisProxy kinesis = mock(IKinesisProxy.class); ICheckpoint checkpoint = mock(ICheckpoint.class); + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); + KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher); String iteratorA = "foo"; @@ -148,8 +156,9 @@ public class KinesisDataFetcherTest { @Test public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() { IKinesisProxy kinesis = mock(IKinesisProxy.class); + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); + KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration); String iteratorHorizon = "horizon"; when(kinesis.getIterator(SHARD_ID, ShardIteratorType.TRIM_HORIZON.toString())).thenReturn(iteratorHorizon); @@ -178,9 +187,10 @@ public class KinesisDataFetcherTest { KinesisProxy mockProxy = mock(KinesisProxy.class); doReturn(nextIterator).when(mockProxy).getIterator(SHARD_ID, ShardIteratorType.LATEST.toString()); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); // Create data fectcher and initialize it with latest type checkpoint - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(dataFetcher); // Call getRecords of dataFetcher which will throw an exception @@ -197,8 +207,9 @@ public class KinesisDataFetcherTest { KinesisProxy mockProxy = mock(KinesisProxy.class); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); GetRecordsResult getRecordsResult = dataFetcher.getRecords(maxRecords); @@ -223,8 +234,9 @@ public class KinesisDataFetcherTest { ICheckpoint checkpoint = mock(ICheckpoint.class); when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo)); + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); + KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher); fetcher.initialize(seqNo, initialPositionInStream); List actualRecords = getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).getRecords(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 4414b96a..33d613de 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -122,6 +122,7 @@ public class ShardConsumerTest { recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords)); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + when(config.getIdleMillisBetweenCalls()).thenReturn(0l); } /** @@ -339,7 +340,7 @@ public class ShardConsumerTest { ) ); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); @@ -467,7 +468,7 @@ public class ShardConsumerTest { ) ); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 107900dc..bf30c510 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -164,6 +164,7 @@ public class WorkerTest { public void setup() { recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500)); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + when(config.getIdleMillisBetweenCalls()).thenReturn(500L); } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES @@ -292,10 +293,22 @@ public class WorkerTest { when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint) .thenReturn(secondCheckpoint); - Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST, - parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, - leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); + Worker worker = new Worker(stageName, + streamletFactory, + config, + streamConfig, + INITIAL_POSITION_LATEST, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + checkpoint, + leaseCoordinator, + execService, + nullMetricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + shardPrioritization); Worker workerSpy = spy(worker); @@ -768,10 +781,22 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -918,10 +943,22 @@ public class WorkerTest { when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { + Worker worker = new InjectableWorker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization) { @Override void postConstruct() { this.gracefulShutdownCoordinator = coordinator; @@ -982,10 +1019,22 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + clientConfig, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1054,10 +1103,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + clientConfig, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1157,10 +1218,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + clientConfig, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1264,10 +1337,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + clientConfig, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1338,10 +1423,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + clientConfig, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1383,10 +1480,22 @@ public class WorkerTest { KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, - parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, - checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, - failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); + super(applicationName, + recordProcessorFactory, + config, + streamConfig, + initialPositionInStream, + parentShardPollIntervalMillis, + shardSyncIdleTimeMillis, + cleanupLeasesUponShardCompletion, + checkpoint, + leaseCoordinator, + execService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + shardPrioritization); postConstruct(); } @@ -1690,8 +1799,10 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration("app", null, null, null); + KinesisClientLibConfiguration clientConfig = spy(new KinesisClientLibConfiguration("app", null, null, null)); + + when(clientConfig.getIdleMillisBetweenCalls()).thenReturn(0L); + Worker worker = new Worker(stageName, recordProcessorFactory,