Adding idleMillisBetweenCalls config, to slow down the aggressive get calls while prefetching.

This commit is contained in:
Sahil Palvia 2017-09-26 14:39:13 -07:00
parent b5d5618a94
commit 34dd6f416a
9 changed files with 341 additions and 138 deletions

View file

@ -182,6 +182,11 @@ public class KinesisClientLibConfiguration {
*/ */
public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20;
/**
* The amount of time to sleep in between 2 get calls from the data fetcher.
*/
public static final long DEFAULT_IDLE_MILLIS_BETWEEN_CALLS = 1500L;
private String applicationName; private String applicationName;
private String tableName; private String tableName;
private String streamName; private String streamName;
@ -235,6 +240,9 @@ public class KinesisClientLibConfiguration {
@Getter @Getter
private RecordsFetcherFactory recordsFetcherFactory; private RecordsFetcherFactory recordsFetcherFactory;
@Getter
private long idleMillisBetweenCalls;
/** /**
* Constructor. * Constructor.
* *
@ -270,15 +278,32 @@ public class KinesisClientLibConfiguration {
AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) { String workerId) {
this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, this(applicationName,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, streamName,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, null,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, null,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, DEFAULT_INITIAL_POSITION_IN_STREAM,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), kinesisCredentialsProvider,
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, dynamoDBCredentialsProvider,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, cloudWatchCredentialsProvider,
DEFAULT_SHUTDOWN_GRACE_MILLIS); DEFAULT_FAILOVER_TIME_MILLIS,
workerId,
DEFAULT_MAX_RECORDS,
DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS,
DEFAULT_METRICS_BUFFER_TIME_MILLIS,
DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
null,
DEFAULT_SHUTDOWN_GRACE_MILLIS,
DEFAULT_IDLE_MILLIS_BETWEEN_CALLS);
} }
/** /**
@ -318,29 +343,30 @@ public class KinesisClientLibConfiguration {
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
public KinesisClientLibConfiguration(String applicationName, public KinesisClientLibConfiguration(String applicationName,
String streamName, String streamName,
String kinesisEndpoint, String kinesisEndpoint,
InitialPositionInStream initialPositionInStream, InitialPositionInStream initialPositionInStream,
AWSCredentialsProvider kinesisCredentialsProvider, AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider,
long failoverTimeMillis, long failoverTimeMillis,
String workerId, String workerId,
int maxRecords, int maxRecords,
long idleTimeBetweenReadsInMillis, long idleTimeBetweenReadsInMillis,
boolean callProcessRecordsEvenForEmptyRecordList, boolean callProcessRecordsEvenForEmptyRecordList,
long parentShardPollIntervalMillis, long parentShardPollIntervalMillis,
long shardSyncIntervalMillis, long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry, boolean cleanupTerminatedShardsBeforeExpiry,
ClientConfiguration kinesisClientConfig, ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig, ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig, ClientConfiguration cloudWatchClientConfig,
long taskBackoffTimeMillis, long taskBackoffTimeMillis,
long metricsBufferTimeMillis, long metricsBufferTimeMillis,
int metricsMaxQueueSize, int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing, boolean validateSequenceNumberBeforeCheckpointing,
String regionName, String regionName,
long shutdownGraceMillis) { long shutdownGraceMillis,
long idleMillisBetweenCalls) {
this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
maxRecords, idleTimeBetweenReadsInMillis, maxRecords, idleTimeBetweenReadsInMillis,
@ -348,7 +374,7 @@ public class KinesisClientLibConfiguration {
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, idleMillisBetweenCalls);
} }
/** /**
@ -388,30 +414,31 @@ public class KinesisClientLibConfiguration {
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
public KinesisClientLibConfiguration(String applicationName, public KinesisClientLibConfiguration(String applicationName,
String streamName, String streamName,
String kinesisEndpoint, String kinesisEndpoint,
String dynamoDBEndpoint, String dynamoDBEndpoint,
InitialPositionInStream initialPositionInStream, InitialPositionInStream initialPositionInStream,
AWSCredentialsProvider kinesisCredentialsProvider, AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider,
long failoverTimeMillis, long failoverTimeMillis,
String workerId, String workerId,
int maxRecords, int maxRecords,
long idleTimeBetweenReadsInMillis, long idleTimeBetweenReadsInMillis,
boolean callProcessRecordsEvenForEmptyRecordList, boolean callProcessRecordsEvenForEmptyRecordList,
long parentShardPollIntervalMillis, long parentShardPollIntervalMillis,
long shardSyncIntervalMillis, long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry, boolean cleanupTerminatedShardsBeforeExpiry,
ClientConfiguration kinesisClientConfig, ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig, ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig, ClientConfiguration cloudWatchClientConfig,
long taskBackoffTimeMillis, long taskBackoffTimeMillis,
long metricsBufferTimeMillis, long metricsBufferTimeMillis,
int metricsMaxQueueSize, int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing, boolean validateSequenceNumberBeforeCheckpointing,
String regionName, String regionName,
long shutdownGraceMillis) { long shutdownGraceMillis,
long idleMillisBetweenCalls) {
// Check following values are greater than zero // Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -422,6 +449,7 @@ public class KinesisClientLibConfiguration {
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis);
checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls);
checkIsRegionNameValid(regionName); checkIsRegionNameValid(regionName);
this.applicationName = applicationName; this.applicationName = applicationName;
this.tableName = applicationName; this.tableName = applicationName;
@ -459,6 +487,7 @@ public class KinesisClientLibConfiguration {
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
} }
/** /**
@ -1322,4 +1351,14 @@ public class KinesisClientLibConfiguration {
this.shutdownGraceMillis = shutdownGraceMillis; this.shutdownGraceMillis = shutdownGraceMillis;
return this; return this;
} }
/**
* @param idleMillisBetweenCalls Idle time between 2 getcalls from the data fetcher.
* @return
*/
public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) {
checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
return this;
}
} }

View file

@ -14,6 +14,8 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
@ -40,16 +42,18 @@ class KinesisDataFetcher {
private final String shardId; private final String shardId;
private boolean isShardEndReached; private boolean isShardEndReached;
private boolean isInitialized; private boolean isInitialized;
private Instant lastResponseTime;
private long idleMillisBetweenCalls;
/** /**
* *
* @param kinesisProxy Kinesis proxy * @param kinesisProxy Kinesis proxy
* @param shardInfo The shardInfo object. * @param shardInfo The shardInfo object.
*/ */
public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) { public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo, KinesisClientLibConfiguration configuration) {
this.shardId = shardInfo.getShardId(); this.shardId = shardInfo.getShardId();
this.kinesisProxy = this.kinesisProxy = new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId);
new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId); this.idleMillisBetweenCalls = configuration.getIdleMillisBetweenCalls();
} }
/** /**
@ -66,7 +70,9 @@ class KinesisDataFetcher {
GetRecordsResult response = null; GetRecordsResult response = null;
if (nextIterator != null) { if (nextIterator != null) {
try { try {
sleepBeforeNextCall();
response = kinesisProxy.get(nextIterator, maxRecords); response = kinesisProxy.get(nextIterator, maxRecords);
lastResponseTime = Instant.now();
nextIterator = response.getNextShardIterator(); nextIterator = response.getNextShardIterator();
} catch (ResourceNotFoundException e) { } catch (ResourceNotFoundException e) {
LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId); LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId);
@ -183,6 +189,19 @@ class KinesisDataFetcher {
return iterator; return iterator;
} }
private void sleepBeforeNextCall() {
if (lastResponseTime != null) {
long timeDiff = Duration.between(lastResponseTime, Instant.now()).abs().toMillis();
if (timeDiff < idleMillisBetweenCalls) {
try {
Thread.sleep(idleMillisBetweenCalls - timeDiff);
} catch (InterruptedException e) {
LOG.info("Thread interrupted, shutdown possibly called.");
}
}
}
}
/** /**
* @return the shardEndReached * @return the shardEndReached
*/ */

View file

@ -123,7 +123,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
getRecordsResultQueue.put(processRecordsInput); getRecordsResultQueue.put(processRecordsInput);
prefetchCounters.added(processRecordsInput); prefetchCounters.added(processRecordsInput);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache", e); log.info("Thread was interrupted, indicating shutdown was called on the cache");
} }
} }
} }

View file

@ -112,9 +112,20 @@ class ShardConsumer {
long backoffTimeMillis, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisClientLibConfiguration config) { KinesisClientLibConfiguration config) {
this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, this(shardInfo,
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, streamConfig,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty(), config); checkpoint,
recordProcessor,
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional.empty(),
Optional.empty(),
config);
} }
/** /**
@ -166,7 +177,7 @@ class ShardConsumer {
metricsFactory, metricsFactory,
backoffTimeMillis, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, skipShardSyncAtWorkerInitializationIfLeasesExist,
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config),
retryGetRecordsInSeconds, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, maxGetRecordsThreadPool,
config config

View file

@ -64,6 +64,9 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Mock @Mock
private Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier; private Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier;
@Mock
private KinesisClientLibConfiguration configuration;
private CompletionService<GetRecordsResult> completionService; private CompletionService<GetRecordsResult> completionService;
private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy; private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy;
@ -88,8 +91,10 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
rejectedExecutionHandler)); rejectedExecutionHandler));
completionService = spy(new ExecutorCompletionService<GetRecordsResult>(executorService)); completionService = spy(new ExecutorCompletionService<GetRecordsResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService); when(completionServiceSupplier.get()).thenReturn(completionService);
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001"); getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(
dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
result = null; result = null;
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
} }
@Test @Test
@ -149,7 +154,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
private class KinesisDataFetcherForTests extends KinesisDataFetcher { private class KinesisDataFetcherForTests extends KinesisDataFetcher {
public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo) { public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo) {
super(kinesisProxy, shardInfo); super(kinesisProxy, shardInfo, configuration);
} }
@Override @Override

View file

@ -85,6 +85,7 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_INT, TEST_VALUE_INT,
skipCheckpointValidationValue, skipCheckpointValidationValue,
null, null,
TEST_VALUE_LONG,
TEST_VALUE_LONG); TEST_VALUE_LONG);
} }
@ -95,7 +96,8 @@ public class KinesisClientLibConfigurationTest {
// Try each argument at one time. // Try each argument at one time.
KinesisClientLibConfiguration config = null; KinesisClientLibConfiguration config = null;
long[] longValues = long[] longValues =
{ TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG }; { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG,
TEST_VALUE_LONG, TEST_VALUE_LONG };
for (int i = 0; i < PARAMETER_COUNT; i++) { for (int i = 0; i < PARAMETER_COUNT; i++) {
longValues[i] = INVALID_LONG; longValues[i] = INVALID_LONG;
try { try {
@ -124,7 +126,8 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_INT, TEST_VALUE_INT,
skipCheckpointValidationValue, skipCheckpointValidationValue,
null, null,
longValues[6]); longValues[6],
longValues[7]);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());
} }
@ -159,6 +162,7 @@ public class KinesisClientLibConfigurationTest {
intValues[1], intValues[1],
skipCheckpointValidationValue, skipCheckpointValidationValue,
null, null,
TEST_VALUE_LONG,
TEST_VALUE_LONG); TEST_VALUE_LONG);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());
@ -300,30 +304,31 @@ public class KinesisClientLibConfigurationTest {
Mockito.mock(AWSCredentialsProvider.class); Mockito.mock(AWSCredentialsProvider.class);
try { try {
new KinesisClientLibConfiguration(TEST_STRING, new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING, TEST_STRING,
TEST_STRING, TEST_STRING,
TEST_STRING, TEST_STRING,
null, null,
null, null,
null, null,
null, null,
TEST_VALUE_LONG, TEST_VALUE_LONG,
TEST_STRING, TEST_STRING,
3, 3,
TEST_VALUE_LONG, TEST_VALUE_LONG,
false, false,
TEST_VALUE_LONG, TEST_VALUE_LONG,
TEST_VALUE_LONG, TEST_VALUE_LONG,
true, true,
new ClientConfiguration(), new ClientConfiguration(),
new ClientConfiguration(), new ClientConfiguration(),
new ClientConfiguration(), new ClientConfiguration(),
TEST_VALUE_LONG, TEST_VALUE_LONG,
TEST_VALUE_LONG, TEST_VALUE_LONG,
1, 1,
skipCheckpointValidationValue, skipCheckpointValidationValue,
"abcd", "abcd",
TEST_VALUE_LONG); TEST_VALUE_LONG,
TEST_VALUE_LONG);
Assert.fail("No expected Exception is thrown."); Assert.fail("No expected Exception is thrown.");
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());

View file

@ -39,10 +39,14 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
/** /**
* Unit tests for KinesisDataFetcher. * Unit tests for KinesisDataFetcher.
*/ */
@RunWith(MockitoJUnitRunner.class)
public class KinesisDataFetcherTest { public class KinesisDataFetcherTest {
private static final int MAX_RECORDS = 1; private static final int MAX_RECORDS = 1;
@ -56,6 +60,9 @@ public class KinesisDataFetcherTest {
private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP =
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000)); InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000));
@Mock
private KinesisClientLibConfiguration configuration;
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
*/ */
@ -115,8 +122,9 @@ public class KinesisDataFetcherTest {
public void testadvanceIteratorTo() throws KinesisClientLibException { public void testadvanceIteratorTo() throws KinesisClientLibException {
IKinesisProxy kinesis = mock(IKinesisProxy.class); IKinesisProxy kinesis = mock(IKinesisProxy.class);
ICheckpoint checkpoint = mock(ICheckpoint.class); ICheckpoint checkpoint = mock(ICheckpoint.class);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration);
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher);
String iteratorA = "foo"; String iteratorA = "foo";
@ -148,8 +156,9 @@ public class KinesisDataFetcherTest {
@Test @Test
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() { public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() {
IKinesisProxy kinesis = mock(IKinesisProxy.class); IKinesisProxy kinesis = mock(IKinesisProxy.class);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration);
String iteratorHorizon = "horizon"; String iteratorHorizon = "horizon";
when(kinesis.getIterator(SHARD_ID, ShardIteratorType.TRIM_HORIZON.toString())).thenReturn(iteratorHorizon); when(kinesis.getIterator(SHARD_ID, ShardIteratorType.TRIM_HORIZON.toString())).thenReturn(iteratorHorizon);
@ -178,9 +187,10 @@ public class KinesisDataFetcherTest {
KinesisProxy mockProxy = mock(KinesisProxy.class); KinesisProxy mockProxy = mock(KinesisProxy.class);
doReturn(nextIterator).when(mockProxy).getIterator(SHARD_ID, ShardIteratorType.LATEST.toString()); doReturn(nextIterator).when(mockProxy).getIterator(SHARD_ID, ShardIteratorType.LATEST.toString());
doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
// Create data fectcher and initialize it with latest type checkpoint // Create data fectcher and initialize it with latest type checkpoint
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration);
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(dataFetcher); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(dataFetcher);
// Call getRecords of dataFetcher which will throw an exception // Call getRecords of dataFetcher which will throw an exception
@ -197,8 +207,9 @@ public class KinesisDataFetcherTest {
KinesisProxy mockProxy = mock(KinesisProxy.class); KinesisProxy mockProxy = mock(KinesisProxy.class);
doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration);
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
GetRecordsResult getRecordsResult = dataFetcher.getRecords(maxRecords); GetRecordsResult getRecordsResult = dataFetcher.getRecords(maxRecords);
@ -223,8 +234,9 @@ public class KinesisDataFetcherTest {
ICheckpoint checkpoint = mock(ICheckpoint.class); ICheckpoint checkpoint = mock(ICheckpoint.class);
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo)); when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo));
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration);
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher);
fetcher.initialize(seqNo, initialPositionInStream); fetcher.initialize(seqNo, initialPositionInStream);
List<Record> actualRecords = getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).getRecords(); List<Record> actualRecords = getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).getRecords();

View file

@ -122,6 +122,7 @@ public class ShardConsumerTest {
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords)); recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords));
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(config.getIdleMillisBetweenCalls()).thenReturn(0l);
} }
/** /**
@ -339,7 +340,7 @@ public class ShardConsumerTest {
) )
); );
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache);
@ -467,7 +468,7 @@ public class ShardConsumerTest {
) )
); );
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache);

View file

@ -164,6 +164,7 @@ public class WorkerTest {
public void setup() { public void setup() {
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500)); recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500));
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(config.getIdleMillisBetweenCalls()).thenReturn(500L);
} }
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
@ -292,10 +293,22 @@ public class WorkerTest {
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint) when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint)
.thenReturn(secondCheckpoint); .thenReturn(secondCheckpoint);
Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST, Worker worker = new Worker(stageName,
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, streamletFactory,
leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, config,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); streamConfig,
INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
checkpoint,
leaseCoordinator,
execService,
nullMetricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
Worker workerSpy = spy(worker); Worker workerSpy = spy(worker);
@ -768,10 +781,22 @@ public class WorkerTest {
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, config,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -918,10 +943,22 @@ public class WorkerTest {
when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, Worker worker = new InjectableWorker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, config,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization) {
@Override @Override
void postConstruct() { void postConstruct() {
this.gracefulShutdownCoordinator = coordinator; this.gracefulShutdownCoordinator = coordinator;
@ -982,10 +1019,22 @@ public class WorkerTest {
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, clientConfig,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1054,10 +1103,22 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, clientConfig,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1157,10 +1218,22 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, clientConfig,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1264,10 +1337,22 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, clientConfig,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1338,10 +1423,22 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, clientConfig,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1383,10 +1480,22 @@ public class WorkerTest {
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, super(applicationName,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, recordProcessorFactory,
checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, config,
failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); streamConfig,
initialPositionInStream,
parentShardPollIntervalMillis,
shardSyncIdleTimeMillis,
cleanupLeasesUponShardCompletion,
checkpoint,
leaseCoordinator,
execService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization);
postConstruct(); postConstruct();
} }
@ -1690,8 +1799,10 @@ public class WorkerTest {
idleTimeInMilliseconds, idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList, callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
KinesisClientLibConfiguration clientConfig = KinesisClientLibConfiguration clientConfig = spy(new KinesisClientLibConfiguration("app", null, null, null));
new KinesisClientLibConfiguration("app", null, null, null);
when(clientConfig.getIdleMillisBetweenCalls()).thenReturn(0L);
Worker worker = Worker worker =
new Worker(stageName, new Worker(stageName,
recordProcessorFactory, recordProcessorFactory,