diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index f6d96b4d..5d8e2e72 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -14,8 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.Optional; - /** * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, * and state transitions is contained within the {@link ConsumerState} objects. @@ -310,9 +308,10 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), - consumer.getRetryGetRecordsInSeconds(), consumer.getMaxGetRecordsThreadPool()); + consumer.getConfig().getRecordsFetcherFactory(), consumer.getRecordProcessorCheckpointer(), + consumer.getDataFetcher(), consumer.getTaskBackoffTimeMillis(), + consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), consumer.getRetryGetRecordsInSeconds(), + consumer.getMaxGetRecordsThreadPool()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 1bfd0fc0..480ba71a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -232,6 +232,9 @@ public class KinesisClientLibConfiguration { @Getter private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; + @Getter + private RecordsFetcherFactory recordsFetcherFactory; + /** * Constructor. * @@ -455,6 +458,117 @@ public class KinesisClientLibConfiguration { InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); + } + + /** + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamName Name of the Kinesis stream + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + */ + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + public KinesisClientLibConfiguration(String applicationName, + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + RecordsFetcherFactory recordsFetcherFactory) { + // Check following values are greater than zero + checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); + checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); + checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis); + checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis); + checkIsValuePositive("MaxRecords", (long) maxRecords); + checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); + checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); + checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); + checkIsRegionNameValid(regionName); + this.applicationName = applicationName; + this.tableName = applicationName; + this.streamName = streamName; + this.kinesisEndpoint = kinesisEndpoint; + this.dynamoDBEndpoint = dynamoDBEndpoint; + this.initialPositionInStream = initialPositionInStream; + this.kinesisCredentialsProvider = kinesisCredentialsProvider; + this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; + this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider; + this.failoverTimeMillis = failoverTimeMillis; + this.maxRecords = maxRecords; + this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis; + this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList; + this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; + this.shardSyncIntervalMillis = shardSyncIntervalMillis; + this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry; + this.workerIdentifier = workerId; + this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig); + this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig); + this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig); + this.taskBackoffTimeMillis = taskBackoffTimeMillis; + this.metricsBufferTimeMillis = metricsBufferTimeMillis; + this.metricsMaxQueueSize = metricsMaxQueueSize; + this.metricsLevel = DEFAULT_METRICS_LEVEL; + this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS; + this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing; + this.regionName = regionName; + this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER; + this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME; + this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; + this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; + this.initialPositionInStreamExtended = + InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; + this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = recordsFetcherFactory; + this.shutdownGraceMillis = shutdownGraceMillis; this.shutdownGraceMillis = shutdownGraceMillis; } @@ -1158,6 +1272,34 @@ public class KinesisClientLibConfiguration { return this; } + /** + * + * @param maxCacheSize the max number of records stored in the getRecordsCache + * @return this configuration object + */ + public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) { + checkIsValuePositive("maxCacheSize", maxCacheSize); + recordsFetcherFactory.setMaxSize(maxCacheSize); + return this; + } + + public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) { + checkIsValuePositive("maxCacheByteSize", maxCacheByteSize); + recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); + return this; + } + + public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy)); + return this; + } + + public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) { + checkIsValuePositive("maxRecordsCount", maxRecordsCount); + recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); + return this; + } + /** * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 02fc4d70..8591ec59 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -15,11 +15,11 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.math.BigInteger; -import java.util.Collections; import java.util.List; import java.util.ListIterator; import java.util.Optional; +import com.sun.org.apache.regexp.internal.RE; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,6 +55,7 @@ class ProcessTask implements ITask { private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; + private final GetRecordsCache recordsFetcher; private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final KinesisDataFetcher dataFetcher; private final TaskType taskType = TaskType.PROCESS; @@ -62,7 +63,6 @@ class ProcessTask implements ITask { private final long backoffTimeMillis; private final Shard shard; private final ThrottlingReporter throttlingReporter; - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, @@ -83,6 +83,8 @@ class ProcessTask implements ITask { * Stream configuration * @param recordProcessor * Record processor used to process the data records for the shard + * @param recordsFetcherFactory + * Record processor factory to create recordFetcher object * @param recordProcessorCheckpointer * Passed to the RecordProcessor so it can checkpoint progress * @param dataFetcher @@ -91,10 +93,11 @@ class ProcessTask implements ITask { * backoff time when catching exceptions */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, + RecordsFetcherFactory recordsFetcherFactory, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { - this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); + this(shardInfo, streamConfig, recordProcessor, recordsFetcherFactory, recordProcessorCheckpointer, dataFetcher, + backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); } /** @@ -104,6 +107,8 @@ class ProcessTask implements ITask { * Stream configuration * @param recordProcessor * Record processor used to process the data records for the shard + * @param recordsFetcherFactory + * Record processor factory to create recordFetcher object * @param recordProcessorCheckpointer * Passed to the RecordProcessor so it can checkpoint progress * @param dataFetcher @@ -116,11 +121,12 @@ class ProcessTask implements ITask { * max number of threads in the getRecords thread pool. */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { - this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, + RecordsFetcherFactory recordsFetcherFactory, RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisDataFetcher dataFetcher, long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool) { + this(shardInfo, streamConfig, recordProcessor, recordsFetcherFactory, recordProcessorCheckpointer, dataFetcher, + backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo)); } @@ -132,6 +138,8 @@ class ProcessTask implements ITask { * Stream configuration * @param recordProcessor * Record processor used to process the data records for the shard + * @param recordsFetcherFactory + * RecordFetcher factory used to create recordFetcher object * @param recordProcessorCheckpointer * Passed to the RecordProcessor so it can checkpoint progress * @param dataFetcher @@ -142,9 +150,9 @@ class ProcessTask implements ITask { * determines how throttling events should be reported in the log. */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + RecordsFetcherFactory recordsFetcherFactory, RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -155,6 +163,7 @@ class ProcessTask implements ITask { this.throttlingReporter = throttlingReporter; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.recordsFetcher = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if @@ -410,12 +419,7 @@ class ProcessTask implements ITask { * @return list of data records from Kinesis */ private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() { - final GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(streamConfig.getMaxRecords()); - - if (getRecordsResult == null) { - // Stream no longer exists - return new GetRecordsResult().withRecords(Collections.emptyList()); - } + final GetRecordsResult getRecordsResult = recordsFetcher.getNextResult(); if (getRecordsResult.getMillisBehindLatest() != null) { MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java new file mode 100644 index 00000000..cdd80e49 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard. + * Clients may choose to create separate instantiations, or re-use instantiations. + */ + +public interface RecordsFetcherFactory { + + /** + * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. + * + * @return Returns a record fetcher object + */ + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy); + + void setMaxSize(int maxSize); + + void setMaxByteSize(int maxByteSize); + + void setMaxRecordsCount(int maxRecordsCount); + + void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 69057b38..97de08d4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -43,6 +43,8 @@ class ShardConsumer { private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; + @Getter + private final KinesisClientLibConfiguration config; private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final ExecutorService executorService; private final ShardInfo shardInfo; @@ -81,6 +83,7 @@ class ShardConsumer { * @param streamConfig Stream configuration to use * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard + * @param config Kinesis library configuration * @param leaseManager Used to create leases for new shards * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard @@ -92,6 +95,7 @@ class ShardConsumer { StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, + KinesisClientLibConfiguration config, ILeaseManager leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, @@ -99,9 +103,9 @@ class ShardConsumer { IMetricsFactory metricsFactory, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { - this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis, - cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); + this(shardInfo, streamConfig, checkpoint,recordProcessor, config, leaseManager, + parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, + backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); } /** @@ -109,6 +113,7 @@ class ShardConsumer { * @param streamConfig Stream configuration to use * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard + * @param config Kinesis library configuration * @param leaseManager Used to create leases for new shards * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard @@ -122,6 +127,7 @@ class ShardConsumer { StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, + KinesisClientLibConfiguration config, ILeaseManager leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, @@ -133,6 +139,7 @@ class ShardConsumer { Optional maxGetRecordsThreadPool) { this.streamConfig = streamConfig; this.recordProcessor = recordProcessor; + this.config = config; this.executorService = executorService; this.shardInfo = shardInfo; this.checkpoint = checkpoint; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java new file mode 100644 index 00000000..cf0c0a2c --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import lombok.extern.apachecommons.CommonsLog; + +import java.util.concurrent.Executors; + +@CommonsLog +public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { + private final int maxRecords; + private int maxSize = 10; + private int maxByteSize = 15 * 1024 * 1024; + private int maxRecordsCount = 30000; + private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; + + public SimpleRecordsFetcherFactory(int maxRecords) { + this.maxRecords = maxRecords; + } + + public SimpleRecordsFetcherFactory(int maxRecords, int maxSize, int maxByteSize, int maxRecordsCount) { + this.maxRecords = maxRecords; + this.maxSize = maxSize; + this.maxByteSize = maxByteSize; + this.maxRecordsCount = maxRecordsCount; + } + + @Override + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { + return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); + } else { + return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, dataFetchingStrategy, + getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1)); + } + } + + @Override + public void setMaxSize(int maxSize){ + this.maxSize = maxSize; + } + + @Override + public void setMaxByteSize(int maxByteSize){ + this.maxByteSize = maxByteSize; + } + + @Override + public void setMaxRecordsCount(int maxRecordsCount) { + this.maxRecordsCount = maxRecordsCount; + } + + @Override + public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){ + this.dataFetchingStrategy = dataFetchingStrategy; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 3cfb9f2f..494d1c50 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -73,6 +73,7 @@ public class Worker implements Runnable { private final String applicationName; private final IRecordProcessorFactory recordProcessorFactory; + private final KinesisClientLibConfiguration config; private final StreamConfig streamConfig; private final InitialPositionInStreamExtended initialPosition; private final ICheckpoint checkpointTracker; @@ -245,6 +246,7 @@ public class Worker implements Runnable { KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + config, new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) .getProxy(config.getStreamName()), @@ -306,6 +308,8 @@ public class Worker implements Runnable { * Name of the Kinesis application * @param recordProcessorFactory * Used to get record processor instances for processing data from shards + * @paran config + * Kinesis Library configuration * @param streamConfig * Stream configuration * @param initialPositionInStream @@ -333,24 +337,25 @@ public class Worker implements Runnable { */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, - InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, + this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty()); } - /** * @param applicationName * Name of the Kinesis application * @param recordProcessorFactory * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Library Configuration * @param streamConfig * Stream configuration * @param initialPositionInStream @@ -382,7 +387,7 @@ public class Worker implements Runnable { */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, @@ -391,6 +396,7 @@ public class Worker implements Runnable { Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; + this.config = config; this.streamConfig = streamConfig; this.initialPosition = initialPositionInStream; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; @@ -411,7 +417,6 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; } - /** * @return the applicationName */ @@ -819,11 +824,11 @@ public class Worker implements Runnable { * * @param shardInfo * Kinesis shard info - * @param factory + * @param processorFactory * RecordProcessor factory * @return ShardConsumer for the shard */ - ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { + ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier @@ -832,17 +837,17 @@ public class Worker implements Runnable { // completely processed (shutdown reason terminate). if ((consumer == null) || (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) { - consumer = buildConsumer(shardInfo, factory); + consumer = buildConsumer(shardInfo, processorFactory); shardInfoShardConsumerMap.put(shardInfo, consumer); wlog.infoForce("Created new shardConsumer for : " + shardInfo); } return consumer; } - protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { - IRecordProcessor recordProcessor = factory.createProcessor(); + protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + IRecordProcessor recordProcessor = processorFactory.createProcessor(); - return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, + return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, config, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool); @@ -1049,6 +1054,7 @@ public class Worker implements Runnable { public static class Builder { private IRecordProcessorFactory recordProcessorFactory; + private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; private AmazonKinesis kinesisClient; private AmazonDynamoDB dynamoDBClient; @@ -1244,6 +1250,7 @@ public class Worker implements Runnable { return new Worker(config.getApplicationName(), recordProcessorFactory, + config, new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient).getProxy(config.getStreamName()), config.getMaxRecords(), diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 307aa6b8..c201e803 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -57,6 +58,8 @@ public class ConsumerStatesTest { @Mock private IRecordProcessor recordProcessor; @Mock + private KinesisClientLibConfiguration config; + @Mock private RecordProcessorCheckpointer recordProcessorCheckpointer; @Mock private ExecutorService executorService; @@ -76,6 +79,10 @@ public class ConsumerStatesTest { private IKinesisProxy kinesisProxy; @Mock private InitialPositionInStreamExtended initialPositionInStream; + @Mock + private RecordsFetcherFactory recordsFetcherFactory; + @Mock + private GetRecordsCache recordsFetcher; private long parentShardPollIntervalMillis = 0xCAFE; private boolean cleanupLeasesOfCompletedShards = true; @@ -98,7 +105,8 @@ public class ConsumerStatesTest { when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards); when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis); when(consumer.getShutdownReason()).thenReturn(reason); - + when(consumer.getConfig()).thenReturn(config); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); } private static final Class> LEASE_MANAGER_CLASS = (Class>) (Class) ILeaseManager.class; @@ -215,6 +223,37 @@ public class ConsumerStatesTest { } + @Test + public void processingStateRecordsFetcher() { + when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1)); + when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2)); + when(recordsFetcherFactory.createRecordsFetcher((any()))).thenReturn(recordsFetcher); + + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + assertThat(task, procTask(GetRecordsCache.class, "recordsFetcher", equalTo(recordsFetcher))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING)); + assertThat(state.getTaskType(), equalTo(TaskType.PROCESS)); + } + @Test public void shutdownRequestState() { ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); @@ -321,7 +360,7 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); } @@ -331,17 +370,17 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher procTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher initTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher taskWith(Class taskTypeClass, - Class valueTypeClass, String propertyName, Matcher matcher) { + Class valueTypeClass, String propertyName, Matcher matcher) { return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName); } @@ -354,7 +393,7 @@ public class ConsumerStatesTest { private final Field matchingField; private ReflectionPropertyMatcher(Class taskTypeClass, Class valueTypeClass, - Matcher matcher, String propertyName) { + Matcher matcher, String propertyName) { this.taskTypeClass = taskTypeClass; this.valueTypeClazz = valueTypeClass; this.matcher = matcher; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index b24bf3ec..4151ca93 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -78,6 +78,10 @@ public class ProcessTaskTest { private ThrottlingReporter throttlingReporter; @Mock private GetRecordsRetrievalStrategy mockGetRecordsRetrievalStrategy; + @Mock + private RecordsFetcherFactory mockRecordsFetcherFactory; + @Mock + private GetRecordsCache mockRecordsFetcher; private List processedRecords; private ExtendedSequenceNumber newLargestPermittedCheckpointValue; @@ -94,8 +98,9 @@ public class ProcessTaskTest { skipCheckpointValidationValue, INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); + when(mockRecordsFetcherFactory.createRecordsFetcher(mockGetRecordsRetrievalStrategy)).thenReturn(mockRecordsFetcher); processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, + shardInfo, config, mockRecordProcessor, mockRecordsFetcherFactory, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrievalStrategy); } @@ -103,13 +108,13 @@ public class ProcessTaskTest { public void testProcessTaskWithProvisionedThroughputExceededException() { // Set data fetcher to throw exception doReturn(false).when(mockDataFetcher).isShardEndReached(); - doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsRetrievalStrategy) - .getRecords(maxRecords); + doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockRecordsFetcher) + .getNextResult(); TaskResult result = processTask.call(); verify(throttlingReporter).throttled(); verify(throttlingReporter, never()).success(); - verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); + verify(mockRecordsFetcher).getNextResult(); assertTrue("Result should contain ProvisionedThroughputExceededException", result.getException() instanceof ProvisionedThroughputExceededException); } @@ -117,10 +122,10 @@ public class ProcessTaskTest { @Test public void testProcessTaskWithNonExistentStream() { // Data fetcher returns a null Result when the stream does not exist - doReturn(null).when(mockGetRecordsRetrievalStrategy).getRecords(maxRecords); + doReturn(new GetRecordsResult().withRecords(Collections.emptyList())).when(mockRecordsFetcher).getNextResult(); TaskResult result = processTask.call(); - verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); + verify(mockRecordsFetcher).getNextResult(); assertNull("Task should not throw an exception", result.getException()); } @@ -304,14 +309,14 @@ public class ProcessTaskTest { private void testWithRecords(List records, ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) { - when(mockGetRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn( + when(mockRecordsFetcher.getNextResult()).thenReturn( new GetRecordsResult().withRecords(records)); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); verify(throttlingReporter).success(); verify(throttlingReporter, never()).throttled(); - verify(mockGetRecordsRetrievalStrategy).getRecords(anyInt()); + verify(mockRecordsFetcher).getNextResult(); ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(priCaptor.capture()); processedRecords = priCaptor.getValue().getRecords(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java new file mode 100644 index 00000000..17a77123 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -0,0 +1,41 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +public class RecordsFetcherFactoryTest { + + private RecordsFetcherFactory recordsFetcherFactory; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + recordsFetcherFactory = new SimpleRecordsFetcherFactory(1); + } + + @Test + public void createDefaultRecordsFetcherTest() { + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); + } + + @Test + public void createPrefetchRecordsFetcherTest() { + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); + } + +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 8073d0df..ba6b3d80 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -35,6 +35,7 @@ import static org.mockito.Mockito.when; import java.io.File; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.ListIterator; @@ -45,6 +46,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.model.GetRecordsResult; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Description; @@ -97,6 +99,10 @@ public class ShardConsumerTest { @Mock private IRecordProcessor processor; @Mock + private KinesisClientLibConfiguration config; + @Mock + private RecordsFetcherFactory recordsFetcherFactory; + @Mock private IKinesisProxy streamProxy; @Mock private ILeaseManager leaseManager; @@ -104,7 +110,6 @@ public class ShardConsumerTest { private ICheckpoint checkpoint; @Mock private ShutdownNotification shutdownNotification; - /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. */ @@ -129,6 +134,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -177,6 +183,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -205,6 +212,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testRecordProcessorThrowable() throws Exception { + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); StreamConfig streamConfig = new StreamConfig(streamProxy, @@ -218,6 +226,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -297,7 +306,7 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(maxRecords)); TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -313,6 +322,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -399,7 +409,7 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2)); TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -416,6 +426,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -478,6 +489,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -489,6 +501,7 @@ public class ShardConsumerTest { final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); when(leaseManager.getLease(anyString())).thenReturn(null); + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2)); when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 5913bf0d..9f5bcbee 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -133,6 +133,8 @@ public class WorkerTest { @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; @Mock + private KinesisClientLibConfiguration config; + @Mock private ILeaseManager leaseManager; @Mock private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory v1RecordProcessorFactory; @@ -210,6 +212,8 @@ public class WorkerTest { public final void testCreateOrGetShardConsumer() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -228,7 +232,9 @@ public class WorkerTest { Worker worker = new Worker(stageName, - streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + streamletFactory, + clientConfig, + streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -257,6 +263,8 @@ public class WorkerTest { public void testWorkerLoopWithCheckpoint() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -275,7 +283,7 @@ public class WorkerTest { when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint) .thenReturn(secondCheckpoint); - Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); @@ -314,6 +322,8 @@ public class WorkerTest { public final void testCleanupShardConsumers() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -332,7 +342,9 @@ public class WorkerTest { Worker worker = new Worker(stageName, - streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + streamletFactory, + clientConfig, + streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -371,6 +383,8 @@ public class WorkerTest { public final void testInitializationFailureWithRetries() { String stageName = "testInitializationWorker"; IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); int count = 0; when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++))); int maxRecords = 2; @@ -386,6 +400,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, recordProcessorFactory, + clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, shardPollInterval, shardSyncIntervalMillis, @@ -709,6 +724,8 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -742,7 +759,7 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -785,6 +802,8 @@ public class WorkerTest { public void testShutdownCallableNotAllowedTwice() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -816,7 +835,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { @@ -850,6 +869,8 @@ public class WorkerTest { public void testGracefulShutdownSingleFuture() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -888,7 +909,7 @@ public class WorkerTest { when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { @@ -926,6 +947,8 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -950,7 +973,7 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -988,6 +1011,8 @@ public class WorkerTest { public void testRequestShutdownWithLostLease() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1020,7 +1045,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1089,6 +1114,8 @@ public class WorkerTest { public void testRequestShutdownWithAllLeasesLost() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1121,7 +1148,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1195,6 +1222,8 @@ public class WorkerTest { public void testLeaseCancelledAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1226,7 +1255,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1267,6 +1296,8 @@ public class WorkerTest { public void testEndOfShardAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1298,7 +1329,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1336,13 +1367,14 @@ public class WorkerTest { private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, - StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + KinesisClientLibConfiguration config, StreamConfig streamConfig, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, + super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); @@ -1649,10 +1681,12 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); Worker worker = new Worker(stageName, recordProcessorFactory, + clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,