diff --git a/pom.xml b/pom.xml index 895c6542..0e40b5ab 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.5 + 1.8.6-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java new file mode 100644 index 00000000..d9fc011e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -0,0 +1,90 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.time.Duration; +import java.time.Instant; + +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +import lombok.extern.apachecommons.CommonsLog; + +/** + * This is the BlockingGetRecordsCache class. This class blocks any calls to the getRecords on the + * GetRecordsRetrievalStrategy class. + */ +@CommonsLog +public class BlockingGetRecordsCache implements GetRecordsCache { + private final int maxRecordsPerCall; + private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final long idleMillisBetweenCalls; + private Instant lastSuccessfulCall; + + public BlockingGetRecordsCache(final int maxRecordsPerCall, + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, + final long idleMillisBetweenCalls) { + this.maxRecordsPerCall = maxRecordsPerCall; + this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.idleMillisBetweenCalls = idleMillisBetweenCalls; + } + + @Override + public void start() { + // + // Nothing to do here + // + } + + @Override + public ProcessRecordsInput getNextResult() { + sleepBeforeNextCall(); + GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + lastSuccessfulCall = Instant.now(); + ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() + .withRecords(getRecordsResult.getRecords()) + .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); + return processRecordsInput; + } + + private void sleepBeforeNextCall() { + if (!Thread.interrupted()) { + if (lastSuccessfulCall == null) { + return; + } + long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis(); + if (timeSinceLastCall < idleMillisBetweenCalls) { + try { + Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall); + } catch (InterruptedException e) { + log.info("Thread was interrupted, indicating that shutdown was called."); + } + } + } else { + log.info("Thread has been interrupted, indicating that it is in the shutdown phase."); + } + } + + @Override + public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() { 
+ return getRecordsRetrievalStrategy; + } + + @Override + public void shutdown() { + getRecordsRetrievalStrategy.shutdown(); + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index d3ccb911..9121df4b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -14,8 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.Optional; - /** * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, * and state transitions is contained within the {@link ConsumerState} objects. @@ -253,9 +251,14 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(), - consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), consumer.getStreamConfig()); + return new InitializeTask(consumer.getShardInfo(), + consumer.getRecordProcessor(), + consumer.getCheckpoint(), + consumer.getRecordProcessorCheckpointer(), + consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), + consumer.getStreamConfig(), + consumer.getGetRecordsCache()); } @Override @@ -309,10 +312,14 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), - consumer.getGetRecordsRetrievalStrategy()); + return new ProcessTask(consumer.getShardInfo(), + consumer.getStreamConfig(), + consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), + consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), + consumer.getGetRecordsCache()); } @Override @@ -371,8 +378,10 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), - consumer.getShutdownNotification(), consumer.getShardInfo()); + return new ShutdownNotificationTask(consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getShutdownNotification(), + consumer.getShardInfo()); } @Override @@ -511,13 +520,16 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new ShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(), + return new ShutdownTask(consumer.getShardInfo(), + consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getShutdownReason(), consumer.getStreamConfig().getStreamProxy(), consumer.getStreamConfig().getInitialPositionInStream(), - consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(), + consumer.isCleanupLeasesOfCompletedShards(), + consumer.getLeaseManager(), consumer.getTaskBackoffTimeMillis(), - consumer.getGetRecordsRetrievalStrategy()); + 
consumer.getGetRecordsCache()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java new file mode 100644 index 00000000..05c2ab3f --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/DataFetchingStrategy.java @@ -0,0 +1,8 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * The strategy used to determine the type of GetRecordsCache used for fetching data from Kinesis. + */ +public enum DataFetchingStrategy { + DEFAULT, PREFETCH_CACHED; +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java new file mode 100644 index 00000000..dba24f8d --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; + +/** + * This interface is used as a cache for prefetching data from Kinesis. + */ +public interface GetRecordsCache { + /** + * This method calls the start behavior on the cache, if available. + */ + void start(); + + /** + * This method returns the next set of records from the cache if present, or blocks the request until it gets the + * next set of records back from Kinesis. + * + * @return The next set of records. + */ + ProcessRecordsInput getNextResult(); + + GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy(); + + /** + * This method calls the shutdown behavior on the cache, if available. + */ + void shutdown(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java new file mode 100644 index 00000000..d5b4a782 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetriever.java @@ -0,0 +1,12 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +import java.util.concurrent.Callable; + +/** + * This interface uses the GetRecordsRetrievalStrategy to retrieve the next set of records and update the cache. 
+ */ +public interface GetRecordsRetriever { + GetRecordsResult getNextRecords(int maxRecords); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index e3d9f607..5e847a89 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -43,17 +43,19 @@ class InitializeTask implements ITask { // Back off for this interval if we encounter a problem (exception) private final long backoffTimeMillis; private final StreamConfig streamConfig; + private final GetRecordsCache getRecordsCache; /** * Constructor. 
*/ InitializeTask(ShardInfo shardInfo, - IRecordProcessor recordProcessor, - ICheckpoint checkpoint, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisDataFetcher dataFetcher, - long backoffTimeMillis, - StreamConfig streamConfig) { + IRecordProcessor recordProcessor, + ICheckpoint checkpoint, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisDataFetcher dataFetcher, + long backoffTimeMillis, + StreamConfig streamConfig, + GetRecordsCache getRecordsCache) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.checkpoint = checkpoint; @@ -61,6 +63,7 @@ class InitializeTask implements ITask { this.dataFetcher = dataFetcher; this.backoffTimeMillis = backoffTimeMillis; this.streamConfig = streamConfig; + this.getRecordsCache = getRecordsCache; } /* @@ -80,6 +83,7 @@ class InitializeTask implements ITask { ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); + getRecordsCache.start(); recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint); recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 7d6dac5a..b0ac6bfd 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -232,6 +232,9 @@ public class KinesisClientLibConfiguration { @Getter private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; + @Getter + private RecordsFetcherFactory recordsFetcherFactory; + /** * Constructor. 
* @@ -267,14 +270,30 @@ public class KinesisClientLibConfiguration { AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider, String workerId) { - this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, - dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, - DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, - DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, - DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, - new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), - DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, - DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, + this(applicationName, + streamName, + null, + null, + DEFAULT_INITIAL_POSITION_IN_STREAM, + kinesisCredentialsProvider, + dynamoDBCredentialsProvider, + cloudWatchCredentialsProvider, + DEFAULT_FAILOVER_TIME_MILLIS, + workerId, + DEFAULT_MAX_RECORDS, + DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, + DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, + DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, + DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, + DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, + new ClientConfiguration(), + new ClientConfiguration(), + new ClientConfiguration(), + DEFAULT_TASK_BACKOFF_TIME_MILLIS, + DEFAULT_METRICS_BUFFER_TIME_MILLIS, + DEFAULT_METRICS_MAX_QUEUE_SIZE, + DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, + null, DEFAULT_SHUTDOWN_GRACE_MILLIS); } @@ -315,29 +334,29 @@ public class KinesisClientLibConfiguration { // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES public KinesisClientLibConfiguration(String applicationName, - String streamName, - String kinesisEndpoint, - InitialPositionInStream initialPositionInStream, - AWSCredentialsProvider kinesisCredentialsProvider, - AWSCredentialsProvider dynamoDBCredentialsProvider, - AWSCredentialsProvider cloudWatchCredentialsProvider, - long failoverTimeMillis, - String workerId, - int maxRecords, - long idleTimeBetweenReadsInMillis, - boolean callProcessRecordsEvenForEmptyRecordList, - long parentShardPollIntervalMillis, - long shardSyncIntervalMillis, - boolean cleanupTerminatedShardsBeforeExpiry, - ClientConfiguration kinesisClientConfig, - ClientConfiguration dynamoDBClientConfig, - ClientConfiguration cloudWatchClientConfig, - long taskBackoffTimeMillis, - long metricsBufferTimeMillis, - int metricsMaxQueueSize, - boolean validateSequenceNumberBeforeCheckpointing, - String regionName, - long shutdownGraceMillis) { + String streamName, + String kinesisEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean 
validateSequenceNumberBeforeCheckpointing, + String regionName, + long shutdownGraceMillis) { this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, @@ -345,7 +364,117 @@ public class KinesisClientLibConfiguration { shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, - validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); + validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); + } + + /** + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamName Name of the Kinesis stream + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. 
+ * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + */ + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + public KinesisClientLibConfiguration(String applicationName, + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + long shutdownGraceMillis) { + // Check following values are greater than zero + checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); + checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); + checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis); + checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis); + checkIsValuePositive("MaxRecords", (long) maxRecords); + checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); + checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); + checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); + checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); + checkIsRegionNameValid(regionName); + this.applicationName = applicationName; + this.tableName = applicationName; + this.streamName = streamName; + this.kinesisEndpoint = kinesisEndpoint; + this.dynamoDBEndpoint = dynamoDBEndpoint; + this.initialPositionInStream = initialPositionInStream; + this.kinesisCredentialsProvider = kinesisCredentialsProvider; + this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; + this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider; + this.failoverTimeMillis = failoverTimeMillis; + this.maxRecords = maxRecords; + this.idleTimeBetweenReadsInMillis = 
idleTimeBetweenReadsInMillis; + this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList; + this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; + this.shardSyncIntervalMillis = shardSyncIntervalMillis; + this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry; + this.workerIdentifier = workerId; + this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig); + this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig); + this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig); + this.taskBackoffTimeMillis = taskBackoffTimeMillis; + this.metricsBufferTimeMillis = metricsBufferTimeMillis; + this.metricsMaxQueueSize = metricsMaxQueueSize; + this.metricsLevel = DEFAULT_METRICS_LEVEL; + this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS; + this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing; + this.regionName = regionName; + this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER; + this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME; + this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; + this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; + this.initialPositionInStreamExtended = + InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; + this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); } /** @@ -408,7 +537,7 @@ public class KinesisClientLibConfiguration { int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName, - long shutdownGraceMillis) { + RecordsFetcherFactory recordsFetcherFactory) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -418,7 +547,6 @@ public class KinesisClientLibConfiguration { checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); - checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); checkIsRegionNameValid(regionName); this.applicationName = applicationName; this.tableName = applicationName; @@ -455,6 +583,7 @@ public class KinesisClientLibConfiguration { InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = recordsFetcherFactory; this.shutdownGraceMillis = shutdownGraceMillis; } @@ -1158,6 +1287,48 @@ public class KinesisClientLibConfiguration { return this; } + /** + * + * @param maxPendingProcessRecordsInput The max number of ProcessRecordsInput that can be stored in the cache before + * blocking + * @return this configuration object + */ + public KinesisClientLibConfiguration withMaxPendingProcessRecordsInput(final int maxPendingProcessRecordsInput) { + checkIsValuePositive("maxPendingProcessRecordsInput", maxPendingProcessRecordsInput); + 
this.recordsFetcherFactory.setMaxPendingProcessRecordsInput(maxPendingProcessRecordsInput); + return this; + } + + /** + * @param maxCacheByteSize Max byte size for the cache at any given point of time. After this threshold is crossed + * the KinesisDataFetcher will be blocked until the cache has more space available. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) { + checkIsValuePositive("maxCacheByteSize", maxCacheByteSize); + this.recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); + return this; + } + + /** + * @param dataFetchingStrategy The strategy for fetching data from Kinesis. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { + this.recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy.toUpperCase())); + return this; + } + + /** + * @param maxRecordsCount The maximum number of records in the cache, across all ProcessRecordsInput objects + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) { + checkIsValuePositive("maxRecordsCount", maxRecordsCount); + this.recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); + return this; + } + /** * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for */ @@ -1174,4 +1345,14 @@ this.shutdownGraceMillis = shutdownGraceMillis; return this; } + + /** + * @param idleMillisBetweenCalls Idle time between two getRecords calls from the data fetcher. + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) { + checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); + this.recordsFetcherFactory.setIdleMillisBetweenCalls(idleMillisBetweenCalls); + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index dec0ac5e..c2ba9d15 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -1,33 +1,34 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. 
See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import lombok.Data; +import java.util.Collections; +import java.util.Date; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.ShardIteratorType; -import java.util.Date; -import java.util.function.Consumer; +import lombok.Data; /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. @@ -49,8 +50,7 @@ class KinesisDataFetcher { */ public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) { this.shardId = shardInfo.getShardId(); - this.kinesisProxy = - new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId); + this.kinesisProxy = new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId); } /** @@ -63,26 +63,24 @@ class KinesisDataFetcher { if (!isInitialized) { throw new IllegalArgumentException("KinesisDataFetcher.getRecords called before initialization."); } - - DataFetcherResult response; + if (nextIterator != null) { try { - response = new AdvancingResult(kinesisProxy.get(nextIterator, maxRecords)); + return new AdvancingResult(kinesisProxy.get(nextIterator, maxRecords)); } catch (ResourceNotFoundException e) { LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId); - response = TERMINAL_RESULT; + return TERMINAL_RESULT; } } else { - response = TERMINAL_RESULT; + return TERMINAL_RESULT; } - - return response; } final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() { @Override public GetRecordsResult getResult() { - return null; + return new GetRecordsResult().withMillisBehindLatest(null).withRecords(Collections.emptyList()) + .withNextShardIterator(null); } @Override @@ -98,7 +96,7 @@ class KinesisDataFetcher { }; @Data - private class AdvancingResult implements DataFetcherResult { + class AdvancingResult implements DataFetcherResult { final GetRecordsResult result; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java new file mode 100644 index 00000000..5369c0f4 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -0,0 +1,223 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. 
This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; + +import com.amazonaws.SdkClientException; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; +import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +import lombok.NonNull; +import lombok.extern.apachecommons.CommonsLog; +import org.apache.commons.lang.Validate; + +/** + * This is the prefetch caching class; it spins up a thread if prefetching is enabled. That thread fetches the + * next set of records and stores it in the cache. The size of the cache is limited by setting + * maxPendingProcessRecordsInput i.e. the maximum number of ProcessRecordsInput objects that the cache can store, maxByteSize + * i.e. the byte size of the records stored in the cache and maxRecordsCount i.e. the max number of records that should + * be present in the cache across multiple ProcessRecordsInput objects. If no data is available in the cache, the call from + * the record processor is blocked until records are retrieved from Kinesis. + */ +@CommonsLog +public class PrefetchGetRecordsCache implements GetRecordsCache { + LinkedBlockingQueue<ProcessRecordsInput> getRecordsResultQueue; + private int maxPendingProcessRecordsInput; + private int maxByteSize; + private int maxRecordsCount; + private final int maxRecordsPerCall; + private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final ExecutorService executorService; + private final IMetricsFactory metricsFactory; + private final long idleMillisBetweenCalls; + private Instant lastSuccessfulCall; + private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; + private PrefetchCounters prefetchCounters; + private boolean started = false; + private final String operation; + + /** + * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a + * LinkedBlockingQueue. 
+ * + * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.PrefetchGetRecordsCache + * + * @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before + * blocking + * @param maxByteSize Max byte size of the queue before blocking next get records call + * @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects + * @param maxRecordsPerCall Max records to be returned per call + * @param getRecordsRetrievalStrategy Retrieval strategy for the get records call + * @param executorService Executor service for the cache + * @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call + */ + public PrefetchGetRecordsCache(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount, + final int maxRecordsPerCall, + @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, + @NonNull final ExecutorService executorService, + long idleMillisBetweenCalls, + @NonNull final IMetricsFactory metricsFactory, + @NonNull String operation) { + this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.maxRecordsPerCall = maxRecordsPerCall; + this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; + this.maxByteSize = maxByteSize; + this.maxRecordsCount = maxRecordsCount; + this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput); + this.prefetchCounters = new PrefetchCounters(); + this.executorService = executorService; + this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory); + this.idleMillisBetweenCalls = idleMillisBetweenCalls; + this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); + Validate.notEmpty(operation, "Operation cannot be empty"); + this.operation = operation; + } + + @Override + public void start() { + if (executorService.isShutdown()) { + throw new IllegalStateException("ExecutorService has been shutdown."); + } + + if (!started) { + log.info("Starting prefetching thread."); + executorService.execute(defaultGetRecordsCacheDaemon); + } + started = true; + } + + @Override + public ProcessRecordsInput getNextResult() { + if (executorService.isShutdown()) { + throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests."); + } + + if (!started) { + throw new IllegalStateException("Cache has not been initialized, make sure to call start."); + } + ProcessRecordsInput result = null; + try { + result = getRecordsResultQueue.take().withCacheExitTime(Instant.now()); + prefetchCounters.removed(result); + } catch (InterruptedException e) { + log.error("Interrupted while getting records from the cache", e); + } + return result; + } + + @Override + public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() { + return getRecordsRetrievalStrategy; + } + + @Override + public void shutdown() { + defaultGetRecordsCacheDaemon.isShutdown = true; + executorService.shutdownNow(); + started = false; + } + + private class DefaultGetRecordsCacheDaemon implements Runnable { + volatile boolean isShutdown = false; + + @Override + public void run() { + while (!isShutdown) { + if (Thread.currentThread().isInterrupted()) { + log.warn("Prefetch thread was interrupted."); + break; + } + MetricsHelper.startScope(metricsFactory, operation); + if (prefetchCounters.shouldGetNewRecords()) { + try { + sleepBeforeNextCall(); + GetRecordsResult getRecordsResult = 
getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + lastSuccessfulCall = Instant.now(); + ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() + .withRecords(getRecordsResult.getRecords()) + .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()) + .withCacheEntryTime(lastSuccessfulCall); + getRecordsResultQueue.put(processRecordsInput); + prefetchCounters.added(processRecordsInput); + } catch (InterruptedException e) { + log.info("Thread was interrupted, indicating shutdown was called on the cache."); + } catch (SdkClientException e) { + log.error("Exception thrown while fetching records from Kinesis", e); + } catch (Throwable e) { + log.error("Unexpected exception was thrown. This could probably be an issue or a bug." + + " Please search for the exception/error online to check what is going on. If the " + + "issue persists or is a recurring problem, feel free to open an issue on, " + + "https://github.com/awslabs/amazon-kinesis-client.", e); + } finally { + MetricsHelper.endScope(); + } + } + } + callShutdownOnStrategy(); + } + + private void callShutdownOnStrategy() { + if (!getRecordsRetrievalStrategy.isShutdown()) { + getRecordsRetrievalStrategy.shutdown(); + } + } + + private void sleepBeforeNextCall() throws InterruptedException { + if (lastSuccessfulCall == null) { + return; + } + long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis(); + if (timeSinceLastCall < idleMillisBetweenCalls) { + Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall); + } + } + } + + private class PrefetchCounters { + private long size = 0; + private long byteSize = 0; + + public synchronized void added(final ProcessRecordsInput result) { + size += getSize(result); + byteSize += getByteSize(result); + } + + public synchronized void removed(final ProcessRecordsInput result) { + size -= getSize(result); + byteSize -= getByteSize(result); + } + + private long getSize(final ProcessRecordsInput result) { + return result.getRecords().size(); + } + + private long getByteSize(final ProcessRecordsInput result) { + return result.getRecords().stream().mapToLong(record -> record.getData().array().length).sum(); + } + + public synchronized boolean shouldGetNewRecords() { + return size < maxRecordsCount && byteSize < maxByteSize; + } + } + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 90ac2c09..9aca832e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -18,7 +18,6 @@ import java.math.BigInteger; import java.util.Collections; import java.util.List; import java.util.ListIterator; -import java.util.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -63,7 +62,7 @@ class ProcessTask implements ITask { private final Shard shard; private final ThrottlingReporter throttlingReporter; - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final GetRecordsCache getRecordsCache; /** * @param shardInfo @@ -78,17 +77,17 @@ class ProcessTask implements ITask { * Kinesis data fetcher (used to fetch records from Kinesis) * @param backoffTimeMillis * backoff time when catching exceptions - * @param getRecordsRetrievalStrategy + * @param getRecordsCache * The retrieval strategy for fetching records 
from kinesis */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + GetRecordsCache getRecordsCache) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), - getRecordsRetrievalStrategy); + getRecordsCache); } /** @@ -108,9 +107,9 @@ class ProcessTask implements ITask { * determines how throttling events should be reported in the log. */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -120,7 +119,7 @@ class ProcessTask implements ITask { this.backoffTimeMillis = backoffTimeMillis; this.throttlingReporter = throttlingReporter; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); - this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.getRecordsCache = getRecordsCache; // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if @@ -149,7 +148,6 @@ class ProcessTask implements ITask { scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId()); scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY); scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY); - Exception exception = null; try { @@ -158,10 +156,10 @@ class ProcessTask implements ITask { return new TaskResult(null, true); } - final GetRecordsResult getRecordsResult = getRecordsResult(); + final ProcessRecordsInput processRecordsInput = getRecordsResult(); throttlingReporter.success(); - List records = getRecordsResult.getRecords(); - + List records = processRecordsInput.getRecords(); + if (!records.isEmpty()) { scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); } else { @@ -175,7 +173,7 @@ class ProcessTask implements ITask { recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); if (shouldCallProcessRecords(records)) { - callProcessRecords(getRecordsResult, records); + callProcessRecords(processRecordsInput, records); } } catch (ProvisionedThroughputExceededException pte) { throttlingReporter.throttled(); @@ -205,18 +203,18 @@ class ProcessTask implements ITask { /** * Dispatches a batch of records to the record processor, and handles any fallout from that. 
- * - * @param getRecordsResult + * + * @param input * the result of the last call to Kinesis * @param records * the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation. */ - private void callProcessRecords(GetRecordsResult getRecordsResult, List records) { + private void callProcessRecords(ProcessRecordsInput input, List records) { LOG.debug("Calling application processRecords() with " + records.size() + " records from " + shardInfo.getShardId()); final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) .withCheckpointer(recordProcessorCheckpointer) - .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); + .withMillisBehindLatest(input.getMillisBehindLatest()); final long recordProcessorStartTimeMillis = System.currentTimeMillis(); try { @@ -233,7 +231,7 @@ class ProcessTask implements ITask { /** * Whether we should call process records or not - * + * * @param records * the records returned from the call to Kinesis, and/or deaggregation * @return true if the set of records should be dispatched to the record process, false if they should not. @@ -244,7 +242,7 @@ class ProcessTask implements ITask { /** * Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation - * + * * @param records * the records to deaggregate is deaggregation is required. * @return returns either the deaggregated records, or the original records @@ -267,7 +265,7 @@ class ProcessTask implements ITask { /** * Emits metrics, and sleeps if there are no records available - * + * * @param startTimeMillis * the time when the task started */ @@ -304,8 +302,8 @@ class ProcessTask implements ITask { * @return the largest extended sequence number among the retained records */ private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List records, - final ExtendedSequenceNumber lastCheckpointValue, - final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { + final ExtendedSequenceNumber lastCheckpointValue, + final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue; ListIterator recordIterator = records.listIterator(); while (recordIterator.hasNext()) { @@ -339,7 +337,7 @@ class ProcessTask implements ITask { * * @return list of data records from Kinesis */ - private GetRecordsResult getRecordsResult() { + private ProcessRecordsInput getRecordsResult() { try { return getRecordsResultAndRecordMillisBehindLatest(); } catch (ExpiredIteratorException e) { @@ -375,22 +373,17 @@ class ProcessTask implements ITask { * * @return list of data records from Kinesis */ - private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() { - final GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(streamConfig.getMaxRecords()); + private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() { + final ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); - if (getRecordsResult == null) { - // Stream no longer exists - return new GetRecordsResult().withRecords(Collections.emptyList()); - } - - if (getRecordsResult.getMillisBehindLatest() != null) { + if (processRecordsInput.getMillisBehindLatest() != null) { MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, - getRecordsResult.getMillisBehindLatest(), + processRecordsInput.getMillisBehindLatest(), StandardUnit.Milliseconds, 
MetricsLevel.SUMMARY); } - return getRecordsResult; + return processRecordsInput; } -} +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java new file mode 100644 index 00000000..be8316d7 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -0,0 +1,74 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + +/** + * This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard. + */ +public interface RecordsFetcherFactory { + /** + * Returns a GetRecordsCache to be used for retrieving records for a given shard. + * + * @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache + * @param shardId ShardId of the shard that the fetcher will retrieve records for + * @param metricsFactory MetricsFactory used to create metricScope + * + * @return GetRecordsCache used to get records from Kinesis. + */ + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory); + + /** + * Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are + * blocked. + * + * @param maxPendingProcessRecordsInput The maximum number of ProcessRecordsInput objects that the cache will accept + * before blocking. + */ + void setMaxPendingProcessRecordsInput(int maxPendingProcessRecordsInput); + + /** + * Sets the max byte size for the GetRecordsCache, before further requests are blocked. The byte size of the cache + * is the sum of byte size of all the ProcessRecordsInput objects in the cache at any point of time. + * + * @param maxByteSize The maximum byte size for the cache before blocking. + */ + void setMaxByteSize(int maxByteSize); + + /** + * Sets the max number of records that the GetRecordsCache can hold, before further requests are blocked. The records + * count is the sum of all records present across all the ProcessRecordsInput objects in the cache at any point + * of time. + * + * @param maxRecordsCount The maximum number of records in the cache before blocking. + */ + void setMaxRecordsCount(int maxRecordsCount); + + /** + * Sets the dataFetchingStrategy to determine the type of GetRecordsCache to be used. + * + * @param dataFetchingStrategy Fetching strategy to be used + */ + void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); + + /** + * Sets the maximum idle time between two get calls. + * + * @param idleMillisBetweenCalls Sleep millis between calls. 
+ */ + void setIdleMillisBetweenCalls(long idleMillisBetweenCalls); + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 4bbe1939..d7a5545e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import lombok.Getter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,6 +31,8 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.google.common.annotations.VisibleForTesting; +import lombok.Getter; + /** * Responsible for consuming data records of a (specified) shard. * The instance should be shutdown when we lose the primary responsibility for a shard. @@ -43,6 +44,7 @@ class ShardConsumer { private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; + private final KinesisClientLibConfiguration config; private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final ExecutorService executorService; private final ShardInfo shardInfo; @@ -61,7 +63,7 @@ class ShardConsumer { private Future future; @Getter - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final GetRecordsCache getRecordsCache; private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, Optional retryGetRecordsInSeconds, @@ -91,6 +93,7 @@ class ShardConsumer { * @param streamConfig Stream configuration to use * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard + * @param config Kinesis library configuration * @param leaseManager Used to create leases for new shards * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard @@ -99,19 +102,31 @@ class ShardConsumer { */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - ILeaseManager leaseManager, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { - this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis, - cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + ILeaseManager leaseManager, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisClientLibConfiguration config) { + this(shardInfo, + streamConfig, + checkpoint, + 
recordProcessor, + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional.empty(), + Optional.empty(), + config); } /** @@ -126,6 +141,7 @@ class ShardConsumer { * @param backoffTimeMillis backoff interval when we encounter exceptions * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record. * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool. + * @param config Kinesis library configuration */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES ShardConsumer(ShardInfo shardInfo, @@ -140,26 +156,86 @@ class ShardConsumer { long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool) { - this.streamConfig = streamConfig; - this.recordProcessor = recordProcessor; - this.executorService = executorService; - this.shardInfo = shardInfo; - this.checkpoint = checkpoint; - this.recordProcessorCheckpointer = - new RecordProcessorCheckpointer(shardInfo, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config) { + + this( + shardInfo, + streamConfig, + checkpoint, + recordProcessor, + new RecordProcessorCheckpointer( + shardInfo, checkpoint, - new SequenceNumberValidator(streamConfig.getStreamProxy(), + new SequenceNumberValidator( + streamConfig.getStreamProxy(), shardInfo.getShardId(), - streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())); - this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())), + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), + retryGetRecordsInSeconds, + maxGetRecordsThreadPool, + config + ); + } + + /** + * @param shardInfo Shard information + * @param streamConfig Stream Config to use + * @param checkpoint Checkpoint tracker + * @param recordProcessor Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress + * @param leaseManager Used to create leases for new shards + * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) + * @param cleanupLeasesOfCompletedShards clean up the leases of completed shards + * @param executorService ExecutorService used to execute process tasks for this shard + * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard + * @param backoffTimeMillis backoff interval when we encounter exceptions + * @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists + * @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams. 
+ * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record + * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool + * @param config Kinesis library configuration + */ + ShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + ILeaseManager leaseManager, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config) { + this.shardInfo = shardInfo; + this.streamConfig = streamConfig; + this.checkpoint = checkpoint; + this.recordProcessor = recordProcessor; + this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.leaseManager = leaseManager; - this.metricsFactory = metricsFactory; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; + this.executorService = executorService; + this.metricsFactory = metricsFactory; this.taskBackoffTimeMillis = backoffTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; - this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo); + this.config = config; + this.dataFetcher = kinesisDataFetcher; + this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( + makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), + this.getShardInfo().getShardId(), this.metricsFactory); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index f56033a8..bd40d686 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -46,7 +46,7 @@ class ShutdownTask implements ITask { private final boolean cleanupLeasesOfCompletedShards; private final TaskType taskType = TaskType.SHUTDOWN; private final long backoffTimeMillis; - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final GetRecordsCache getRecordsCache; /** * Constructor. 
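With the change above, both ShardConsumer and ShutdownTask now hold a GetRecordsCache rather than a raw GetRecordsRetrievalStrategy, so the calling contract becomes: start() once, getNextResult() for each processing pass, and shutdown() when the shard is released. The sketch below only illustrates that contract; the DrainLoop class and its logging are hypothetical and not part of this change.

    package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

    import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;

    // Hypothetical illustration of how a task interacts with the new GetRecordsCache.
    class DrainLoop {
        private final GetRecordsCache getRecordsCache;

        DrainLoop(GetRecordsCache getRecordsCache) {
            this.getRecordsCache = getRecordsCache;
        }

        void drainOnceAndShutdown() {
            // start() is expected to be called once before the first getNextResult().
            getRecordsCache.start();
            try {
                // Each call now yields a ProcessRecordsInput instead of a raw GetRecordsResult.
                ProcessRecordsInput input = getRecordsCache.getNextResult();
                System.out.println("records=" + input.getRecords().size()
                        + ", millisBehindLatest=" + input.getMillisBehindLatest());
            } finally {
                // ShutdownTask now calls shutdown() on the cache rather than on the retrieval strategy.
                getRecordsCache.shutdown();
            }
        }
    }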
@@ -61,7 +61,7 @@ class ShutdownTask implements ITask { boolean cleanupLeasesOfCompletedShards, ILeaseManager leaseManager, long backoffTimeMillis, - GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + GetRecordsCache getRecordsCache) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -71,7 +71,7 @@ class ShutdownTask implements ITask { this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.leaseManager = leaseManager; this.backoffTimeMillis = backoffTimeMillis; - this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.getRecordsCache = getRecordsCache; } /* @@ -111,7 +111,7 @@ class ShutdownTask implements ITask { } } LOG.debug("Shutting down retrieval strategy."); - getRecordsRetrievalStrategy.shutdown(); + getRecordsCache.shutdown(); LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); } catch (Exception e) { applicationException = true; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java new file mode 100644 index 00000000..e6c9f3b0 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -0,0 +1,77 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.concurrent.Executors; + +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.apachecommons.CommonsLog; + +@CommonsLog +public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { + private final int maxRecords; + private int maxPendingProcessRecordsInput = 3; + private int maxByteSize = 8 * 1024 * 1024; + private int maxRecordsCount = 30000; + private long idleMillisBetweenCalls = 1500L; + private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; + private IMetricsFactory metricsFactory; + + public SimpleRecordsFetcherFactory(int maxRecords) { + this.maxRecords = maxRecords; + } + + @Override + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) { + if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { + return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); + } else { + return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords, + getRecordsRetrievalStrategy, + Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("prefetch-cache-" + shardId + "-%04d") + .build()), + idleMillisBetweenCalls, + metricsFactory, + "ProcessTask"); + } + } + + @Override + public void setMaxPendingProcessRecordsInput(int maxPendingProcessRecordsInput){ + this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; + } + + @Override + public void setMaxByteSize(int maxByteSize){ + this.maxByteSize = maxByteSize; + } + + @Override + public void setMaxRecordsCount(int maxRecordsCount) { + this.maxRecordsCount = maxRecordsCount; + } + + @Override + public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){ + this.dataFetchingStrategy = dataFetchingStrategy; + } + + public void setIdleMillisBetweenCalls(final long idleMillisBetweenCalls) { + this.idleMillisBetweenCalls = idleMillisBetweenCalls; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 3cfb9f2f..d2ea738d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -73,6 +73,7 @@ public class Worker implements Runnable { private final String applicationName; private final IRecordProcessorFactory recordProcessorFactory; + private final KinesisClientLibConfiguration config; private final StreamConfig streamConfig; private final InitialPositionInStreamExtended initialPosition; private final ICheckpoint checkpointTracker; @@ -245,6 +246,7 @@ public class Worker implements Runnable { KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + config, new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) .getProxy(config.getStreamName()), @@ -306,6 +308,8 @@ public class Worker implements Runnable { * Name of the Kinesis application * @param recordProcessorFactory 
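The SimpleRecordsFetcherFactory above is what ultimately decides between the blocking and the prefetching cache: DEFAULT keeps the blocking behaviour, anything else builds a PrefetchGetRecordsCache on a dedicated daemon thread named per shard. A hedged usage sketch follows; the numeric values simply mirror the factory defaults, and the strategy argument is assumed to come from the existing ShardConsumer wiring.

    package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

    import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;

    class RecordsFetcherFactoryExample {
        // Builds a prefetching GetRecordsCache for one shard.
        static GetRecordsCache prefetchingCache(GetRecordsRetrievalStrategy strategy, String shardId) {
            SimpleRecordsFetcherFactory factory = new SimpleRecordsFetcherFactory(1000); // maxRecords per GetRecords call
            factory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);       // non-DEFAULT selects the prefetch cache
            factory.setMaxPendingProcessRecordsInput(3);   // results queued ahead of the processor
            factory.setMaxByteSize(8 * 1024 * 1024);       // byte cap across queued results
            factory.setMaxRecordsCount(30000);             // record cap across queued results
            factory.setIdleMillisBetweenCalls(1500L);      // pause between successive GetRecords calls
            return factory.createRecordsFetcher(strategy, shardId, new NullMetricsFactory());
        }
    }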
* Used to get record processor instances for processing data from shards + * @param config + * Kinesis Library configuration * @param streamConfig * Stream configuration * @param initialPositionInStream @@ -333,24 +337,25 @@ public class Worker implements Runnable { */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, - InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, + this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty()); } - /** * @param applicationName * Name of the Kinesis application * @param recordProcessorFactory * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Library Configuration * @param streamConfig * Stream configuration * @param initialPositionInStream @@ -382,7 +387,7 @@ public class Worker implements Runnable { */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, @@ -391,6 +396,7 @@ public class Worker implements Runnable { Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; + this.config = config; this.streamConfig = streamConfig; this.initialPosition = initialPositionInStream; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; @@ -411,7 +417,6 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; } - /** * @return the applicationName */ @@ -819,11 +824,11 @@ public class Worker implements Runnable { * * @param shardInfo * Kinesis shard info - * @param factory + * @param processorFactory * RecordProcessor factory * @return ShardConsumer for the shard */ - ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo,
IRecordProcessorFactory factory) { + ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier @@ -832,20 +837,30 @@ public class Worker implements Runnable { // completely processed (shutdown reason terminate). if ((consumer == null) || (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) { - consumer = buildConsumer(shardInfo, factory); + consumer = buildConsumer(shardInfo, processorFactory); shardInfoShardConsumerMap.put(shardInfo, consumer); wlog.infoForce("Created new shardConsumer for : " + shardInfo); } return consumer; } - protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { - IRecordProcessor recordProcessor = factory.createProcessor(); + protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + IRecordProcessor recordProcessor = processorFactory.createProcessor(); - return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, - leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool); + return new ShardConsumer(shardInfo, + streamConfig, + checkpointTracker, + recordProcessor, + leaseCoordinator.getLeaseManager(), + parentShardPollIntervalMillis, + cleanupLeasesUponShardCompletion, + executorService, + metricsFactory, + taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + retryGetRecordsInSeconds, + maxGetRecordsThreadPool, + config); } @@ -1049,6 +1064,7 @@ public class Worker implements Runnable { public static class Builder { private IRecordProcessorFactory recordProcessorFactory; + private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; private AmazonKinesis kinesisClient; private AmazonDynamoDB dynamoDBClient; @@ -1244,6 +1260,7 @@ public class Worker implements Runnable { return new Worker(config.getApplicationName(), recordProcessorFactory, + config, new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient).getProxy(config.getStreamName()), config.getMaxRecords(), diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java index bd960c08..362af357 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/ProcessRecordsInput.java @@ -14,17 +14,25 @@ */ package com.amazonaws.services.kinesis.clientlibrary.types; +import java.time.Duration; +import java.time.Instant; import java.util.List; + import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.model.Record; +import lombok.Getter; + /** * Container for the parameters to the IRecordProcessor's * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords( * ProcessRecordsInput processRecordsInput) processRecords} method. 
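Because the KinesisClientLibConfiguration is now threaded from Worker through buildConsumer into every ShardConsumer, the records-fetching behaviour is driven entirely by the configuration an application hands to the worker. A minimal sketch of that entry point, assuming the existing Worker.Builder setters (recordProcessorFactory, config) and the four-argument KinesisClientLibConfiguration constructor, none of which are shown in this diff; the names and values are placeholders.

    import java.util.UUID;

    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

    class WorkerWiringExample {
        static Worker buildWorker(IRecordProcessorFactory processorFactory) {
            KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                    "sample-app",                               // application name (placeholder)
                    "sample-stream",                            // stream name (placeholder)
                    new DefaultAWSCredentialsProviderChain(),   // credentials
                    "worker-" + UUID.randomUUID());             // worker id (placeholder)
            // The configuration carries the RecordsFetcherFactory that each ShardConsumer will use.
            return new Worker.Builder()
                    .recordProcessorFactory(processorFactory)
                    .config(config)
                    .build();
        }
    }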
*/ public class ProcessRecordsInput { - + @Getter + private Instant cacheEntryTime; + @Getter + private Instant cacheExitTime; private List records; private IRecordProcessorCheckpointer checkpointer; private Long millisBehindLatest; @@ -96,4 +104,21 @@ public class ProcessRecordsInput { this.millisBehindLatest = millisBehindLatest; return this; } + + public ProcessRecordsInput withCacheEntryTime(Instant cacheEntryTime) { + this.cacheEntryTime = cacheEntryTime; + return this; + } + + public ProcessRecordsInput withCacheExitTime(Instant cacheExitTime) { + this.cacheExitTime = cacheExitTime; + return this; + } + + public Duration getTimeSpentInCache() { + if (cacheEntryTime == null || cacheExitTime == null) { + return Duration.ZERO; + } + return Duration.between(cacheEntryTime, cacheExitTime); + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java index 8e89204e..30b877e8 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java @@ -14,29 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - - import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; @@ -50,6 +27,29 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + 
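With the cacheEntryTime/cacheExitTime fields added to ProcessRecordsInput above, an application record processor can inspect getTimeSpentInCache() to see how long a prefetched batch waited before delivery. A small sketch; the threshold and helper are illustrative only.

    import java.time.Duration;

    import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;

    final class CacheAgeCheck {
        private static final Duration STALE_THRESHOLD = Duration.ofSeconds(5); // illustrative threshold

        // True when the batch sat in the cache longer than the threshold.
        // getTimeSpentInCache() is Duration.ZERO when the entry/exit times were never set,
        // e.g. for the blocking (non-prefetching) cache.
        static boolean isStale(ProcessRecordsInput input) {
            return input.getTimeSpentInCache().compareTo(STALE_THRESHOLD) > 0;
        }
    }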
@RunWith(MockitoJUnitRunner.class) public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { @@ -125,7 +125,6 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { @Test @Ignore public void testInterrupted() throws InterruptedException, ExecutionException { - Future mockFuture = mock(Future.class); when(completionService.submit(any())).thenReturn(mockFuture); when(completionService.poll()).thenReturn(mockFuture); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java new file mode 100644 index 00000000..731c4653 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; + +/** + * Test class for the BlockingGetRecordsCache class. 
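For reference, the blocking variant exercised by the test below can also be built directly; with a 500 ms idle interval it should issue at most roughly two GetRecords calls per second for the shard. A minimal sketch, assuming a retrieval strategy is already available:

    package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

    class BlockingCacheExample {
        static GetRecordsCache blockingCache(GetRecordsRetrievalStrategy strategy) {
            int maxRecordsPerCall = 10000;       // records requested per GetRecords call
            long idleMillisBetweenCalls = 500L;  // minimum spacing between successive calls
            return new BlockingGetRecordsCache(maxRecordsPerCall, strategy, idleMillisBetweenCalls);
        }
    }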
+ */ +@RunWith(MockitoJUnitRunner.class) +public class BlockingGetRecordsCacheTest { + private static final int MAX_RECORDS_PER_COUNT = 10_000; + private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + @Mock + private GetRecordsResult getRecordsResult; + + private List records; + private BlockingGetRecordsCache blockingGetRecordsCache; + + @Before + public void setup() { + records = new ArrayList<>(); + blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy, IDLE_MILLIS_BETWEEN_CALLS); + + when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult); + when(getRecordsResult.getRecords()).thenReturn(records); + } + + @Test + public void testGetNextRecordsWithNoRecords() { + ProcessRecordsInput result = blockingGetRecordsCache.getNextResult(); + + assertEquals(result.getRecords(), records); + assertNull(result.getCacheEntryTime()); + assertNull(result.getCacheExitTime()); + assertEquals(result.getTimeSpentInCache(), Duration.ZERO); + } + + @Test + public void testGetNextRecordsWithRecords() { + Record record = new Record(); + records.add(record); + records.add(record); + records.add(record); + records.add(record); + + ProcessRecordsInput result = blockingGetRecordsCache.getNextResult(); + + assertEquals(result.getRecords(), records); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 63f20a72..fa163ad2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -55,6 +55,8 @@ public class ConsumerStatesTest { @Mock private IRecordProcessor recordProcessor; @Mock + private KinesisClientLibConfiguration config; + @Mock private RecordProcessorCheckpointer recordProcessorCheckpointer; @Mock private ExecutorService executorService; @@ -75,7 +77,7 @@ public class ConsumerStatesTest { @Mock private InitialPositionInStreamExtended initialPositionInStream; @Mock - private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private GetRecordsCache getRecordsCache; private long parentShardPollIntervalMillis = 0xCAFE; private boolean cleanupLeasesOfCompletedShards = true; @@ -98,7 +100,7 @@ public class ConsumerStatesTest { when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards); when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis); when(consumer.getShutdownReason()).thenReturn(reason); - when(consumer.getGetRecordsRetrievalStrategy()).thenReturn(getRecordsRetrievalStrategy); + when(consumer.getGetRecordsCache()).thenReturn(getRecordsCache); } private static final Class> LEASE_MANAGER_CLASS = (Class>) (Class) ILeaseManager.class; @@ -207,6 +209,33 @@ public class ConsumerStatesTest { } + @Test + public void processingStateRecordsFetcher() { + + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + 
equalTo(recordProcessorCheckpointer))); + assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING)); + assertThat(state.getTaskType(), equalTo(TaskType.PROCESS)); + } + @Test public void shutdownRequestState() { ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); @@ -313,7 +342,7 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); } @@ -323,17 +352,17 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher procTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher initTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher taskWith(Class taskTypeClass, - Class valueTypeClass, String propertyName, Matcher matcher) { + Class valueTypeClass, String propertyName, Matcher matcher) { return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName); } @@ -346,7 +375,7 @@ public class ConsumerStatesTest { private final Field matchingField; private ReflectionPropertyMatcher(Class taskTypeClass, Class valueTypeClass, - Matcher matcher, String propertyName) { + Matcher matcher, String propertyName) { this.taskTypeClass = taskTypeClass; this.valueTypeClazz = valueTypeClass; this.matcher = matcher; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index cfa8be10..177546db 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -19,7 +19,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import junit.framework.Assert; +import java.util.Date; import org.junit.Test; import org.mockito.Mockito; @@ -35,7 +35,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorF import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.collect.ImmutableSet; -import java.util.Date; +import junit.framework.Assert; public class 
KinesisClientLibConfigurationTest { private static final long INVALID_LONG = 0L; @@ -95,7 +95,8 @@ public class KinesisClientLibConfigurationTest { // Try each argument at one time. KinesisClientLibConfiguration config = null; long[] longValues = - { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG }; + { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, + TEST_VALUE_LONG }; for (int i = 0; i < PARAMETER_COUNT; i++) { longValues[i] = INVALID_LONG; try { @@ -300,30 +301,30 @@ public class KinesisClientLibConfigurationTest { Mockito.mock(AWSCredentialsProvider.class); try { new KinesisClientLibConfiguration(TEST_STRING, - TEST_STRING, - TEST_STRING, - TEST_STRING, - null, - null, - null, - null, - TEST_VALUE_LONG, - TEST_STRING, - 3, - TEST_VALUE_LONG, - false, - TEST_VALUE_LONG, - TEST_VALUE_LONG, - true, - new ClientConfiguration(), - new ClientConfiguration(), - new ClientConfiguration(), - TEST_VALUE_LONG, - TEST_VALUE_LONG, - 1, - skipCheckpointValidationValue, - "abcd", - TEST_VALUE_LONG); + TEST_STRING, + TEST_STRING, + TEST_STRING, + null, + null, + null, + null, + TEST_VALUE_LONG, + TEST_STRING, + 3, + TEST_VALUE_LONG, + false, + TEST_VALUE_LONG, + TEST_VALUE_LONG, + true, + new ClientConfiguration(), + new ClientConfiguration(), + new ClientConfiguration(), + TEST_VALUE_LONG, + TEST_VALUE_LONG, + 1, + skipCheckpointValidationValue, + "abcd", + TEST_VALUE_LONG); Assert.fail("No expected Exception is thrown."); } catch(IllegalArgumentException e) { System.out.println(e.getMessage()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index 3cc8cb5a..e27e8d26 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -15,9 +15,10 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.collection.IsEmptyCollection.empty; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -37,11 +38,10 @@ import java.util.List; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint; @@ -50,9 +50,10 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import 
com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.ShardIteratorType; /** * Unit tests for KinesisDataFetcher. @@ -208,6 +209,103 @@ public class KinesisDataFetcherTest { // Test shard has reached the end Assert.assertTrue("Shard should reach the end", dataFetcher.isShardEndReached()); } + + @Test + public void testNonNullGetRecords() { + String nextIterator = "TestIterator"; + int maxRecords = 100; + + KinesisProxy mockProxy = mock(KinesisProxy.class); + doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); + dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); + + DataFetcherResult dataFetcherResult = dataFetcher.getRecords(maxRecords); + + assertThat(dataFetcherResult, notNullValue()); + } + + @Test + public void testFetcherDoesNotAdvanceWithoutAccept() { + final String INITIAL_ITERATOR = "InitialIterator"; + final String NEXT_ITERATOR_ONE = "NextIteratorOne"; + final String NEXT_ITERATOR_TWO = "NextIteratorTwo"; + when(kinesisProxy.getIterator(anyString(), anyString())).thenReturn(INITIAL_ITERATOR); + GetRecordsResult iteratorOneResults = mock(GetRecordsResult.class); + when(iteratorOneResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_ONE); + when(kinesisProxy.get(eq(INITIAL_ITERATOR), anyInt())).thenReturn(iteratorOneResults); + + GetRecordsResult iteratorTwoResults = mock(GetRecordsResult.class); + when(kinesisProxy.get(eq(NEXT_ITERATOR_ONE), anyInt())).thenReturn(iteratorTwoResults); + when(iteratorTwoResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_TWO); + + GetRecordsResult finalResult = mock(GetRecordsResult.class); + when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult); + when(finalResult.getNextShardIterator()).thenReturn(null); + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO); + dataFetcher.initialize("TRIM_HORIZON", + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); + + assertNoAdvance(dataFetcher, iteratorOneResults, INITIAL_ITERATOR); + assertAdvanced(dataFetcher, iteratorOneResults, INITIAL_ITERATOR, NEXT_ITERATOR_ONE); + + assertNoAdvance(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE); + assertAdvanced(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE, NEXT_ITERATOR_TWO); + + assertNoAdvance(dataFetcher, finalResult, NEXT_ITERATOR_TWO); + assertAdvanced(dataFetcher, finalResult, NEXT_ITERATOR_TWO, null); + + verify(kinesisProxy, times(2)).get(eq(INITIAL_ITERATOR), anyInt()); + verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_ONE), anyInt()); + verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_TWO), anyInt()); + + reset(kinesisProxy); + + DataFetcherResult terminal = dataFetcher.getRecords(100); + assertThat(terminal.isShardEnd(), equalTo(true)); + assertThat(terminal.getResult(), notNullValue()); + GetRecordsResult terminalResult = terminal.getResult(); + assertThat(terminalResult.getRecords(), notNullValue()); + assertThat(terminalResult.getRecords(), empty()); + 
assertThat(terminalResult.getNextShardIterator(), nullValue()); + assertThat(terminal, equalTo(dataFetcher.TERMINAL_RESULT)); + + verify(kinesisProxy, never()).get(anyString(), anyInt()); + } + + private DataFetcherResult assertAdvanced(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, + String previousValue, String nextValue) { + DataFetcherResult acceptResult = dataFetcher.getRecords(100); + assertThat(acceptResult.getResult(), equalTo(expectedResult)); + + assertThat(dataFetcher.getNextIterator(), equalTo(previousValue)); + assertThat(dataFetcher.isShardEndReached(), equalTo(false)); + + assertThat(acceptResult.accept(), equalTo(expectedResult)); + assertThat(dataFetcher.getNextIterator(), equalTo(nextValue)); + if (nextValue == null) { + assertThat(dataFetcher.isShardEndReached(), equalTo(true)); + } + + verify(kinesisProxy, times(2)).get(eq(previousValue), anyInt()); + + return acceptResult; + } + + private DataFetcherResult assertNoAdvance(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, + String previousValue) { + assertThat(dataFetcher.getNextIterator(), equalTo(previousValue)); + DataFetcherResult noAcceptResult = dataFetcher.getRecords(100); + assertThat(noAcceptResult.getResult(), equalTo(expectedResult)); + + assertThat(dataFetcher.getNextIterator(), equalTo(previousValue)); + + verify(kinesisProxy).get(eq(previousValue), anyInt()); + + return noAcceptResult; + } @Test public void testFetcherDoesNotAdvanceWithoutAccept() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java new file mode 100644 index 00000000..37d0e446 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java @@ -0,0 +1,199 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
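The integration tests below construct the prefetching cache directly. Its constructor takes three independent caps on what may sit in the queue (pending results, total bytes, total records), the per-call record limit, the retrieval strategy, a dedicated single-thread executor, the idle interval, a metrics factory, and an operation label. A hedged sketch of that wiring; the thread naming mirrors SimpleRecordsFetcherFactory and the values are illustrative.

    package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    class PrefetchCacheExample {
        static GetRecordsCache prefetchCache(GetRecordsRetrievalStrategy strategy, String shardId) {
            ExecutorService prefetchExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
                    .setDaemon(true)
                    .setNameFormat("prefetch-cache-" + shardId + "-%04d")
                    .build());
            return new PrefetchGetRecordsCache(
                    3,                 // maxPendingProcessRecordsInput: queued results
                    8 * 1024 * 1024,   // maxByteSize: bytes held across queued results
                    30000,             // maxRecordsCount: records held across queued results
                    10000,             // maxRecordsPerCall for each GetRecords request
                    strategy,
                    prefetchExecutor,
                    1500L,             // idleMillisBetweenCalls
                    new NullMetricsFactory(),
                    "ProcessTask");    // operation label, as used by the factory
        }
    }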
+ */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.model.Record; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; + +import lombok.extern.apachecommons.CommonsLog; + +/** + * These are the integration tests for the PrefetchGetRecordsCache class. + */ +@RunWith(MockitoJUnitRunner.class) +@CommonsLog +public class PrefetchGetRecordsCacheIntegrationTest { + private static final int MAX_SIZE = 3; + private static final int MAX_BYTE_SIZE = 5 * 1024 * 1024; + private static final int MAX_RECORDS_COUNT = 30_000; + private static final int MAX_RECORDS_PER_CALL = 10_000; + private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L; + + private PrefetchGetRecordsCache getRecordsCache; + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private KinesisDataFetcher dataFetcher; + private ExecutorService executorService; + private List records; + private String operation = "ProcessTask"; + + @Mock + private IKinesisProxy proxy; + + @Mock + private ShardInfo shardInfo; + + @Before + public void setup() { + records = new ArrayList<>(); + dataFetcher = new KinesisDataFetcherForTest(proxy, shardInfo); + getRecordsRetrievalStrategy = spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); + executorService = spy(Executors.newFixedThreadPool(1)); + + getRecordsCache = new PrefetchGetRecordsCache(MAX_SIZE, + MAX_BYTE_SIZE, + MAX_RECORDS_COUNT, + MAX_RECORDS_PER_CALL, + getRecordsRetrievalStrategy, + executorService, + IDLE_MILLIS_BETWEEN_CALLS, + new NullMetricsFactory(), + operation); + } + + @Test + public void testRollingCache() { + getRecordsCache.start(); + sleep(IDLE_MILLIS_BETWEEN_CALLS); + + ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult(); + + assertTrue(processRecordsInput1.getRecords().isEmpty()); + assertEquals(processRecordsInput1.getMillisBehindLatest(), new Long(1000)); + assertNotNull(processRecordsInput1.getCacheEntryTime()); + + ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult(); + + assertNotEquals(processRecordsInput1, processRecordsInput2); + } + + @Test + public void testFullCache() { + getRecordsCache.start(); + sleep(MAX_SIZE * IDLE_MILLIS_BETWEEN_CALLS); + + assertEquals(getRecordsCache.getRecordsResultQueue.size(), MAX_SIZE); + + ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult(); + ProcessRecordsInput 
processRecordsInput2 = getRecordsCache.getNextResult(); + + assertNotEquals(processRecordsInput1, processRecordsInput2); + } + + @Test + public void testDifferentShardCaches() { + ExecutorService executorService2 = spy(Executors.newFixedThreadPool(1)); + KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcherForTest(proxy, shardInfo)); + GetRecordsRetrievalStrategy getRecordsRetrievalStrategy2 = spy(new AsynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher, 5 , 5, "Test-shard")); + GetRecordsCache getRecordsCache2 = new PrefetchGetRecordsCache( + MAX_SIZE, + MAX_BYTE_SIZE, + MAX_RECORDS_COUNT, + MAX_RECORDS_PER_CALL, + getRecordsRetrievalStrategy2, + executorService2, + IDLE_MILLIS_BETWEEN_CALLS, + new NullMetricsFactory(), + operation); + + getRecordsCache.start(); + sleep(IDLE_MILLIS_BETWEEN_CALLS); + + Record record = mock(Record.class); + ByteBuffer byteBuffer = ByteBuffer.allocate(512 * 1024); + when(record.getData()).thenReturn(byteBuffer); + + records.add(record); + records.add(record); + records.add(record); + records.add(record); + getRecordsCache2.start(); + + sleep(IDLE_MILLIS_BETWEEN_CALLS); + + ProcessRecordsInput p1 = getRecordsCache.getNextResult(); + + ProcessRecordsInput p2 = getRecordsCache2.getNextResult(); + + assertNotEquals(p1, p2); + assertTrue(p1.getRecords().isEmpty()); + assertFalse(p2.getRecords().isEmpty()); + assertEquals(p2.getRecords().size(), records.size()); + + getRecordsCache2.shutdown(); + sleep(100L); + verify(executorService2).shutdownNow(); + verify(getRecordsRetrievalStrategy2).shutdown(); + } + + @After + public void shutdown() { + getRecordsCache.shutdown(); + sleep(100L); + verify(executorService).shutdownNow(); + verify(getRecordsRetrievalStrategy).shutdown(); + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) {} + } + + private class KinesisDataFetcherForTest extends KinesisDataFetcher { + public KinesisDataFetcherForTest(final IKinesisProxy kinesisProxy, + final ShardInfo shardInfo) { + super(kinesisProxy, shardInfo); + } + + @Override + public DataFetcherResult getRecords(final int maxRecords) { + GetRecordsResult getRecordsResult = new GetRecordsResult(); + getRecordsResult.setRecords(new ArrayList<>(records)); + getRecordsResult.setMillisBehindLatest(1000L); + + return new AdvancingResult(getRecordsResult); + } + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java new file mode 100644 index 00000000..6091baa9 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -0,0 +1,215 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
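The unit tests below also pin down the prefetching cache's lifecycle: getNextResult() before start(), or after shutdown(), is expected to fail with IllegalStateException. A small defensive wrapper illustrating that expectation; the wrapper itself is hypothetical.

    package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

    import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;

    class StartedCache {
        private final GetRecordsCache delegate;
        private boolean started;

        StartedCache(GetRecordsCache delegate) {
            this.delegate = delegate;
        }

        // Lazily starts the delegate so getNextResult() never hits the
        // "not started" IllegalStateException exercised in the tests below.
        ProcessRecordsInput next() {
            if (!started) {
                delegate.start();
                started = true;
            }
            return delegate.getNextResult();
        }
    }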
+ */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.IntStream; + +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; + +/** + * Test class for the PrefetchGetRecordsCache class. + */ +@RunWith(MockitoJUnitRunner.class) +public class PrefetchGetRecordsCacheTest { + private static final int SIZE_512_KB = 512 * 1024; + private static final int SIZE_1_MB = 2 * SIZE_512_KB; + private static final int MAX_RECORDS_PER_CALL = 10000; + private static final int MAX_SIZE = 5; + private static final int MAX_RECORDS_COUNT = 15000; + private static final long IDLE_MILLIS_BETWEEN_CALLS = 0L; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + @Mock + private GetRecordsResult getRecordsResult; + @Mock + private Record record; + + private List records; + private ExecutorService executorService; + private LinkedBlockingQueue spyQueue; + private PrefetchGetRecordsCache getRecordsCache; + private String operation = "ProcessTask"; + + @Before + public void setup() { + executorService = spy(Executors.newFixedThreadPool(1)); + getRecordsCache = new PrefetchGetRecordsCache( + MAX_SIZE, + 3 * SIZE_1_MB, + MAX_RECORDS_COUNT, + MAX_RECORDS_PER_CALL, + getRecordsRetrievalStrategy, + executorService, + IDLE_MILLIS_BETWEEN_CALLS, + new NullMetricsFactory(), + operation); + spyQueue = spy(getRecordsCache.getRecordsResultQueue); + records = spy(new ArrayList<>()); + + when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResult); + when(getRecordsResult.getRecords()).thenReturn(records); + } + + @Test + public void testGetRecords() { + when(records.size()).thenReturn(1000); + when(record.getData()).thenReturn(createByteBufferWithSize(SIZE_512_KB)); + + records.add(record); + records.add(record); + records.add(record); + records.add(record); + records.add(record); + + getRecordsCache.start(); + ProcessRecordsInput result = getRecordsCache.getNextResult(); + + assertEquals(result.getRecords(), records); + + verify(executorService).execute(any()); + verify(getRecordsRetrievalStrategy, atLeast(1)).getRecords(eq(MAX_RECORDS_PER_CALL)); + } + + @Test + public void testFullCacheByteSize() { + when(records.size()).thenReturn(500); + when(record.getData()).thenReturn(createByteBufferWithSize(SIZE_1_MB)); + + records.add(record); + + getRecordsCache.start(); + + // Sleep for a few seconds for the cache to fill up. 
+ sleep(2000); + + verify(getRecordsRetrievalStrategy, times(3)).getRecords(eq(MAX_RECORDS_PER_CALL)); + assertEquals(spyQueue.size(), 3); + } + + @Test + public void testFullCacheRecordsCount() { + int recordsSize = 4500; + when(records.size()).thenReturn(recordsSize); + + getRecordsCache.start(); + + sleep(2000); + + int callRate = (int) Math.ceil((double) MAX_RECORDS_COUNT/recordsSize); + verify(getRecordsRetrievalStrategy, times(callRate)).getRecords(MAX_RECORDS_PER_CALL); + assertEquals(spyQueue.size(), callRate); + assertTrue(callRate < MAX_SIZE); + } + + @Test + public void testFullCacheSize() { + int recordsSize = 200; + when(records.size()).thenReturn(recordsSize); + + getRecordsCache.start(); + + // Sleep for a few seconds for the cache to fill up. + sleep(2000); + + verify(getRecordsRetrievalStrategy, times(MAX_SIZE + 1)).getRecords(eq(MAX_RECORDS_PER_CALL)); + assertEquals(spyQueue.size(), MAX_SIZE); + } + + @Test + public void testMultipleCacheCalls() { + int recordsSize = 20; + when(record.getData()).thenReturn(createByteBufferWithSize(1024)); + + IntStream.range(0, recordsSize).forEach(i -> records.add(record)); + + getRecordsCache.start(); + ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); + + verify(executorService).execute(any()); + assertEquals(processRecordsInput.getRecords(), records); + assertNotNull(processRecordsInput.getCacheEntryTime()); + assertNotNull(processRecordsInput.getCacheExitTime()); + + sleep(2000); + + ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult(); + assertNotEquals(processRecordsInput, processRecordsInput2); + assertEquals(processRecordsInput2.getRecords(), records); + assertNotEquals(processRecordsInput2.getTimeSpentInCache(), Duration.ZERO); + + assertTrue(spyQueue.size() <= MAX_SIZE); + } + + @Test(expected = IllegalStateException.class) + public void testGetNextRecordsWithoutStarting() { + verify(executorService, times(0)).execute(any()); + getRecordsCache.getNextResult(); + } + + @Test(expected = IllegalStateException.class) + public void testCallAfterShutdown() { + when(executorService.isShutdown()).thenReturn(true); + getRecordsCache.getNextResult(); + } + + @After + public void shutdown() { + getRecordsCache.shutdown(); + verify(executorService).shutdownNow(); + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) {} + } + + private ByteBuffer createByteBufferWithSize(int size) { + ByteBuffer byteBuffer = ByteBuffer.allocate(size); + byteBuffer.put(new byte[size]); + return byteBuffer; + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index b24bf3ec..94d0918e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -18,8 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @@ -49,7 +48,6 @@ import 
com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber import com.amazonaws.services.kinesis.clientlibrary.types.Messages.AggregatedRecord; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; -import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Record; import com.google.protobuf.ByteString; @@ -77,7 +75,7 @@ public class ProcessTaskTest { @Mock private ThrottlingReporter throttlingReporter; @Mock - private GetRecordsRetrievalStrategy mockGetRecordsRetrievalStrategy; + private GetRecordsCache getRecordsCache; private List processedRecords; private ExtendedSequenceNumber newLargestPermittedCheckpointValue; @@ -95,32 +93,39 @@ public class ProcessTaskTest { INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrievalStrategy); + shardInfo, + config, + mockRecordProcessor, + mockCheckpointer, + mockDataFetcher, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + throttlingReporter, + getRecordsCache); } @Test public void testProcessTaskWithProvisionedThroughputExceededException() { // Set data fetcher to throw exception doReturn(false).when(mockDataFetcher).isShardEndReached(); - doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsRetrievalStrategy) - .getRecords(maxRecords); + doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(getRecordsCache) + .getNextResult(); TaskResult result = processTask.call(); verify(throttlingReporter).throttled(); verify(throttlingReporter, never()).success(); - verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); + verify(getRecordsCache).getNextResult(); assertTrue("Result should contain ProvisionedThroughputExceededException", result.getException() instanceof ProvisionedThroughputExceededException); } @Test public void testProcessTaskWithNonExistentStream() { - // Data fetcher returns a null Result when the stream does not exist - doReturn(null).when(mockGetRecordsRetrievalStrategy).getRecords(maxRecords); + // The GetRecordsCache returns an empty ProcessRecordsInput when the stream does not exist + doReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest((long) 0)).when(getRecordsCache).getNextResult(); TaskResult result = processTask.call(); - verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); + verify(getRecordsCache).getNextResult(); assertNull("Task should not throw an exception", result.getException()); } @@ -304,14 +309,13 @@ public class ProcessTaskTest { private void testWithRecords(List records, ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) { - when(mockGetRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn( - new GetRecordsResult().withRecords(records)); + when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(records).withMillisBehindLatest((long) 1000 * 50)); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); verify(throttlingReporter).success(); verify(throttlingReporter, never()).throttled(); - verify(mockGetRecordsRetrievalStrategy).getRecords(anyInt()); + verify(getRecordsCache).getNextResult(); ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(priCaptor.capture()); processedRecords = priCaptor.getValue().getRecords(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java new file mode 100644 index 00000000..912804da --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -0,0 +1,44 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class RecordsFetcherFactoryTest { + private String shardId = "TestShard"; + private RecordsFetcherFactory recordsFetcherFactory; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + @Mock + private IMetricsFactory metricsFactory; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + recordsFetcherFactory = new SimpleRecordsFetcherFactory(1); + } + + @Test + public void createDefaultRecordsFetcherTest() { + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, + metricsFactory); + assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); + } + + @Test + public void createPrefetchRecordsFetcherTest() { + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, + metricsFactory); + assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); + } + +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index a3f786a6..0bd2f31a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -20,7 +20,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -52,6 +51,7 @@ import org.apache.commons.logging.LogFactory; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -95,10 +95,16 @@ public class ShardConsumerTest { // Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is // ... 
a non-final public class, and so can be mocked and spied. private final ExecutorService executorService = Executors.newFixedThreadPool(1); - + private final int maxRecords = 500; + private RecordsFetcherFactory recordsFetcherFactory; + + private GetRecordsCache getRecordsCache; + @Mock private IRecordProcessor processor; @Mock + private KinesisClientLibConfiguration config; + @Mock private IKinesisProxy streamProxy; @Mock private ILeaseManager leaseManager; @@ -107,6 +113,14 @@ public class ShardConsumerTest { @Mock private ShutdownNotification shutdownNotification; + @Before + public void setup() { + getRecordsCache = null; + + recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords)); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + } + /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. */ @@ -137,8 +151,9 @@ public class ShardConsumerTest { executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); - + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); @@ -154,7 +169,6 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); } - /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. */ @@ -185,7 +199,8 @@ public class ShardConsumerTest { spyExecutorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -226,7 +241,8 @@ public class ShardConsumerTest { executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null; @@ -299,7 +315,6 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -310,18 +325,41 @@ public class ShardConsumerTest { skipCheckpointValidationValue, INITIAL_POSITION_LATEST); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); + + RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( + shardInfo, + checkpoint, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() + ) + ); + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, + new 
SynchronousGetRecordsRetrievalStrategy(dataFetcher), + 0L)); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); + ShardConsumer consumer = new ShardConsumer(shardInfo, streamConfig, checkpoint, processor, + recordProcessorCheckpointer, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + dataFetcher, + Optional.empty(), + Optional.empty(), + config); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -330,6 +368,7 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); + verify(getRecordsCache).start(); // We expect to process all records in numRecs calls for (int i = 0; i < numRecs;) { @@ -342,6 +381,8 @@ public class ShardConsumerTest { } Thread.sleep(50L); } + + verify(getRecordsCache, times(5)).getNextResult(); assertThat(processor.getShutdownReason(), nullValue()); consumer.notifyShutdownRequested(shutdownNotification); @@ -365,6 +406,8 @@ public class ShardConsumerTest { verify(shutdownNotification, atLeastOnce()).shutdownComplete(); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); + + verify(getRecordsCache).shutdown(); executorService.shutdown(); executorService.awaitTermination(60, TimeUnit.SECONDS); @@ -401,7 +444,6 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -413,18 +455,41 @@ public class ShardConsumerTest { atTimestamp); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); + + RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( + shardInfo, + checkpoint, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() + ) + ); + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, + new SynchronousGetRecordsRetrievalStrategy(dataFetcher), + 0L)); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); + ShardConsumer consumer = new ShardConsumer(shardInfo, streamConfig, checkpoint, processor, + recordProcessorCheckpointer, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + dataFetcher, + Optional.empty(), + Optional.empty(), + config); 
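
The ShardConsumer tests above build a real KinesisDataFetcher, wrap it in a spied BlockingGetRecordsCache, and stub the spied factory on the mocked configuration so the consumer receives that cache. Below is the same wiring in isolation as a minimal sketch (same test package and Mockito assumed; the class name, shard id, and counts are placeholders). It uses the doReturn(...).when(spy) form instead of the diff's when(...).thenReturn(...) only so the real factory method is not invoked while the stub is being set up.

```java
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;

// Placeholder class name; mirrors the cache wiring used by the ShardConsumer tests.
public class RecordsCacheWiringSketch {

    public static void main(String[] args) {
        ShardInfo shardInfo = new ShardInfo("shardId-000000000000", "concurrency-token", null, null);
        KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mock(IKinesisProxy.class), shardInfo);

        // A real blocking cache over a synchronous retrieval strategy, spied so the
        // test can later verify the start()/getNextResult()/shutdown() lifecycle.
        GetRecordsCache getRecordsCache = spy(new BlockingGetRecordsCache(
                500,                                                    // maxRecordsPerCall
                new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
                0L));                                                   // idleMillisBetweenCalls

        // The consumer obtains its cache from config.getRecordsFetcherFactory(),
        // so stubbing the factory is what injects the spy.
        RecordsFetcherFactory recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500));
        doReturn(getRecordsCache)
                .when(recordsFetcherFactory).createRecordsFetcher(any(), anyString(), any());

        KinesisClientLibConfiguration config = mock(KinesisClientLibConfiguration.class);
        when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);

        System.out.println(getRecordsCache.getGetRecordsRetrievalStrategy().getClass().getSimpleName());
    }
}
```
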
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -433,6 +498,8 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); + + verify(getRecordsCache).start(); // We expect to process all records in numRecs calls for (int i = 0; i < numRecs;) { @@ -445,6 +512,8 @@ public class ShardConsumerTest { } Thread.sleep(50L); } + + verify(getRecordsCache, times(4)).getNextResult(); assertThat(processor.getShutdownReason(), nullValue()); consumer.beginShutdown(); @@ -457,8 +526,11 @@ public class ShardConsumerTest { executorService.shutdown(); executorService.awaitTermination(60, TimeUnit.SECONDS); + verify(getRecordsCache).shutdown(); + String iterator = fileBasedProxy.getIterator(streamShardId, timestamp); List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); + verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); assertEquals(4, processor.getProcessedRecords().size()); file.delete(); @@ -486,11 +558,15 @@ public class ShardConsumerTest { executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); + + GetRecordsCache getRecordsCache = spy(consumer.getGetRecordsCache()); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); when(leaseManager.getLease(anyString())).thenReturn(null); + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2)); when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); @@ -535,9 +611,11 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, Optional.empty(), - Optional.empty()); + Optional.empty(), + config); - assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), SynchronousGetRecordsRetrievalStrategy.class); + assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), + SynchronousGetRecordsRetrievalStrategy.class); } @Test @@ -563,9 +641,11 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, Optional.of(1), - Optional.of(2)); + Optional.of(2), + config); - assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class); + assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), + AsynchronousGetRecordsRetrievalStrategy.class); } //@formatter:off (gets the formatting wrong) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 5d91c698..17a53137 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -59,7 +59,7 @@ public class ShutdownTaskTest { 
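
The ShardConsumerTest verifications above treat the cache as having a simple lifecycle (start once, getNextResult per processing loop, shutdown exactly once) and now reach the retrieval strategy through getGetRecordsCache(); the ShutdownTaskTest changes that follow verify the same shutdown call. A minimal JUnit sketch of both contracts, assuming the same test package; the class name, shard id, and counts are placeholders, not part of this PR.

```java
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Test;

import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;

// Placeholder test class; not part of the PR.
public class GetRecordsCacheContractSketchTest {

    @Test
    public void strategyIsReachedThroughTheCache() {
        ShardInfo shardInfo = new ShardInfo("shardId-000000000000", null, null, null);
        KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mock(IKinesisProxy.class), shardInfo);
        GetRecordsCache cache = new BlockingGetRecordsCache(
                500, new SynchronousGetRecordsRetrievalStrategy(dataFetcher), 0L);

        // Mirrors the updated assertion style:
        // consumer.getGetRecordsCache().getGetRecordsRetrievalStrategy()
        assertThat(cache.getGetRecordsRetrievalStrategy(),
                instanceOf(SynchronousGetRecordsRetrievalStrategy.class));
    }

    @Test
    public void lifecycleIsStartFetchShutdown() {
        GetRecordsCache cache = mock(GetRecordsCache.class);

        cache.start();
        cache.getNextResult();
        cache.getNextResult();
        cache.shutdown();

        verify(cache).start();
        verify(cache, times(2)).getNextResult();
        verify(cache).shutdown();
    }
}
```
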
IRecordProcessor defaultRecordProcessor = new TestStreamlet(); @Mock - private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private GetRecordsCache getRecordsCache; /** * @throws java.lang.Exception @@ -80,7 +80,7 @@ public class ShutdownTaskTest { */ @Before public void setUp() throws Exception { - doNothing().when(getRecordsRetrievalStrategy).shutdown(); + doNothing().when(getRecordsCache).shutdown(); } /** @@ -109,7 +109,7 @@ public class ShutdownTaskTest { cleanupLeasesOfCompletedShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, - getRecordsRetrievalStrategy); + getRecordsCache); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); @@ -135,11 +135,11 @@ public class ShutdownTaskTest { cleanupLeasesOfCompletedShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, - getRecordsRetrievalStrategy); + getRecordsCache); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); - verify(getRecordsRetrievalStrategy).shutdown(); + verify(getRecordsCache).shutdown(); } /** @@ -147,7 +147,7 @@ public class ShutdownTaskTest { */ @Test public final void testGetTaskType() { - ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsRetrievalStrategy); + ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsCache); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 5913bf0d..a8856a0b 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -21,10 +21,20 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.lang.Thread.State; @@ -60,6 +70,7 @@ import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Matchers; @@ -129,6 +140,9 @@ public class WorkerTest { private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; + + private RecordsFetcherFactory recordsFetcherFactory; + private KinesisClientLibConfiguration config; @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; @@ -154,6 +168,13 @@ public class WorkerTest 
{ private Future taskFuture; @Mock private TaskResult taskResult; + + @Before + public void setup() { + config = spy(new KinesisClientLibConfiguration("app", null, null, null)); + recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500)); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = @@ -195,14 +216,13 @@ public class WorkerTest { /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#getApplicationName()}. + * Test method for {@link Worker#getApplicationName()}. */ @Test public final void testGetStageName() { final String stageName = "testStageName"; - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration(stageName, null, null, null); - Worker worker = new Worker(v1RecordProcessorFactory, clientConfig); + config = new KinesisClientLibConfiguration(stageName, null, null, null); + Worker worker = new Worker(v1RecordProcessorFactory, config); Assert.assertEquals(stageName, worker.getApplicationName()); } @@ -210,6 +230,7 @@ public class WorkerTest { public final void testCreateOrGetShardConsumer() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + config = new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -228,7 +249,9 @@ public class WorkerTest { Worker worker = new Worker(stageName, - streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + streamletFactory, + config, + streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -275,10 +298,22 @@ public class WorkerTest { when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint) .thenReturn(secondCheckpoint); - Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, - parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, - leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); + Worker worker = new Worker(stageName, + streamletFactory, + config, + streamConfig, + INITIAL_POSITION_LATEST, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + checkpoint, + leaseCoordinator, + execService, + nullMetricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + shardPrioritization); Worker workerSpy = spy(worker); @@ -314,6 +349,7 @@ public class WorkerTest { public final void testCleanupShardConsumers() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + config = new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -332,7 +368,9 @@ public class WorkerTest { Worker worker = new Worker(stageName, - streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + streamletFactory, + config, + streamConfig, INITIAL_POSITION_LATEST, 
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -371,6 +409,7 @@ public class WorkerTest { public final void testInitializationFailureWithRetries() { String stageName = "testInitializationWorker"; IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null); + config = new KinesisClientLibConfiguration(stageName, null, null, null); int count = 0; when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++))); int maxRecords = 2; @@ -386,6 +425,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, recordProcessorFactory, + config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, shardPollInterval, shardSyncIntervalMillis, @@ -437,7 +477,7 @@ public class WorkerTest { /** * Runs worker with threadPoolSize < numShards - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#run()}. + * Test method for {@link Worker#run()}. */ @Test public final void testOneSplitShard2Threads() throws Exception { @@ -448,12 +488,12 @@ public class WorkerTest { KinesisClientLease lease = ShardSyncer.newKCLLease(shardList.get(0)); lease.setCheckpoint(new ExtendedSequenceNumber("2")); initialLeases.add(lease); - runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard); + runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config); } /** * Runs worker with threadPoolSize < numShards - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#run()}. + * Test method for {@link Worker#run()}. */ @Test public final void testOneSplitShard2ThreadsWithCallsForEmptyRecords() throws Exception { @@ -465,7 +505,10 @@ public class WorkerTest { lease.setCheckpoint(new ExtendedSequenceNumber("2")); initialLeases.add(lease); boolean callProcessRecordsForEmptyRecordList = true; - runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard); + RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(500); + recordsFetcherFactory.setIdleMillisBetweenCalls(0L); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config); } @Test @@ -490,7 +533,8 @@ public class WorkerTest { 10, kinesisProxy, v2RecordProcessorFactory, executorService, - cwMetricsFactory); + cwMetricsFactory, + config); // Give some time for thread to run. workerStarted.await(); @@ -526,7 +570,8 @@ public class WorkerTest { 10, kinesisProxy, v2RecordProcessorFactory, executorService, - cwMetricsFactory); + cwMetricsFactory, + config); // Give some time for thread to run. 
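
In the empty-record-list worker test above, the factory's idle wait between GetRecords calls is set to zero so the continuously polling cache does not slow the test down, and the resulting factory is returned from the stubbed configuration. A minimal sketch of preparing such a configuration follows, assuming Mockito on the test classpath; the class and application names are placeholders.

```java
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

// Placeholder class name; prepares a test configuration whose records fetcher
// factory never sleeps between GetRecords calls.
public class NoIdleFetcherConfigSketch {

    public static void main(String[] args) {
        RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(500);
        // With callProcessRecordsForEmptyRecordList the cache is polled continuously,
        // so the idle wait is removed to keep the test fast and deterministic.
        recordsFetcherFactory.setIdleMillisBetweenCalls(0L);

        KinesisClientLibConfiguration config =
                spy(new KinesisClientLibConfiguration("sketch-app", null, null, null));
        when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);

        System.out.println(config.getRecordsFetcherFactory().getClass().getSimpleName());
    }
}
```
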
workerStarted.await(); @@ -572,6 +617,12 @@ public class WorkerTest { return null; } }).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class)); + + RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); + GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); + when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L)); WorkerThread workerThread = runWorker(shardList, initialLeases, @@ -581,7 +632,8 @@ public class WorkerTest { fileBasedProxy, v2RecordProcessorFactory, executorService, - nullMetricsFactory); + nullMetricsFactory, + config); // Only sleep for time that is required. processRecordsLatch.await(); @@ -672,7 +724,8 @@ public class WorkerTest { fileBasedProxy, v2RecordProcessorFactory, executorService, - nullMetricsFactory); + nullMetricsFactory, + config); // Only sleep for time that is required. processRecordsLatch.await(); @@ -742,10 +795,22 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -816,7 +881,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { @@ -888,10 +953,22 @@ public class WorkerTest { when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { + Worker worker = new InjectableWorker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + 
leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization) { @Override void postConstruct() { this.gracefulShutdownCoordinator = coordinator; @@ -950,10 +1027,22 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1020,10 +1109,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1121,10 +1222,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1226,10 +1339,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, 
parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1298,10 +1423,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1336,16 +1473,29 @@ public class WorkerTest { private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, - StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + KinesisClientLibConfiguration config, StreamConfig streamConfig, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, - parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, - checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, - failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); + super(applicationName, + recordProcessorFactory, + config, + streamConfig, + initialPositionInStream, + parentShardPollIntervalMillis, + shardSyncIdleTimeMillis, + cleanupLeasesUponShardCompletion, + checkpoint, + leaseCoordinator, + execService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + shardPrioritization); postConstruct(); } @@ -1578,14 +1728,15 @@ public class WorkerTest { lease.setCheckpoint(ExtendedSequenceNumber.AT_TIMESTAMP); initialLeases.add(lease); } - runAndTestWorker(shardList, threadPoolSize, initialLeases, 
callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard); + runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config); } private void runAndTestWorker(List shardList, - int threadPoolSize, - List initialLeases, - boolean callProcessRecordsForEmptyRecordList, - int numberOfRecordsPerShard) throws Exception { + int threadPoolSize, + List initialLeases, + boolean callProcessRecordsForEmptyRecordList, + int numberOfRecordsPerShard, + KinesisClientLibConfiguration clientConfig) throws Exception { File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numberOfRecordsPerShard, "unitTestWT001"); IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); @@ -1594,10 +1745,10 @@ public class WorkerTest { TestStreamletFactory recordProcessorFactory = new TestStreamletFactory(recordCounter, shardSequenceVerifier); ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); - + WorkerThread workerThread = runWorker( shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis, - numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory); + numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig); // TestStreamlet will release the semaphore once for every record it processes recordCounter.acquire(numberOfRecordsPerShard * shardList.size()); @@ -1614,14 +1765,15 @@ public class WorkerTest { } private WorkerThread runWorker(List shardList, - List initialLeases, - boolean callProcessRecordsForEmptyRecordList, - long failoverTimeMillis, - int numberOfRecordsPerShard, - IKinesisProxy kinesisProxy, - IRecordProcessorFactory recordProcessorFactory, - ExecutorService executorService, - IMetricsFactory metricsFactory) throws Exception { + List initialLeases, + boolean callProcessRecordsForEmptyRecordList, + long failoverTimeMillis, + int numberOfRecordsPerShard, + IKinesisProxy kinesisProxy, + IRecordProcessorFactory recordProcessorFactory, + ExecutorService executorService, + IMetricsFactory metricsFactory, + KinesisClientLibConfiguration clientConfig) throws Exception { final String stageName = "testStageName"; final int maxRecords = 2; @@ -1649,10 +1801,11 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - + Worker worker = new Worker(stageName, recordProcessorFactory, + clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,