From 49b761c5e2439963363f03eae1eb9565c296f423 Mon Sep 17 00:00:00 2001 From: BtXin Date: Fri, 22 Sep 2017 14:20:08 -0700 Subject: [PATCH] Merging changes (#225) * integrated prefetch with shardconsumer * fixed tests * added fatory methods * added tests and fixed broken tests * Resolved conflicts * Addressed comments * Integrated the changes --- .../lib/worker/ConsumerStates.java | 2 - .../worker/KinesisClientLibConfiguration.java | 142 ++++++++++++++++++ .../clientlibrary/lib/worker/ProcessTask.java | 22 +-- .../lib/worker/RecordsFetcherFactory.java | 39 +++++ .../lib/worker/ShardConsumer.java | 14 +- .../worker/SimpleRecordsFetcherFactory.java | 63 ++++++++ .../clientlibrary/lib/worker/Worker.java | 31 ++-- .../lib/worker/ConsumerStatesTest.java | 39 ++++- .../lib/worker/ProcessTaskTest.java | 23 +-- .../lib/worker/RecordsFetcherFactoryTest.java | 41 +++++ .../lib/worker/ShardConsumerTest.java | 19 ++- .../clientlibrary/lib/worker/WorkerTest.java | 62 ++++++-- 12 files changed, 436 insertions(+), 61 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index d3ccb911..84e234b9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -14,8 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.Optional; - /** * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, * and state transitions is contained within the {@link ConsumerState} objects. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index c970daa0..ebc4b559 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -232,6 +232,9 @@ public class KinesisClientLibConfiguration { @Getter private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; + @Getter + private RecordsFetcherFactory recordsFetcherFactory; + /** * Constructor. * @@ -455,6 +458,117 @@ public class KinesisClientLibConfiguration { InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); + } + + /** + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamName Name of the Kinesis stream + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + */ + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + public KinesisClientLibConfiguration(String applicationName, + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + RecordsFetcherFactory recordsFetcherFactory) { + // Check following values are greater than zero + checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); + checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); + checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis); + checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis); + checkIsValuePositive("MaxRecords", (long) maxRecords); + checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); + checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); + checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); + checkIsRegionNameValid(regionName); + this.applicationName = applicationName; + this.tableName = applicationName; + this.streamName = streamName; + this.kinesisEndpoint = kinesisEndpoint; + this.dynamoDBEndpoint = dynamoDBEndpoint; + this.initialPositionInStream = initialPositionInStream; + this.kinesisCredentialsProvider = kinesisCredentialsProvider; + this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; + this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider; + this.failoverTimeMillis = failoverTimeMillis; + this.maxRecords = maxRecords; + this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis; + this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList; + this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; + this.shardSyncIntervalMillis = shardSyncIntervalMillis; + this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry; + this.workerIdentifier = workerId; + this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig); + this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig); + this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig); + this.taskBackoffTimeMillis = taskBackoffTimeMillis; + this.metricsBufferTimeMillis = metricsBufferTimeMillis; + this.metricsMaxQueueSize = metricsMaxQueueSize; + this.metricsLevel = DEFAULT_METRICS_LEVEL; + this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS; + this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing; + this.regionName = regionName; + this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER; + this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME; + this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; + this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; + this.initialPositionInStreamExtended = + InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; + this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = recordsFetcherFactory; + this.shutdownGraceMillis = shutdownGraceMillis; this.shutdownGraceMillis = shutdownGraceMillis; } @@ -1158,6 +1272,34 @@ public class KinesisClientLibConfiguration { return this; } + /** + * + * @param maxCacheSize the max number of records stored in the getRecordsCache + * @return this configuration object + */ + public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) { + checkIsValuePositive("maxCacheSize", maxCacheSize); + recordsFetcherFactory.setMaxSize(maxCacheSize); + return this; + } + + public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) { + checkIsValuePositive("maxCacheByteSize", maxCacheByteSize); + recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); + return this; + } + + public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy)); + return this; + } + + public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) { + checkIsValuePositive("maxRecordsCount", maxRecordsCount); + recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); + return this; + } + /** * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 90ac2c09..9dac442c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -108,9 +108,9 @@ class ProcessTask implements ITask { * determines how throttling events should be reported in the log. */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -161,7 +161,7 @@ class ProcessTask implements ITask { final GetRecordsResult getRecordsResult = getRecordsResult(); throttlingReporter.success(); List records = getRecordsResult.getRecords(); - + if (!records.isEmpty()) { scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); } else { @@ -205,7 +205,7 @@ class ProcessTask implements ITask { /** * Dispatches a batch of records to the record processor, and handles any fallout from that. - * + * * @param getRecordsResult * the result of the last call to Kinesis * @param records @@ -233,7 +233,7 @@ class ProcessTask implements ITask { /** * Whether we should call process records or not - * + * * @param records * the records returned from the call to Kinesis, and/or deaggregation * @return true if the set of records should be dispatched to the record process, false if they should not. @@ -244,7 +244,7 @@ class ProcessTask implements ITask { /** * Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation - * + * * @param records * the records to deaggregate is deaggregation is required. * @return returns either the deaggregated records, or the original records @@ -267,7 +267,7 @@ class ProcessTask implements ITask { /** * Emits metrics, and sleeps if there are no records available - * + * * @param startTimeMillis * the time when the task started */ @@ -304,8 +304,8 @@ class ProcessTask implements ITask { * @return the largest extended sequence number among the retained records */ private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List records, - final ExtendedSequenceNumber lastCheckpointValue, - final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { + final ExtendedSequenceNumber lastCheckpointValue, + final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue; ListIterator recordIterator = records.listIterator(); while (recordIterator.hasNext()) { @@ -393,4 +393,4 @@ class ProcessTask implements ITask { return getRecordsResult; } -} +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java new file mode 100644 index 00000000..cdd80e49 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard. + * Clients may choose to create separate instantiations, or re-use instantiations. + */ + +public interface RecordsFetcherFactory { + + /** + * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. + * + * @return Returns a record fetcher object + */ + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy); + + void setMaxSize(int maxSize); + + void setMaxByteSize(int maxByteSize); + + void setMaxRecordsCount(int maxRecordsCount); + + void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 4bbe1939..f4be0fc5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -43,6 +43,7 @@ class ShardConsumer { private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; + private final KinesisClientLibConfiguration config; private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final ExecutorService executorService; private final ShardInfo shardInfo; @@ -59,10 +60,10 @@ class ShardConsumer { private ITask currentTask; private long currentTaskSubmitTime; private Future future; - @Getter private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, @@ -91,6 +92,7 @@ class ShardConsumer { * @param streamConfig Stream configuration to use * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard + * @param config Kinesis library configuration * @param leaseManager Used to create leases for new shards * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard @@ -102,6 +104,7 @@ class ShardConsumer { StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, + KinesisClientLibConfiguration config, ILeaseManager leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, @@ -109,9 +112,9 @@ class ShardConsumer { IMetricsFactory metricsFactory, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { - this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis, - cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); + this(shardInfo, streamConfig, checkpoint,recordProcessor, config, leaseManager, + parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, + backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); } /** @@ -119,6 +122,7 @@ class ShardConsumer { * @param streamConfig Stream configuration to use * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard + * @param config Kinesis library configuration * @param leaseManager Used to create leases for new shards * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard @@ -132,6 +136,7 @@ class ShardConsumer { StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, + KinesisClientLibConfiguration config, ILeaseManager leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, @@ -143,6 +148,7 @@ class ShardConsumer { Optional maxGetRecordsThreadPool) { this.streamConfig = streamConfig; this.recordProcessor = recordProcessor; + this.config = config; this.executorService = executorService; this.shardInfo = shardInfo; this.checkpoint = checkpoint; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java new file mode 100644 index 00000000..2ad61f16 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -0,0 +1,63 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import lombok.Setter; +import lombok.extern.apachecommons.CommonsLog; + +import java.util.concurrent.Executors; + +@CommonsLog +public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { + private final int maxRecords; + private int maxSize = 10; + private int maxByteSize = 15 * 1024 * 1024; + private int maxRecordsCount = 30000; + private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; + + public SimpleRecordsFetcherFactory(int maxRecords) { + this.maxRecords = maxRecords; + } + + @Override + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { + return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); + } else { + return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, + getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1)); + } + } + + @Override + public void setMaxSize(int maxSize){ + this.maxSize = maxSize; + } + + @Override + public void setMaxByteSize(int maxByteSize){ + this.maxByteSize = maxByteSize; + } + + @Override + public void setMaxRecordsCount(int maxRecordsCount) { + this.maxRecordsCount = maxRecordsCount; + } + + @Override + public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){ + this.dataFetchingStrategy = dataFetchingStrategy; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 3cfb9f2f..494d1c50 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -73,6 +73,7 @@ public class Worker implements Runnable { private final String applicationName; private final IRecordProcessorFactory recordProcessorFactory; + private final KinesisClientLibConfiguration config; private final StreamConfig streamConfig; private final InitialPositionInStreamExtended initialPosition; private final ICheckpoint checkpointTracker; @@ -245,6 +246,7 @@ public class Worker implements Runnable { KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + config, new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) .getProxy(config.getStreamName()), @@ -306,6 +308,8 @@ public class Worker implements Runnable { * Name of the Kinesis application * @param recordProcessorFactory * Used to get record processor instances for processing data from shards + * @paran config + * Kinesis Library configuration * @param streamConfig * Stream configuration * @param initialPositionInStream @@ -333,24 +337,25 @@ public class Worker implements Runnable { */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, - InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, + this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty()); } - /** * @param applicationName * Name of the Kinesis application * @param recordProcessorFactory * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Library Configuration * @param streamConfig * Stream configuration * @param initialPositionInStream @@ -382,7 +387,7 @@ public class Worker implements Runnable { */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, @@ -391,6 +396,7 @@ public class Worker implements Runnable { Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; + this.config = config; this.streamConfig = streamConfig; this.initialPosition = initialPositionInStream; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; @@ -411,7 +417,6 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; } - /** * @return the applicationName */ @@ -819,11 +824,11 @@ public class Worker implements Runnable { * * @param shardInfo * Kinesis shard info - * @param factory + * @param processorFactory * RecordProcessor factory * @return ShardConsumer for the shard */ - ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { + ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier @@ -832,17 +837,17 @@ public class Worker implements Runnable { // completely processed (shutdown reason terminate). if ((consumer == null) || (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) { - consumer = buildConsumer(shardInfo, factory); + consumer = buildConsumer(shardInfo, processorFactory); shardInfoShardConsumerMap.put(shardInfo, consumer); wlog.infoForce("Created new shardConsumer for : " + shardInfo); } return consumer; } - protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { - IRecordProcessor recordProcessor = factory.createProcessor(); + protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + IRecordProcessor recordProcessor = processorFactory.createProcessor(); - return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, + return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, config, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool); @@ -1049,6 +1054,7 @@ public class Worker implements Runnable { public static class Builder { private IRecordProcessorFactory recordProcessorFactory; + private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; private AmazonKinesis kinesisClient; private AmazonDynamoDB dynamoDBClient; @@ -1244,6 +1250,7 @@ public class Worker implements Runnable { return new Worker(config.getApplicationName(), recordProcessorFactory, + config, new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient).getProxy(config.getStreamName()), config.getMaxRecords(), diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 63f20a72..77c40cc9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -55,6 +55,8 @@ public class ConsumerStatesTest { @Mock private IRecordProcessor recordProcessor; @Mock + private KinesisClientLibConfiguration config; + @Mock private RecordProcessorCheckpointer recordProcessorCheckpointer; @Mock private ExecutorService executorService; @@ -207,6 +209,33 @@ public class ConsumerStatesTest { } + @Test + public void processingStateRecordsFetcher() { + + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING)); + assertThat(state.getTaskType(), equalTo(TaskType.PROCESS)); + } + @Test public void shutdownRequestState() { ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); @@ -313,7 +342,7 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); } @@ -323,17 +352,17 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher procTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher initTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher taskWith(Class taskTypeClass, - Class valueTypeClass, String propertyName, Matcher matcher) { + Class valueTypeClass, String propertyName, Matcher matcher) { return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName); } @@ -346,7 +375,7 @@ public class ConsumerStatesTest { private final Field matchingField; private ReflectionPropertyMatcher(Class taskTypeClass, Class valueTypeClass, - Matcher matcher, String propertyName) { + Matcher matcher, String propertyName) { this.taskTypeClass = taskTypeClass; this.valueTypeClazz = valueTypeClass; this.matcher = matcher; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index b24bf3ec..e55336d9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -18,8 +18,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @@ -78,6 +76,10 @@ public class ProcessTaskTest { private ThrottlingReporter throttlingReporter; @Mock private GetRecordsRetrievalStrategy mockGetRecordsRetrievalStrategy; + @Mock + private RecordsFetcherFactory mockRecordsFetcherFactory; + @Mock + private GetRecordsCache mockRecordsFetcher; private List processedRecords; private ExtendedSequenceNumber newLargestPermittedCheckpointValue; @@ -94,8 +96,9 @@ public class ProcessTaskTest { skipCheckpointValidationValue, INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); + when(mockRecordsFetcherFactory.createRecordsFetcher(mockGetRecordsRetrievalStrategy)).thenReturn(mockRecordsFetcher); processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, + shardInfo, config, mockRecordProcessor, mockRecordsFetcherFactory, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrievalStrategy); } @@ -103,13 +106,13 @@ public class ProcessTaskTest { public void testProcessTaskWithProvisionedThroughputExceededException() { // Set data fetcher to throw exception doReturn(false).when(mockDataFetcher).isShardEndReached(); - doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsRetrievalStrategy) - .getRecords(maxRecords); + doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockRecordsFetcher) + .getNextResult(); TaskResult result = processTask.call(); verify(throttlingReporter).throttled(); verify(throttlingReporter, never()).success(); - verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); + verify(mockRecordsFetcher).getNextResult(); assertTrue("Result should contain ProvisionedThroughputExceededException", result.getException() instanceof ProvisionedThroughputExceededException); } @@ -117,10 +120,10 @@ public class ProcessTaskTest { @Test public void testProcessTaskWithNonExistentStream() { // Data fetcher returns a null Result when the stream does not exist - doReturn(null).when(mockGetRecordsRetrievalStrategy).getRecords(maxRecords); + doReturn(new GetRecordsResult().withRecords(Collections.emptyList())).when(mockRecordsFetcher).getNextResult(); TaskResult result = processTask.call(); - verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); + verify(mockRecordsFetcher).getNextResult(); assertNull("Task should not throw an exception", result.getException()); } @@ -304,14 +307,14 @@ public class ProcessTaskTest { private void testWithRecords(List records, ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) { - when(mockGetRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn( + when(mockRecordsFetcher.getNextResult()).thenReturn( new GetRecordsResult().withRecords(records)); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); verify(throttlingReporter).success(); verify(throttlingReporter, never()).throttled(); - verify(mockGetRecordsRetrievalStrategy).getRecords(anyInt()); + verify(mockRecordsFetcher).getNextResult(); ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(priCaptor.capture()); processedRecords = priCaptor.getValue().getRecords(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java new file mode 100644 index 00000000..17a77123 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -0,0 +1,41 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +public class RecordsFetcherFactoryTest { + + private RecordsFetcherFactory recordsFetcherFactory; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + recordsFetcherFactory = new SimpleRecordsFetcherFactory(1); + } + + @Test + public void createDefaultRecordsFetcherTest() { + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); + } + + @Test + public void createPrefetchRecordsFetcherTest() { + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); + } + +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index a3f786a6..89c40121 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -36,6 +36,7 @@ import static org.mockito.Mockito.when; import java.io.File; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.ListIterator; @@ -47,6 +48,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.model.GetRecordsResult; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Description; @@ -99,6 +101,10 @@ public class ShardConsumerTest { @Mock private IRecordProcessor processor; @Mock + private KinesisClientLibConfiguration config; + @Mock + private RecordsFetcherFactory recordsFetcherFactory; + @Mock private IKinesisProxy streamProxy; @Mock private ILeaseManager leaseManager; @@ -106,7 +112,6 @@ public class ShardConsumerTest { private ICheckpoint checkpoint; @Mock private ShutdownNotification shutdownNotification; - /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. */ @@ -131,6 +136,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -179,6 +185,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -207,6 +214,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testRecordProcessorThrowable() throws Exception { + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); StreamConfig streamConfig = new StreamConfig(streamProxy, @@ -220,6 +228,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -299,7 +308,7 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(maxRecords)); TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -315,6 +324,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -401,7 +411,7 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2)); TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -418,6 +428,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -480,6 +491,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -491,6 +503,7 @@ public class ShardConsumerTest { final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); when(leaseManager.getLease(anyString())).thenReturn(null); + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2)); when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 5913bf0d..9f5bcbee 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -133,6 +133,8 @@ public class WorkerTest { @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; @Mock + private KinesisClientLibConfiguration config; + @Mock private ILeaseManager leaseManager; @Mock private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory v1RecordProcessorFactory; @@ -210,6 +212,8 @@ public class WorkerTest { public final void testCreateOrGetShardConsumer() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -228,7 +232,9 @@ public class WorkerTest { Worker worker = new Worker(stageName, - streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + streamletFactory, + clientConfig, + streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -257,6 +263,8 @@ public class WorkerTest { public void testWorkerLoopWithCheckpoint() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -275,7 +283,7 @@ public class WorkerTest { when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint) .thenReturn(secondCheckpoint); - Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); @@ -314,6 +322,8 @@ public class WorkerTest { public final void testCleanupShardConsumers() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -332,7 +342,9 @@ public class WorkerTest { Worker worker = new Worker(stageName, - streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + streamletFactory, + clientConfig, + streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -371,6 +383,8 @@ public class WorkerTest { public final void testInitializationFailureWithRetries() { String stageName = "testInitializationWorker"; IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); int count = 0; when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++))); int maxRecords = 2; @@ -386,6 +400,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, recordProcessorFactory, + clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, shardPollInterval, shardSyncIntervalMillis, @@ -709,6 +724,8 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -742,7 +759,7 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -785,6 +802,8 @@ public class WorkerTest { public void testShutdownCallableNotAllowedTwice() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -816,7 +835,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { @@ -850,6 +869,8 @@ public class WorkerTest { public void testGracefulShutdownSingleFuture() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -888,7 +909,7 @@ public class WorkerTest { when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { @@ -926,6 +947,8 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -950,7 +973,7 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -988,6 +1011,8 @@ public class WorkerTest { public void testRequestShutdownWithLostLease() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1020,7 +1045,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1089,6 +1114,8 @@ public class WorkerTest { public void testRequestShutdownWithAllLeasesLost() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1121,7 +1148,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1195,6 +1222,8 @@ public class WorkerTest { public void testLeaseCancelledAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1226,7 +1255,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1267,6 +1296,8 @@ public class WorkerTest { public void testEndOfShardAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1298,7 +1329,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1336,13 +1367,14 @@ public class WorkerTest { private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, - StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + KinesisClientLibConfiguration config, StreamConfig streamConfig, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, + super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); @@ -1649,10 +1681,12 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); Worker worker = new Worker(stageName, recordProcessorFactory, + clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,