diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index 5220c6ae..f94e819b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -50,10 +50,13 @@ public class BlockingGetRecordsCache implements GetRecordsCache { return processRecordsInput; } + @Override + public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() { + return getRecordsRetrievalStrategy; + } + @Override public void shutdown() { - // - // Nothing to do here. - // + getRecordsRetrievalStrategy.shutdown(); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index d3ccb911..9121df4b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -14,8 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.Optional; - /** * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, * and state transitions is contained within the {@link ConsumerState} objects. @@ -253,9 +251,14 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(), - consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), consumer.getStreamConfig()); + return new InitializeTask(consumer.getShardInfo(), + consumer.getRecordProcessor(), + consumer.getCheckpoint(), + consumer.getRecordProcessorCheckpointer(), + consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), + consumer.getStreamConfig(), + consumer.getGetRecordsCache()); } @Override @@ -309,10 +312,14 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), - consumer.getGetRecordsRetrievalStrategy()); + return new ProcessTask(consumer.getShardInfo(), + consumer.getStreamConfig(), + consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), + consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), + consumer.getGetRecordsCache()); } @Override @@ -371,8 +378,10 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), - consumer.getShutdownNotification(), consumer.getShardInfo()); + return new ShutdownNotificationTask(consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getShutdownNotification(), + consumer.getShardInfo()); } @Override @@ -511,13 +520,16 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new ShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(), + return new ShutdownTask(consumer.getShardInfo(), + consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getShutdownReason(), consumer.getStreamConfig().getStreamProxy(), consumer.getStreamConfig().getInitialPositionInStream(), - consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(), + consumer.isCleanupLeasesOfCompletedShards(), + consumer.getLeaseManager(), consumer.getTaskBackoffTimeMillis(), - consumer.getGetRecordsRetrievalStrategy()); + consumer.getGetRecordsCache()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index d08ec285..dba24f8d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -33,6 +33,8 @@ public interface GetRecordsCache { * @return The next set of records. */ ProcessRecordsInput getNextResult(); + + GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy(); /** * This method calls the shutdown behavior on the cache, if available. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index e3d9f607..5e847a89 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -43,17 +43,19 @@ class InitializeTask implements ITask { // Back off for this interval if we encounter a problem (exception) private final long backoffTimeMillis; private final StreamConfig streamConfig; + private final GetRecordsCache getRecordsCache; /** * Constructor. */ InitializeTask(ShardInfo shardInfo, - IRecordProcessor recordProcessor, - ICheckpoint checkpoint, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisDataFetcher dataFetcher, - long backoffTimeMillis, - StreamConfig streamConfig) { + IRecordProcessor recordProcessor, + ICheckpoint checkpoint, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisDataFetcher dataFetcher, + long backoffTimeMillis, + StreamConfig streamConfig, + GetRecordsCache getRecordsCache) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.checkpoint = checkpoint; @@ -61,6 +63,7 @@ class InitializeTask implements ITask { this.dataFetcher = dataFetcher; this.backoffTimeMillis = backoffTimeMillis; this.streamConfig = streamConfig; + this.getRecordsCache = getRecordsCache; } /* @@ -80,6 +83,7 @@ class InitializeTask implements ITask { ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); + getRecordsCache.start(); recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint); recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index c970daa0..ebc4b559 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -232,6 +232,9 @@ public class KinesisClientLibConfiguration { @Getter private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; + @Getter + private RecordsFetcherFactory recordsFetcherFactory; + /** * Constructor. * @@ -455,6 +458,117 @@ public class KinesisClientLibConfiguration { InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); + } + + /** + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamName Name of the Kinesis stream + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + */ + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + public KinesisClientLibConfiguration(String applicationName, + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + RecordsFetcherFactory recordsFetcherFactory) { + // Check following values are greater than zero + checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); + checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); + checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis); + checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis); + checkIsValuePositive("MaxRecords", (long) maxRecords); + checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); + checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); + checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); + checkIsRegionNameValid(regionName); + this.applicationName = applicationName; + this.tableName = applicationName; + this.streamName = streamName; + this.kinesisEndpoint = kinesisEndpoint; + this.dynamoDBEndpoint = dynamoDBEndpoint; + this.initialPositionInStream = initialPositionInStream; + this.kinesisCredentialsProvider = kinesisCredentialsProvider; + this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; + this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider; + this.failoverTimeMillis = failoverTimeMillis; + this.maxRecords = maxRecords; + this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis; + this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList; + this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; + this.shardSyncIntervalMillis = shardSyncIntervalMillis; + this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry; + this.workerIdentifier = workerId; + this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig); + this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig); + this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig); + this.taskBackoffTimeMillis = taskBackoffTimeMillis; + this.metricsBufferTimeMillis = metricsBufferTimeMillis; + this.metricsMaxQueueSize = metricsMaxQueueSize; + this.metricsLevel = DEFAULT_METRICS_LEVEL; + this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS; + this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing; + this.regionName = regionName; + this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER; + this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME; + this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; + this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; + this.initialPositionInStreamExtended = + InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; + this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = recordsFetcherFactory; + this.shutdownGraceMillis = shutdownGraceMillis; this.shutdownGraceMillis = shutdownGraceMillis; } @@ -1158,6 +1272,34 @@ public class KinesisClientLibConfiguration { return this; } + /** + * + * @param maxCacheSize the max number of records stored in the getRecordsCache + * @return this configuration object + */ + public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) { + checkIsValuePositive("maxCacheSize", maxCacheSize); + recordsFetcherFactory.setMaxSize(maxCacheSize); + return this; + } + + public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) { + checkIsValuePositive("maxCacheByteSize", maxCacheByteSize); + recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); + return this; + } + + public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy)); + return this; + } + + public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) { + checkIsValuePositive("maxRecordsCount", maxRecordsCount); + recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); + return this; + } + /** * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index 8779b5da..5e84b1cf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 979c5b1a..8315fb73 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -63,6 +63,10 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void start() { + if (executorService.isShutdown()) { + throw new IllegalStateException("ExecutorService has been shutdown."); + } + if (!started) { log.info("Starting prefetching thread."); executorService.execute(new DefaultGetRecordsCacheDaemon()); @@ -72,8 +76,12 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public ProcessRecordsInput getNextResult() { + if (executorService.isShutdown()) { + throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests."); + } + if (!started) { - throw new IllegalStateException("Threadpool in the cache was not started, make sure to call start on the cache"); + throw new IllegalStateException("Cache has not been initialized, make sure to call start."); } ProcessRecordsInput result = null; try { @@ -85,9 +93,16 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { return result; } + @Override + public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() { + return getRecordsRetrievalStrategy; + } + @Override public void shutdown() { + getRecordsRetrievalStrategy.shutdown(); executorService.shutdownNow(); + started = false; } private class DefaultGetRecordsCacheDaemon implements Runnable { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 90ac2c09..20be71b4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -18,7 +18,6 @@ import java.math.BigInteger; import java.util.Collections; import java.util.List; import java.util.ListIterator; -import java.util.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -63,7 +62,7 @@ class ProcessTask implements ITask { private final Shard shard; private final ThrottlingReporter throttlingReporter; - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final GetRecordsCache getRecordsCache; /** * @param shardInfo @@ -78,17 +77,17 @@ class ProcessTask implements ITask { * Kinesis data fetcher (used to fetch records from Kinesis) * @param backoffTimeMillis * backoff time when catching exceptions - * @param getRecordsRetrievalStrategy + * @param getRecordsCache * The retrieval strategy for fetching records from kinesis */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + GetRecordsCache getRecordsCache) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), - getRecordsRetrievalStrategy); + getRecordsCache); } /** @@ -108,9 +107,9 @@ class ProcessTask implements ITask { * determines how throttling events should be reported in the log. */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -120,7 +119,7 @@ class ProcessTask implements ITask { this.backoffTimeMillis = backoffTimeMillis; this.throttlingReporter = throttlingReporter; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); - this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.getRecordsCache = getRecordsCache; // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if @@ -158,10 +157,10 @@ class ProcessTask implements ITask { return new TaskResult(null, true); } - final GetRecordsResult getRecordsResult = getRecordsResult(); + final ProcessRecordsInput processRecordsInput = getRecordsResult(); throttlingReporter.success(); - List records = getRecordsResult.getRecords(); - + List records = processRecordsInput.getRecords(); + if (!records.isEmpty()) { scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); } else { @@ -175,7 +174,7 @@ class ProcessTask implements ITask { recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); if (shouldCallProcessRecords(records)) { - callProcessRecords(getRecordsResult, records); + callProcessRecords(processRecordsInput, records); } } catch (ProvisionedThroughputExceededException pte) { throttlingReporter.throttled(); @@ -205,18 +204,18 @@ class ProcessTask implements ITask { /** * Dispatches a batch of records to the record processor, and handles any fallout from that. - * - * @param getRecordsResult + * + * @param input * the result of the last call to Kinesis * @param records * the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation. */ - private void callProcessRecords(GetRecordsResult getRecordsResult, List records) { + private void callProcessRecords(ProcessRecordsInput input, List records) { LOG.debug("Calling application processRecords() with " + records.size() + " records from " + shardInfo.getShardId()); final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) .withCheckpointer(recordProcessorCheckpointer) - .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); + .withMillisBehindLatest(input.getMillisBehindLatest()); final long recordProcessorStartTimeMillis = System.currentTimeMillis(); try { @@ -233,7 +232,7 @@ class ProcessTask implements ITask { /** * Whether we should call process records or not - * + * * @param records * the records returned from the call to Kinesis, and/or deaggregation * @return true if the set of records should be dispatched to the record process, false if they should not. @@ -244,7 +243,7 @@ class ProcessTask implements ITask { /** * Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation - * + * * @param records * the records to deaggregate is deaggregation is required. * @return returns either the deaggregated records, or the original records @@ -267,7 +266,7 @@ class ProcessTask implements ITask { /** * Emits metrics, and sleeps if there are no records available - * + * * @param startTimeMillis * the time when the task started */ @@ -304,8 +303,8 @@ class ProcessTask implements ITask { * @return the largest extended sequence number among the retained records */ private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List records, - final ExtendedSequenceNumber lastCheckpointValue, - final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { + final ExtendedSequenceNumber lastCheckpointValue, + final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue; ListIterator recordIterator = records.listIterator(); while (recordIterator.hasNext()) { @@ -339,7 +338,7 @@ class ProcessTask implements ITask { * * @return list of data records from Kinesis */ - private GetRecordsResult getRecordsResult() { + private ProcessRecordsInput getRecordsResult() { try { return getRecordsResultAndRecordMillisBehindLatest(); } catch (ExpiredIteratorException e) { @@ -375,22 +374,17 @@ class ProcessTask implements ITask { * * @return list of data records from Kinesis */ - private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() { - final GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(streamConfig.getMaxRecords()); + private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() { + final ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); - if (getRecordsResult == null) { - // Stream no longer exists - return new GetRecordsResult().withRecords(Collections.emptyList()); - } - - if (getRecordsResult.getMillisBehindLatest() != null) { + if (processRecordsInput.getMillisBehindLatest() != null) { MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, - getRecordsResult.getMillisBehindLatest(), + processRecordsInput.getMillisBehindLatest(), StandardUnit.Milliseconds, MetricsLevel.SUMMARY); } - return getRecordsResult; + return processRecordsInput; } -} +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java new file mode 100644 index 00000000..cdd80e49 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard. + * Clients may choose to create separate instantiations, or re-use instantiations. + */ + +public interface RecordsFetcherFactory { + + /** + * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. + * + * @return Returns a record fetcher object + */ + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy); + + void setMaxSize(int maxSize); + + void setMaxByteSize(int maxByteSize); + + void setMaxRecordsCount(int maxRecordsCount); + + void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 4bbe1939..75b5f474 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -43,6 +43,7 @@ class ShardConsumer { private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; + private final KinesisClientLibConfiguration config; private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final ExecutorService executorService; private final ShardInfo shardInfo; @@ -61,7 +62,7 @@ class ShardConsumer { private Future future; @Getter - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final GetRecordsCache getRecordsCache; private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, Optional retryGetRecordsInSeconds, @@ -91,6 +92,7 @@ class ShardConsumer { * @param streamConfig Stream configuration to use * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard + * @param config Kinesis library configuration * @param leaseManager Used to create leases for new shards * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard @@ -99,19 +101,20 @@ class ShardConsumer { */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - ILeaseManager leaseManager, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { - this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis, - cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + ILeaseManager leaseManager, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisClientLibConfiguration config) { + this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, + parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, + backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty(), config); } /** @@ -126,6 +129,7 @@ class ShardConsumer { * @param backoffTimeMillis backoff interval when we encounter exceptions * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record. * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool. + * @param config Kinesis library configuration */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES ShardConsumer(ShardInfo shardInfo, @@ -140,26 +144,85 @@ class ShardConsumer { long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool) { - this.streamConfig = streamConfig; - this.recordProcessor = recordProcessor; - this.executorService = executorService; - this.shardInfo = shardInfo; - this.checkpoint = checkpoint; - this.recordProcessorCheckpointer = - new RecordProcessorCheckpointer(shardInfo, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config) { + + this( + shardInfo, + streamConfig, + checkpoint, + recordProcessor, + new RecordProcessorCheckpointer( + shardInfo, checkpoint, - new SequenceNumberValidator(streamConfig.getStreamProxy(), + new SequenceNumberValidator( + streamConfig.getStreamProxy(), shardInfo.getShardId(), - streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())); - this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())), + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), + retryGetRecordsInSeconds, + maxGetRecordsThreadPool, + config + ); + } + + /** + * @param shardInfo Shard information + * @param streamConfig Stream Config to use + * @param checkpoint Checkpoint tracker + * @param recordProcessor Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress + * @param leaseManager Used to create leases for new shards + * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) + * @param cleanupLeasesOfCompletedShards clean up the leases of completed shards + * @param executorService ExecutorService used to execute process tasks for this shard + * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard + * @param backoffTimeMillis backoff interval when we encounter exceptions + * @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists + * @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams. + * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record + * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool + * @param config Kinesis library configuration + */ + ShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + ILeaseManager leaseManager, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config) { + this.shardInfo = shardInfo; + this.streamConfig = streamConfig; + this.checkpoint = checkpoint; + this.recordProcessor = recordProcessor; + this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.leaseManager = leaseManager; - this.metricsFactory = metricsFactory; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; + this.executorService = executorService; + this.metricsFactory = metricsFactory; this.taskBackoffTimeMillis = backoffTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; - this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo); + this.config = config; + this.dataFetcher = kinesisDataFetcher; + this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( + makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo)); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index f56033a8..bd40d686 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -46,7 +46,7 @@ class ShutdownTask implements ITask { private final boolean cleanupLeasesOfCompletedShards; private final TaskType taskType = TaskType.SHUTDOWN; private final long backoffTimeMillis; - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final GetRecordsCache getRecordsCache; /** * Constructor. @@ -61,7 +61,7 @@ class ShutdownTask implements ITask { boolean cleanupLeasesOfCompletedShards, ILeaseManager leaseManager, long backoffTimeMillis, - GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + GetRecordsCache getRecordsCache) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -71,7 +71,7 @@ class ShutdownTask implements ITask { this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.leaseManager = leaseManager; this.backoffTimeMillis = backoffTimeMillis; - this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.getRecordsCache = getRecordsCache; } /* @@ -111,7 +111,7 @@ class ShutdownTask implements ITask { } } LOG.debug("Shutting down retrieval strategy."); - getRecordsRetrievalStrategy.shutdown(); + getRecordsCache.shutdown(); LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); } catch (Exception e) { applicationException = true; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java new file mode 100644 index 00000000..4ddf6112 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -0,0 +1,62 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import java.util.concurrent.Executors; + +import lombok.extern.apachecommons.CommonsLog; + +@CommonsLog +public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { + private final int maxRecords; + private int maxSize = 10; + private int maxByteSize = 15 * 1024 * 1024; + private int maxRecordsCount = 30000; + private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; + + public SimpleRecordsFetcherFactory(int maxRecords) { + this.maxRecords = maxRecords; + } + + @Override + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { + return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); + } else { + return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, + getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1)); + } + } + + @Override + public void setMaxSize(int maxSize){ + this.maxSize = maxSize; + } + + @Override + public void setMaxByteSize(int maxByteSize){ + this.maxByteSize = maxByteSize; + } + + @Override + public void setMaxRecordsCount(int maxRecordsCount) { + this.maxRecordsCount = maxRecordsCount; + } + + @Override + public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){ + this.dataFetchingStrategy = dataFetchingStrategy; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 3cfb9f2f..d2ea738d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -73,6 +73,7 @@ public class Worker implements Runnable { private final String applicationName; private final IRecordProcessorFactory recordProcessorFactory; + private final KinesisClientLibConfiguration config; private final StreamConfig streamConfig; private final InitialPositionInStreamExtended initialPosition; private final ICheckpoint checkpointTracker; @@ -245,6 +246,7 @@ public class Worker implements Runnable { KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + config, new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) .getProxy(config.getStreamName()), @@ -306,6 +308,8 @@ public class Worker implements Runnable { * Name of the Kinesis application * @param recordProcessorFactory * Used to get record processor instances for processing data from shards + * @paran config + * Kinesis Library configuration * @param streamConfig * Stream configuration * @param initialPositionInStream @@ -333,24 +337,25 @@ public class Worker implements Runnable { */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, - InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, + this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty()); } - /** * @param applicationName * Name of the Kinesis application * @param recordProcessorFactory * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Library Configuration * @param streamConfig * Stream configuration * @param initialPositionInStream @@ -382,7 +387,7 @@ public class Worker implements Runnable { */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, @@ -391,6 +396,7 @@ public class Worker implements Runnable { Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; + this.config = config; this.streamConfig = streamConfig; this.initialPosition = initialPositionInStream; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; @@ -411,7 +417,6 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; } - /** * @return the applicationName */ @@ -819,11 +824,11 @@ public class Worker implements Runnable { * * @param shardInfo * Kinesis shard info - * @param factory + * @param processorFactory * RecordProcessor factory * @return ShardConsumer for the shard */ - ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { + ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier @@ -832,20 +837,30 @@ public class Worker implements Runnable { // completely processed (shutdown reason terminate). if ((consumer == null) || (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) { - consumer = buildConsumer(shardInfo, factory); + consumer = buildConsumer(shardInfo, processorFactory); shardInfoShardConsumerMap.put(shardInfo, consumer); wlog.infoForce("Created new shardConsumer for : " + shardInfo); } return consumer; } - protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { - IRecordProcessor recordProcessor = factory.createProcessor(); + protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + IRecordProcessor recordProcessor = processorFactory.createProcessor(); - return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, - leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool); + return new ShardConsumer(shardInfo, + streamConfig, + checkpointTracker, + recordProcessor, + leaseCoordinator.getLeaseManager(), + parentShardPollIntervalMillis, + cleanupLeasesUponShardCompletion, + executorService, + metricsFactory, + taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + retryGetRecordsInSeconds, + maxGetRecordsThreadPool, + config); } @@ -1049,6 +1064,7 @@ public class Worker implements Runnable { public static class Builder { private IRecordProcessorFactory recordProcessorFactory; + private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; private AmazonKinesis kinesisClient; private AmazonDynamoDB dynamoDBClient; @@ -1244,6 +1260,7 @@ public class Worker implements Runnable { return new Worker(config.getApplicationName(), recordProcessorFactory, + config, new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient).getProxy(config.getStreamName()), config.getMaxRecords(), diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java index 8518c992..a120a480 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java @@ -110,7 +110,6 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { @Test @Ignore public void testInterrupted() throws InterruptedException, ExecutionException { - Future mockFuture = mock(Future.class); when(completionService.submit(any())).thenReturn(mockFuture); when(completionService.poll()).thenReturn(mockFuture); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 63f20a72..fa163ad2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -55,6 +55,8 @@ public class ConsumerStatesTest { @Mock private IRecordProcessor recordProcessor; @Mock + private KinesisClientLibConfiguration config; + @Mock private RecordProcessorCheckpointer recordProcessorCheckpointer; @Mock private ExecutorService executorService; @@ -75,7 +77,7 @@ public class ConsumerStatesTest { @Mock private InitialPositionInStreamExtended initialPositionInStream; @Mock - private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private GetRecordsCache getRecordsCache; private long parentShardPollIntervalMillis = 0xCAFE; private boolean cleanupLeasesOfCompletedShards = true; @@ -98,7 +100,7 @@ public class ConsumerStatesTest { when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards); when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis); when(consumer.getShutdownReason()).thenReturn(reason); - when(consumer.getGetRecordsRetrievalStrategy()).thenReturn(getRecordsRetrievalStrategy); + when(consumer.getGetRecordsCache()).thenReturn(getRecordsCache); } private static final Class> LEASE_MANAGER_CLASS = (Class>) (Class) ILeaseManager.class; @@ -207,6 +209,33 @@ public class ConsumerStatesTest { } + @Test + public void processingStateRecordsFetcher() { + + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING)); + assertThat(state.getTaskType(), equalTo(TaskType.PROCESS)); + } + @Test public void shutdownRequestState() { ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); @@ -313,7 +342,7 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); } @@ -323,17 +352,17 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher procTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher initTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher taskWith(Class taskTypeClass, - Class valueTypeClass, String propertyName, Matcher matcher) { + Class valueTypeClass, String propertyName, Matcher matcher) { return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName); } @@ -346,7 +375,7 @@ public class ConsumerStatesTest { private final Field matchingField; private ReflectionPropertyMatcher(Class taskTypeClass, Class valueTypeClass, - Matcher matcher, String propertyName) { + Matcher matcher, String propertyName) { this.taskTypeClass = taskTypeClass; this.valueTypeClazz = valueTypeClass; this.matcher = matcher; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java index eeb8ff1d..8517138f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -182,6 +182,12 @@ public class PrefetchGetRecordsCacheTest { verify(executorService, times(0)).execute(any()); getRecordsCache.getNextResult(); } + + @Test(expected = IllegalStateException.class) + public void testCallAfterShutdown() { + when(executorService.isShutdown()).thenReturn(true); + getRecordsCache.getNextResult(); + } @After public void shutdown() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index b24bf3ec..94d0918e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -18,8 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @@ -49,7 +48,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber import com.amazonaws.services.kinesis.clientlibrary.types.Messages.AggregatedRecord; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; -import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Record; import com.google.protobuf.ByteString; @@ -77,7 +75,7 @@ public class ProcessTaskTest { @Mock private ThrottlingReporter throttlingReporter; @Mock - private GetRecordsRetrievalStrategy mockGetRecordsRetrievalStrategy; + private GetRecordsCache getRecordsCache; private List processedRecords; private ExtendedSequenceNumber newLargestPermittedCheckpointValue; @@ -95,32 +93,39 @@ public class ProcessTaskTest { INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrievalStrategy); + shardInfo, + config, + mockRecordProcessor, + mockCheckpointer, + mockDataFetcher, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + throttlingReporter, + getRecordsCache); } @Test public void testProcessTaskWithProvisionedThroughputExceededException() { // Set data fetcher to throw exception doReturn(false).when(mockDataFetcher).isShardEndReached(); - doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsRetrievalStrategy) - .getRecords(maxRecords); + doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(getRecordsCache) + .getNextResult(); TaskResult result = processTask.call(); verify(throttlingReporter).throttled(); verify(throttlingReporter, never()).success(); - verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); + verify(getRecordsCache).getNextResult(); assertTrue("Result should contain ProvisionedThroughputExceededException", result.getException() instanceof ProvisionedThroughputExceededException); } @Test public void testProcessTaskWithNonExistentStream() { - // Data fetcher returns a null Result when the stream does not exist - doReturn(null).when(mockGetRecordsRetrievalStrategy).getRecords(maxRecords); + // Data fetcher returns a null Result ` the stream does not exist + doReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest((long) 0)).when(getRecordsCache).getNextResult(); TaskResult result = processTask.call(); - verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); + verify(getRecordsCache).getNextResult(); assertNull("Task should not throw an exception", result.getException()); } @@ -304,14 +309,13 @@ public class ProcessTaskTest { private void testWithRecords(List records, ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) { - when(mockGetRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn( - new GetRecordsResult().withRecords(records)); + when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(records).withMillisBehindLatest((long) 1000 * 50)); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); verify(throttlingReporter).success(); verify(throttlingReporter, never()).throttled(); - verify(mockGetRecordsRetrievalStrategy).getRecords(anyInt()); + verify(getRecordsCache).getNextResult(); ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(priCaptor.capture()); processedRecords = priCaptor.getValue().getRecords(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java new file mode 100644 index 00000000..17a77123 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -0,0 +1,41 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +public class RecordsFetcherFactoryTest { + + private RecordsFetcherFactory recordsFetcherFactory; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + recordsFetcherFactory = new SimpleRecordsFetcherFactory(1); + } + + @Test + public void createDefaultRecordsFetcherTest() { + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); + } + + @Test + public void createPrefetchRecordsFetcherTest() { + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); + } + +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index a3f786a6..4414b96a 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -20,9 +20,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.atLeastOnce; @@ -36,6 +36,7 @@ import static org.mockito.Mockito.when; import java.io.File; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.ListIterator; @@ -47,11 +48,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -95,10 +98,16 @@ public class ShardConsumerTest { // Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is // ... a non-final public class, and so can be mocked and spied. private final ExecutorService executorService = Executors.newFixedThreadPool(1); - + private final int maxRecords = 500; + private RecordsFetcherFactory recordsFetcherFactory; + + private GetRecordsCache getRecordsCache; + @Mock private IRecordProcessor processor; @Mock + private KinesisClientLibConfiguration config; + @Mock private IKinesisProxy streamProxy; @Mock private ILeaseManager leaseManager; @@ -107,6 +116,14 @@ public class ShardConsumerTest { @Mock private ShutdownNotification shutdownNotification; + @Before + public void setup() { + getRecordsCache = null; + + recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords)); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + } + /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. */ @@ -137,8 +154,9 @@ public class ShardConsumerTest { executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); - + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); @@ -154,7 +172,6 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); } - /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. */ @@ -185,7 +202,8 @@ public class ShardConsumerTest { spyExecutorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -226,7 +244,8 @@ public class ShardConsumerTest { executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null; @@ -299,7 +318,6 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -310,18 +328,39 @@ public class ShardConsumerTest { skipCheckpointValidationValue, INITIAL_POSITION_LATEST); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); + + RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( + shardInfo, + checkpoint, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() + ) + ); + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); + when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + ShardConsumer consumer = new ShardConsumer(shardInfo, streamConfig, checkpoint, processor, + recordProcessorCheckpointer, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + dataFetcher, + Optional.empty(), + Optional.empty(), + config); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -330,6 +369,7 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); + verify(getRecordsCache).start(); // We expect to process all records in numRecs calls for (int i = 0; i < numRecs;) { @@ -342,6 +382,8 @@ public class ShardConsumerTest { } Thread.sleep(50L); } + + verify(getRecordsCache, times(5)).getNextResult(); assertThat(processor.getShutdownReason(), nullValue()); consumer.notifyShutdownRequested(shutdownNotification); @@ -365,6 +407,8 @@ public class ShardConsumerTest { verify(shutdownNotification, atLeastOnce()).shutdownComplete(); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); + + verify(getRecordsCache).shutdown(); executorService.shutdown(); executorService.awaitTermination(60, TimeUnit.SECONDS); @@ -401,7 +445,6 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -413,18 +456,39 @@ public class ShardConsumerTest { atTimestamp); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); + + RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( + shardInfo, + checkpoint, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() + ) + ); + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); + when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + ShardConsumer consumer = new ShardConsumer(shardInfo, streamConfig, checkpoint, processor, + recordProcessorCheckpointer, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + dataFetcher, + Optional.empty(), + Optional.empty(), + config); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -433,6 +497,8 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); + + verify(getRecordsCache).start(); // We expect to process all records in numRecs calls for (int i = 0; i < numRecs;) { @@ -445,6 +511,8 @@ public class ShardConsumerTest { } Thread.sleep(50L); } + + verify(getRecordsCache, times(4)).getNextResult(); assertThat(processor.getShutdownReason(), nullValue()); consumer.beginShutdown(); @@ -457,8 +525,11 @@ public class ShardConsumerTest { executorService.shutdown(); executorService.awaitTermination(60, TimeUnit.SECONDS); + verify(getRecordsCache).shutdown(); + String iterator = fileBasedProxy.getIterator(streamShardId, timestamp); List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); + verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); assertEquals(4, processor.getProcessedRecords().size()); file.delete(); @@ -486,11 +557,15 @@ public class ShardConsumerTest { executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); + + GetRecordsCache getRecordsCache = spy(consumer.getGetRecordsCache()); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); when(leaseManager.getLease(anyString())).thenReturn(null); + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2)); when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); @@ -535,9 +610,11 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, Optional.empty(), - Optional.empty()); + Optional.empty(), + config); - assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), SynchronousGetRecordsRetrievalStrategy.class); + assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), + SynchronousGetRecordsRetrievalStrategy.class); } @Test @@ -563,9 +640,11 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, Optional.of(1), - Optional.of(2)); + Optional.of(2), + config); - assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class); + assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), + AsynchronousGetRecordsRetrievalStrategy.class); } //@formatter:off (gets the formatting wrong) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 5d91c698..17a53137 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -59,7 +59,7 @@ public class ShutdownTaskTest { IRecordProcessor defaultRecordProcessor = new TestStreamlet(); @Mock - private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private GetRecordsCache getRecordsCache; /** * @throws java.lang.Exception @@ -80,7 +80,7 @@ public class ShutdownTaskTest { */ @Before public void setUp() throws Exception { - doNothing().when(getRecordsRetrievalStrategy).shutdown(); + doNothing().when(getRecordsCache).shutdown(); } /** @@ -109,7 +109,7 @@ public class ShutdownTaskTest { cleanupLeasesOfCompletedShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, - getRecordsRetrievalStrategy); + getRecordsCache); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); @@ -135,11 +135,11 @@ public class ShutdownTaskTest { cleanupLeasesOfCompletedShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, - getRecordsRetrievalStrategy); + getRecordsCache); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); - verify(getRecordsRetrievalStrategy).shutdown(); + verify(getRecordsCache).shutdown(); } /** @@ -147,7 +147,7 @@ public class ShutdownTaskTest { */ @Test public final void testGetTaskType() { - ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsRetrievalStrategy); + ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsCache); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 5913bf0d..107900dc 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -60,6 +60,7 @@ import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Matchers; @@ -129,10 +130,14 @@ public class WorkerTest { private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; + + private RecordsFetcherFactory recordsFetcherFactory; @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; @Mock + private KinesisClientLibConfiguration config; + @Mock private ILeaseManager leaseManager; @Mock private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory v1RecordProcessorFactory; @@ -154,6 +159,12 @@ public class WorkerTest { private Future taskFuture; @Mock private TaskResult taskResult; + + @Before + public void setup() { + recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500)); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = @@ -210,6 +221,8 @@ public class WorkerTest { public final void testCreateOrGetShardConsumer() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -228,7 +241,9 @@ public class WorkerTest { Worker worker = new Worker(stageName, - streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + streamletFactory, + clientConfig, + streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -257,6 +272,8 @@ public class WorkerTest { public void testWorkerLoopWithCheckpoint() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -275,7 +292,7 @@ public class WorkerTest { when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint) .thenReturn(secondCheckpoint); - Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); @@ -314,6 +331,8 @@ public class WorkerTest { public final void testCleanupShardConsumers() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -332,7 +351,9 @@ public class WorkerTest { Worker worker = new Worker(stageName, - streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + streamletFactory, + clientConfig, + streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -371,6 +392,8 @@ public class WorkerTest { public final void testInitializationFailureWithRetries() { String stageName = "testInitializationWorker"; IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); int count = 0; when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++))); int maxRecords = 2; @@ -386,6 +409,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, recordProcessorFactory, + clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, shardPollInterval, shardSyncIntervalMillis, @@ -709,6 +733,8 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -742,7 +768,7 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -785,6 +811,8 @@ public class WorkerTest { public void testShutdownCallableNotAllowedTwice() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -816,7 +844,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { @@ -850,6 +878,8 @@ public class WorkerTest { public void testGracefulShutdownSingleFuture() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -888,7 +918,7 @@ public class WorkerTest { when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { @@ -926,6 +956,8 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -950,7 +982,7 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -988,6 +1020,8 @@ public class WorkerTest { public void testRequestShutdownWithLostLease() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1020,7 +1054,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1089,6 +1123,8 @@ public class WorkerTest { public void testRequestShutdownWithAllLeasesLost() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1121,7 +1157,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1195,6 +1231,8 @@ public class WorkerTest { public void testLeaseCancelledAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1226,7 +1264,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1267,6 +1305,8 @@ public class WorkerTest { public void testEndOfShardAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1298,7 +1338,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1336,13 +1376,14 @@ public class WorkerTest { private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, - StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + KinesisClientLibConfiguration config, StreamConfig streamConfig, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, + super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); @@ -1649,10 +1690,12 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); Worker worker = new Worker(stageName, recordProcessorFactory, + clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,