diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index 3a8282e4..a7ac9a5a 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.8.3 +Bundle-Version: 1.8.4 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", diff --git a/README.md b/README.md index ddaa6194..00ed3caa 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,12 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.8.4 (September 22, 2017) +* Create a new completion service for each request. + This ensures that canceled tasks are discarded. This will prevent a cancellation exception causing issues processing records. + * [PR #227](https://github.com/awslabs/amazon-kinesis-client/pull/227) + * [Issue #226](https://github.com/awslabs/amazon-kinesis-client/issues/226) + ### Release 1.8.3 (September 22, 2017) * Call shutdown on the retriever when the record processor is being shutdown This fixes a bug that could leak threads if using the [`AsynchronousGetRecordsRetrievalStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/9a82b6bd05b3c9c5f8581af007141fa6d5f0fc4e/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java#L42) is being used. diff --git a/pom.xml b/pom.xml index 61c8c6cf..66006321 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.3 + 1.8.5-SNAPSHOT The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java index 92057327..b592c29b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java @@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -26,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope; @@ -47,7 +49,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie private final ExecutorService executorService; private final int retryGetRecordsInSeconds; private final String shardId; - final CompletionService completionService; + final Supplier> completionServiceSupplier; public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher, final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) { @@ -56,16 +58,17 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher, final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) { - this(dataFetcher, executorService, retryGetRecordsInSeconds, new ExecutorCompletionService<>(executorService), + this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService), shardId); } AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, - int retryGetRecordsInSeconds, CompletionService completionService, String shardId) { + int retryGetRecordsInSeconds, Supplier> completionServiceSupplier, + String shardId) { this.dataFetcher = dataFetcher; this.executorService = executorService; this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; - this.completionService = completionService; + this.completionServiceSupplier = completionServiceSupplier; this.shardId = shardId; } @@ -75,6 +78,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie throw new IllegalStateException("Strategy has been shutdown"); } GetRecordsResult result = null; + CompletionService completionService = completionServiceSupplier.get(); Set> futures = new HashSet<>(); Callable retrieverCall = createRetrieverCallable(maxRecords); while (true) { @@ -98,13 +102,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie break; } } - futures.stream().peek(f -> f.cancel(true)).filter(Future::isCancelled).forEach(f -> { - try { - completionService.take(); - } catch (InterruptedException e) { - log.error("Exception thrown while trying to empty the threadpool."); - } - }); + futures.forEach(f -> f.cancel(true)); return result; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index 5220c6ae..f94e819b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -50,10 +50,13 @@ public class BlockingGetRecordsCache implements GetRecordsCache { return processRecordsInput; } + @Override + public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() { + return getRecordsRetrievalStrategy; + } + @Override public void shutdown() { - // - // Nothing to do here. - // + getRecordsRetrievalStrategy.shutdown(); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index 84e234b9..9121df4b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -251,9 +251,14 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(), - consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), consumer.getStreamConfig()); + return new InitializeTask(consumer.getShardInfo(), + consumer.getRecordProcessor(), + consumer.getCheckpoint(), + consumer.getRecordProcessorCheckpointer(), + consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), + consumer.getStreamConfig(), + consumer.getGetRecordsCache()); } @Override @@ -307,10 +312,14 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), - consumer.getGetRecordsRetrievalStrategy()); + return new ProcessTask(consumer.getShardInfo(), + consumer.getStreamConfig(), + consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getDataFetcher(), + consumer.getTaskBackoffTimeMillis(), + consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), + consumer.getGetRecordsCache()); } @Override @@ -369,8 +378,10 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), - consumer.getShutdownNotification(), consumer.getShardInfo()); + return new ShutdownNotificationTask(consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getShutdownNotification(), + consumer.getShardInfo()); } @Override @@ -509,13 +520,16 @@ class ConsumerStates { @Override public ITask createTask(ShardConsumer consumer) { - return new ShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(), - consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(), + return new ShutdownTask(consumer.getShardInfo(), + consumer.getRecordProcessor(), + consumer.getRecordProcessorCheckpointer(), + consumer.getShutdownReason(), consumer.getStreamConfig().getStreamProxy(), consumer.getStreamConfig().getInitialPositionInStream(), - consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(), + consumer.isCleanupLeasesOfCompletedShards(), + consumer.getLeaseManager(), consumer.getTaskBackoffTimeMillis(), - consumer.getGetRecordsRetrievalStrategy()); + consumer.getGetRecordsCache()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java index d08ec285..dba24f8d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsCache.java @@ -33,6 +33,8 @@ public interface GetRecordsCache { * @return The next set of records. */ ProcessRecordsInput getNextResult(); + + GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy(); /** * This method calls the shutdown behavior on the cache, if available. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index e3d9f607..5e847a89 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -43,17 +43,19 @@ class InitializeTask implements ITask { // Back off for this interval if we encounter a problem (exception) private final long backoffTimeMillis; private final StreamConfig streamConfig; + private final GetRecordsCache getRecordsCache; /** * Constructor. */ InitializeTask(ShardInfo shardInfo, - IRecordProcessor recordProcessor, - ICheckpoint checkpoint, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisDataFetcher dataFetcher, - long backoffTimeMillis, - StreamConfig streamConfig) { + IRecordProcessor recordProcessor, + ICheckpoint checkpoint, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisDataFetcher dataFetcher, + long backoffTimeMillis, + StreamConfig streamConfig, + GetRecordsCache getRecordsCache) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.checkpoint = checkpoint; @@ -61,6 +63,7 @@ class InitializeTask implements ITask { this.dataFetcher = dataFetcher; this.backoffTimeMillis = backoffTimeMillis; this.streamConfig = streamConfig; + this.getRecordsCache = getRecordsCache; } /* @@ -80,6 +83,7 @@ class InitializeTask implements ITask { ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); + getRecordsCache.start(); recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint); recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index ebc4b559..ca7884d9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.3"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.4"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls @@ -182,6 +182,11 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; + /** + * The amount of time to sleep in between 2 get calls from the data fetcher. + */ + public static final long DEFAULT_IDLE_MILLIS_BETWEEN_CALLS = 1500L; + private String applicationName; private String tableName; private String streamName; @@ -234,6 +239,9 @@ public class KinesisClientLibConfiguration { @Getter private RecordsFetcherFactory recordsFetcherFactory; + + @Getter + private long idleMillisBetweenCalls; /** * Constructor. @@ -270,15 +278,32 @@ public class KinesisClientLibConfiguration { AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider, String workerId) { - this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, - dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, - DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, - DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, - DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, - new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), - DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, - DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, - DEFAULT_SHUTDOWN_GRACE_MILLIS); + this(applicationName, + streamName, + null, + null, + DEFAULT_INITIAL_POSITION_IN_STREAM, + kinesisCredentialsProvider, + dynamoDBCredentialsProvider, + cloudWatchCredentialsProvider, + DEFAULT_FAILOVER_TIME_MILLIS, + workerId, + DEFAULT_MAX_RECORDS, + DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, + DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, + DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, + DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, + DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, + new ClientConfiguration(), + new ClientConfiguration(), + new ClientConfiguration(), + DEFAULT_TASK_BACKOFF_TIME_MILLIS, + DEFAULT_METRICS_BUFFER_TIME_MILLIS, + DEFAULT_METRICS_MAX_QUEUE_SIZE, + DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, + null, + DEFAULT_SHUTDOWN_GRACE_MILLIS, + DEFAULT_IDLE_MILLIS_BETWEEN_CALLS); } /** @@ -318,29 +343,30 @@ public class KinesisClientLibConfiguration { // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES public KinesisClientLibConfiguration(String applicationName, - String streamName, - String kinesisEndpoint, - InitialPositionInStream initialPositionInStream, - AWSCredentialsProvider kinesisCredentialsProvider, - AWSCredentialsProvider dynamoDBCredentialsProvider, - AWSCredentialsProvider cloudWatchCredentialsProvider, - long failoverTimeMillis, - String workerId, - int maxRecords, - long idleTimeBetweenReadsInMillis, - boolean callProcessRecordsEvenForEmptyRecordList, - long parentShardPollIntervalMillis, - long shardSyncIntervalMillis, - boolean cleanupTerminatedShardsBeforeExpiry, - ClientConfiguration kinesisClientConfig, - ClientConfiguration dynamoDBClientConfig, - ClientConfiguration cloudWatchClientConfig, - long taskBackoffTimeMillis, - long metricsBufferTimeMillis, - int metricsMaxQueueSize, - boolean validateSequenceNumberBeforeCheckpointing, - String regionName, - long shutdownGraceMillis) { + String streamName, + String kinesisEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + long shutdownGraceMillis, + long idleMillisBetweenCalls) { this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, @@ -348,7 +374,7 @@ public class KinesisClientLibConfiguration { shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, - validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); + validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, idleMillisBetweenCalls); } /** @@ -388,30 +414,31 @@ public class KinesisClientLibConfiguration { // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES public KinesisClientLibConfiguration(String applicationName, - String streamName, - String kinesisEndpoint, - String dynamoDBEndpoint, - InitialPositionInStream initialPositionInStream, - AWSCredentialsProvider kinesisCredentialsProvider, - AWSCredentialsProvider dynamoDBCredentialsProvider, - AWSCredentialsProvider cloudWatchCredentialsProvider, - long failoverTimeMillis, - String workerId, - int maxRecords, - long idleTimeBetweenReadsInMillis, - boolean callProcessRecordsEvenForEmptyRecordList, - long parentShardPollIntervalMillis, - long shardSyncIntervalMillis, - boolean cleanupTerminatedShardsBeforeExpiry, - ClientConfiguration kinesisClientConfig, - ClientConfiguration dynamoDBClientConfig, - ClientConfiguration cloudWatchClientConfig, - long taskBackoffTimeMillis, - long metricsBufferTimeMillis, - int metricsMaxQueueSize, - boolean validateSequenceNumberBeforeCheckpointing, - String regionName, - long shutdownGraceMillis) { + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + long shutdownGraceMillis, + long idleMillisBetweenCalls) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -422,6 +449,7 @@ public class KinesisClientLibConfiguration { checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); + checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); checkIsRegionNameValid(regionName); this.applicationName = applicationName; this.tableName = applicationName; @@ -459,6 +487,7 @@ public class KinesisClientLibConfiguration { this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); + this.idleMillisBetweenCalls = idleMillisBetweenCalls; } /** @@ -1290,7 +1319,13 @@ public class KinesisClientLibConfiguration { } public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { - recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy)); + switch (dataFetchingStrategy.toUpperCase()) { + case "PREFETCH_CACHED": + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); + break; + default: + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.DEFAULT); + } return this; } @@ -1316,4 +1351,14 @@ public class KinesisClientLibConfiguration { this.shutdownGraceMillis = shutdownGraceMillis; return this; } + + /** + * @param idleMillisBetweenCalls Idle time between 2 getcalls from the data fetcher. + * @return + */ + public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) { + checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); + this.idleMillisBetweenCalls = idleMillisBetweenCalls; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index 8779b5da..a8e05d86 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -1,19 +1,21 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.time.Duration; +import java.time.Instant; import java.util.Collections; import java.util.Date; @@ -40,16 +42,18 @@ class KinesisDataFetcher { private final String shardId; private boolean isShardEndReached; private boolean isInitialized; + private Instant lastResponseTime; + private long idleMillisBetweenCalls; /** * * @param kinesisProxy Kinesis proxy * @param shardInfo The shardInfo object. */ - public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) { + public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo, KinesisClientLibConfiguration configuration) { this.shardId = shardInfo.getShardId(); - this.kinesisProxy = - new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId); + this.kinesisProxy = new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId); + this.idleMillisBetweenCalls = configuration.getIdleMillisBetweenCalls(); } /** @@ -66,7 +70,9 @@ class KinesisDataFetcher { GetRecordsResult response = null; if (nextIterator != null) { try { + sleepBeforeNextCall(); response = kinesisProxy.get(nextIterator, maxRecords); + lastResponseTime = Instant.now(); nextIterator = response.getNextShardIterator(); } catch (ResourceNotFoundException e) { LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId); @@ -182,6 +188,19 @@ class KinesisDataFetcher { } return iterator; } + + protected void sleepBeforeNextCall() { + if (lastResponseTime != null) { + long timeDiff = Duration.between(lastResponseTime, Instant.now()).abs().toMillis(); + if (timeDiff < idleMillisBetweenCalls) { + try { + Thread.sleep(idleMillisBetweenCalls - timeDiff); + } catch (InterruptedException e) { + LOG.info("Thread interrupted, shutdown possibly called."); + } + } + } + } /** * @return the shardEndReached diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 979c5b1a..c849f7f2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -63,6 +63,10 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void start() { + if (executorService.isShutdown()) { + throw new IllegalStateException("ExecutorService has been shutdown."); + } + if (!started) { log.info("Starting prefetching thread."); executorService.execute(new DefaultGetRecordsCacheDaemon()); @@ -72,8 +76,12 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public ProcessRecordsInput getNextResult() { + if (executorService.isShutdown()) { + throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests."); + } + if (!started) { - throw new IllegalStateException("Threadpool in the cache was not started, make sure to call start on the cache"); + throw new IllegalStateException("Cache has not been initialized, make sure to call start."); } ProcessRecordsInput result = null; try { @@ -85,9 +93,16 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { return result; } + @Override + public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() { + return getRecordsRetrievalStrategy; + } + @Override public void shutdown() { + getRecordsRetrievalStrategy.shutdown(); executorService.shutdownNow(); + started = false; } private class DefaultGetRecordsCacheDaemon implements Runnable { @@ -108,7 +123,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { getRecordsResultQueue.put(processRecordsInput); prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { - log.info("Thread was interrupted, indicating shutdown was called on the cache", e); + log.info("Thread was interrupted, indicating shutdown was called on the cache"); } } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 9dac442c..20be71b4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -18,7 +18,6 @@ import java.math.BigInteger; import java.util.Collections; import java.util.List; import java.util.ListIterator; -import java.util.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -63,7 +62,7 @@ class ProcessTask implements ITask { private final Shard shard; private final ThrottlingReporter throttlingReporter; - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final GetRecordsCache getRecordsCache; /** * @param shardInfo @@ -78,17 +77,17 @@ class ProcessTask implements ITask { * Kinesis data fetcher (used to fetch records from Kinesis) * @param backoffTimeMillis * backoff time when catching exceptions - * @param getRecordsRetrievalStrategy + * @param getRecordsCache * The retrieval strategy for fetching records from kinesis */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + GetRecordsCache getRecordsCache) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), - getRecordsRetrievalStrategy); + getRecordsCache); } /** @@ -110,7 +109,7 @@ class ProcessTask implements ITask { public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -120,7 +119,7 @@ class ProcessTask implements ITask { this.backoffTimeMillis = backoffTimeMillis; this.throttlingReporter = throttlingReporter; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); - this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.getRecordsCache = getRecordsCache; // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if @@ -158,9 +157,9 @@ class ProcessTask implements ITask { return new TaskResult(null, true); } - final GetRecordsResult getRecordsResult = getRecordsResult(); + final ProcessRecordsInput processRecordsInput = getRecordsResult(); throttlingReporter.success(); - List records = getRecordsResult.getRecords(); + List records = processRecordsInput.getRecords(); if (!records.isEmpty()) { scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); @@ -175,7 +174,7 @@ class ProcessTask implements ITask { recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); if (shouldCallProcessRecords(records)) { - callProcessRecords(getRecordsResult, records); + callProcessRecords(processRecordsInput, records); } } catch (ProvisionedThroughputExceededException pte) { throttlingReporter.throttled(); @@ -206,17 +205,17 @@ class ProcessTask implements ITask { /** * Dispatches a batch of records to the record processor, and handles any fallout from that. * - * @param getRecordsResult + * @param input * the result of the last call to Kinesis * @param records * the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation. */ - private void callProcessRecords(GetRecordsResult getRecordsResult, List records) { + private void callProcessRecords(ProcessRecordsInput input, List records) { LOG.debug("Calling application processRecords() with " + records.size() + " records from " + shardInfo.getShardId()); final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) .withCheckpointer(recordProcessorCheckpointer) - .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); + .withMillisBehindLatest(input.getMillisBehindLatest()); final long recordProcessorStartTimeMillis = System.currentTimeMillis(); try { @@ -339,7 +338,7 @@ class ProcessTask implements ITask { * * @return list of data records from Kinesis */ - private GetRecordsResult getRecordsResult() { + private ProcessRecordsInput getRecordsResult() { try { return getRecordsResultAndRecordMillisBehindLatest(); } catch (ExpiredIteratorException e) { @@ -375,22 +374,17 @@ class ProcessTask implements ITask { * * @return list of data records from Kinesis */ - private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() { - final GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(streamConfig.getMaxRecords()); + private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() { + final ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); - if (getRecordsResult == null) { - // Stream no longer exists - return new GetRecordsResult().withRecords(Collections.emptyList()); - } - - if (getRecordsResult.getMillisBehindLatest() != null) { + if (processRecordsInput.getMillisBehindLatest() != null) { MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, - getRecordsResult.getMillisBehindLatest(), + processRecordsInput.getMillisBehindLatest(), StandardUnit.Milliseconds, MetricsLevel.SUMMARY); } - return getRecordsResult; + return processRecordsInput; } } \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index f4be0fc5..de49dada 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -60,9 +60,9 @@ class ShardConsumer { private ITask currentTask; private long currentTaskSubmitTime; private Future future; + @Getter - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; - + private final GetRecordsCache getRecordsCache; private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, Optional retryGetRecordsInSeconds, @@ -101,20 +101,31 @@ class ShardConsumer { */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - KinesisClientLibConfiguration config, - ILeaseManager leaseManager, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { - this(shardInfo, streamConfig, checkpoint,recordProcessor, config, leaseManager, - parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, - backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + ILeaseManager leaseManager, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisClientLibConfiguration config) { + this(shardInfo, + streamConfig, + checkpoint, + recordProcessor, + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional.empty(), + Optional.empty(), + config); } /** @@ -122,7 +133,6 @@ class ShardConsumer { * @param streamConfig Stream configuration to use * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard - * @param config Kinesis library configuration * @param leaseManager Used to create leases for new shards * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard @@ -130,13 +140,13 @@ class ShardConsumer { * @param backoffTimeMillis backoff interval when we encounter exceptions * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record. * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool. + * @param config Kinesis library configuration */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES ShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, - KinesisClientLibConfiguration config, ILeaseManager leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, @@ -145,27 +155,85 @@ class ShardConsumer { long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool) { - this.streamConfig = streamConfig; - this.recordProcessor = recordProcessor; - this.config = config; - this.executorService = executorService; - this.shardInfo = shardInfo; - this.checkpoint = checkpoint; - this.recordProcessorCheckpointer = - new RecordProcessorCheckpointer(shardInfo, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config) { + + this( + shardInfo, + streamConfig, + checkpoint, + recordProcessor, + new RecordProcessorCheckpointer( + shardInfo, checkpoint, - new SequenceNumberValidator(streamConfig.getStreamProxy(), + new SequenceNumberValidator( + streamConfig.getStreamProxy(), shardInfo.getShardId(), - streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())); - this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())), + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + backoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config), + retryGetRecordsInSeconds, + maxGetRecordsThreadPool, + config + ); + } + + /** + * @param shardInfo Shard information + * @param streamConfig Stream Config to use + * @param checkpoint Checkpoint tracker + * @param recordProcessor Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress + * @param leaseManager Used to create leases for new shards + * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) + * @param cleanupLeasesOfCompletedShards clean up the leases of completed shards + * @param executorService ExecutorService used to execute process tasks for this shard + * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard + * @param backoffTimeMillis backoff interval when we encounter exceptions + * @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists + * @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams. + * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record + * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool + * @param config Kinesis library configuration + */ + ShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + ILeaseManager leaseManager, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config) { + this.shardInfo = shardInfo; + this.streamConfig = streamConfig; + this.checkpoint = checkpoint; + this.recordProcessor = recordProcessor; + this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.leaseManager = leaseManager; - this.metricsFactory = metricsFactory; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; + this.executorService = executorService; + this.metricsFactory = metricsFactory; this.taskBackoffTimeMillis = backoffTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; - this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo); + this.config = config; + this.dataFetcher = kinesisDataFetcher; + this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( + makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo)); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index f56033a8..bd40d686 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -46,7 +46,7 @@ class ShutdownTask implements ITask { private final boolean cleanupLeasesOfCompletedShards; private final TaskType taskType = TaskType.SHUTDOWN; private final long backoffTimeMillis; - private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final GetRecordsCache getRecordsCache; /** * Constructor. @@ -61,7 +61,7 @@ class ShutdownTask implements ITask { boolean cleanupLeasesOfCompletedShards, ILeaseManager leaseManager, long backoffTimeMillis, - GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + GetRecordsCache getRecordsCache) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -71,7 +71,7 @@ class ShutdownTask implements ITask { this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.leaseManager = leaseManager; this.backoffTimeMillis = backoffTimeMillis; - this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.getRecordsCache = getRecordsCache; } /* @@ -111,7 +111,7 @@ class ShutdownTask implements ITask { } } LOG.debug("Shutting down retrieval strategy."); - getRecordsRetrievalStrategy.shutdown(); + getRecordsCache.shutdown(); LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); } catch (Exception e) { applicationException = true; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 2ad61f16..3a3958f3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -14,16 +14,15 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import lombok.Setter; -import lombok.extern.apachecommons.CommonsLog; - import java.util.concurrent.Executors; +import lombok.extern.apachecommons.CommonsLog; + @CommonsLog public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private final int maxRecords; - private int maxSize = 10; - private int maxByteSize = 15 * 1024 * 1024; + private int maxSize = 3; + private int maxByteSize = 8 * 1024 * 1024; private int maxRecordsCount = 30000; private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 494d1c50..d2ea738d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -847,10 +847,20 @@ public class Worker implements Runnable { protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { IRecordProcessor recordProcessor = processorFactory.createProcessor(); - return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, config, - leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool); + return new ShardConsumer(shardInfo, + streamConfig, + checkpointTracker, + recordProcessor, + leaseCoordinator.getLeaseManager(), + parentShardPollIntervalMillis, + cleanupLeasesUponShardCompletion, + executorService, + metricsFactory, + taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + retryGetRecordsInSeconds, + maxGetRecordsThreadPool, + config); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java index 8518c992..051985d3 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java @@ -27,12 +27,14 @@ import org.mockito.runners.MockitoJUnitRunner; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -59,6 +61,13 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { @Mock private ShardInfo mockShardInfo; + @Mock + private Supplier> completionServiceSupplier; + + @Mock + private KinesisClientLibConfiguration configuration; + + private CompletionService completionService; private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy; private KinesisDataFetcher dataFetcher; @@ -66,11 +75,11 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { private ExecutorService executorService; private RejectedExecutionHandler rejectedExecutionHandler; private int numberOfRecords = 10; - private CompletionService completionService; + @Before public void setup() { - dataFetcher = spy(new KinesisDataFetcherForTests(mockKinesisProxy, mockShardInfo)); + dataFetcher = spy(new KinesisDataFetcherForTests(mockKinesisProxy, mockShardInfo, configuration)); rejectedExecutionHandler = spy(new ThreadPoolExecutor.AbortPolicy()); executorService = spy(new ThreadPoolExecutor( CORE_POOL_SIZE, @@ -80,9 +89,12 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { new LinkedBlockingQueue<>(1), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("getrecords-worker-%d").build(), rejectedExecutionHandler)); - getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, "shardId-0001"); - completionService = spy(getRecordsRetrivalStrategy.completionService); + completionService = spy(new ExecutorCompletionService(executorService)); + when(completionServiceSupplier.get()).thenReturn(completionService); + getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy( + dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001"); result = null; + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); } @Test @@ -97,12 +109,16 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { public void multiRequestTest() { result = mock(GetRecordsResult.class); + ExecutorCompletionService completionService1 = spy(new ExecutorCompletionService(executorService)); + when(completionServiceSupplier.get()).thenReturn(completionService1); GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(numberOfRecords); verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any()); assertEquals(result, getRecordsResult); result = null; + ExecutorCompletionService completionService2 = spy(new ExecutorCompletionService(executorService)); + when(completionServiceSupplier.get()).thenReturn(completionService2); getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); assertNull(getRecordsResult); } @@ -110,7 +126,6 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { @Test @Ignore public void testInterrupted() throws InterruptedException, ExecutionException { - Future mockFuture = mock(Future.class); when(completionService.submit(any())).thenReturn(mockFuture); when(completionService.poll()).thenReturn(mockFuture); @@ -138,8 +153,9 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { } private class KinesisDataFetcherForTests extends KinesisDataFetcher { - public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo) { - super(kinesisProxy, shardInfo); + public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo, + final KinesisClientLibConfiguration configuration) { + super(kinesisProxy, shardInfo, configuration); } @Override diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java index 9ecea68d..820f4a57 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java @@ -30,7 +30,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -51,6 +53,8 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { @Mock private ExecutorService executorService; @Mock + private Supplier> completionServiceSupplier; + @Mock private CompletionService completionService; @Mock private Future successfulFuture; @@ -59,10 +63,15 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { @Mock private GetRecordsResult expectedResults; + @Before + public void before() { + when(completionServiceSupplier.get()).thenReturn(completionService); + } + @Test public void testSingleSuccessfulRequestFuture() throws Exception { AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, - executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); + executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID); when(executorService.isShutdown()).thenReturn(false); when(completionService.submit(any())).thenReturn(successfulFuture); @@ -76,8 +85,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { verify(completionService).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS)); verify(successfulFuture).get(); verify(successfulFuture).cancel(eq(true)); - verify(successfulFuture).isCancelled(); - verify(completionService, never()).take(); assertThat(result, equalTo(expectedResults)); } @@ -85,7 +92,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { @Test public void testBlockedAndSuccessfulFuture() throws Exception { AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, - executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); + executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID); when(executorService.isShutdown()).thenReturn(false); when(completionService.submit(any())).thenReturn(blockedFuture).thenReturn(successfulFuture); @@ -104,9 +111,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { verify(blockedFuture, never()).get(); verify(successfulFuture).cancel(eq(true)); verify(blockedFuture).cancel(eq(true)); - verify(successfulFuture).isCancelled(); - verify(blockedFuture).isCancelled(); - verify(completionService).take(); assertThat(actualResults, equalTo(expectedResults)); } @@ -114,7 +118,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { @Test(expected = IllegalStateException.class) public void testStrategyIsShutdown() throws Exception { AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, - executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); + executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID); when(executorService.isShutdown()).thenReturn(true); @@ -124,7 +128,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { @Test public void testPoolOutOfResources() throws Exception { AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, - executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); + executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID); when(executorService.isShutdown()).thenReturn(false); when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture); @@ -141,9 +145,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS)); verify(successfulFuture).cancel(eq(true)); verify(blockedFuture).cancel(eq(true)); - verify(successfulFuture).isCancelled(); - verify(blockedFuture).isCancelled(); - verify(completionService).take(); + assertThat(actualResult, equalTo(expectedResults)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 77c40cc9..fa163ad2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -77,7 +77,7 @@ public class ConsumerStatesTest { @Mock private InitialPositionInStreamExtended initialPositionInStream; @Mock - private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private GetRecordsCache getRecordsCache; private long parentShardPollIntervalMillis = 0xCAFE; private boolean cleanupLeasesOfCompletedShards = true; @@ -100,7 +100,7 @@ public class ConsumerStatesTest { when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards); when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis); when(consumer.getShutdownReason()).thenReturn(reason); - when(consumer.getGetRecordsRetrievalStrategy()).thenReturn(getRecordsRetrievalStrategy); + when(consumer.getGetRecordsCache()).thenReturn(getRecordsCache); } private static final Class> LEASE_MANAGER_CLASS = (Class>) (Class) ILeaseManager.class; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index cfa8be10..1b67412f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -85,6 +85,7 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_INT, skipCheckpointValidationValue, null, + TEST_VALUE_LONG, TEST_VALUE_LONG); } @@ -95,7 +96,8 @@ public class KinesisClientLibConfigurationTest { // Try each argument at one time. KinesisClientLibConfiguration config = null; long[] longValues = - { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG }; + { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, + TEST_VALUE_LONG, TEST_VALUE_LONG }; for (int i = 0; i < PARAMETER_COUNT; i++) { longValues[i] = INVALID_LONG; try { @@ -124,7 +126,8 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_INT, skipCheckpointValidationValue, null, - longValues[6]); + longValues[6], + longValues[7]); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); } @@ -159,6 +162,7 @@ public class KinesisClientLibConfigurationTest { intValues[1], skipCheckpointValidationValue, null, + TEST_VALUE_LONG, TEST_VALUE_LONG); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); @@ -300,30 +304,31 @@ public class KinesisClientLibConfigurationTest { Mockito.mock(AWSCredentialsProvider.class); try { new KinesisClientLibConfiguration(TEST_STRING, - TEST_STRING, - TEST_STRING, - TEST_STRING, - null, - null, - null, - null, - TEST_VALUE_LONG, - TEST_STRING, - 3, - TEST_VALUE_LONG, - false, - TEST_VALUE_LONG, - TEST_VALUE_LONG, - true, - new ClientConfiguration(), - new ClientConfiguration(), - new ClientConfiguration(), - TEST_VALUE_LONG, - TEST_VALUE_LONG, - 1, - skipCheckpointValidationValue, - "abcd", - TEST_VALUE_LONG); + TEST_STRING, + TEST_STRING, + TEST_STRING, + null, + null, + null, + null, + TEST_VALUE_LONG, + TEST_STRING, + 3, + TEST_VALUE_LONG, + false, + TEST_VALUE_LONG, + TEST_VALUE_LONG, + true, + new ClientConfiguration(), + new ClientConfiguration(), + new ClientConfiguration(), + TEST_VALUE_LONG, + TEST_VALUE_LONG, + 1, + skipCheckpointValidationValue, + "abcd", + TEST_VALUE_LONG, + TEST_VALUE_LONG); Assert.fail("No expected Exception is thrown."); } catch(IllegalArgumentException e) { System.out.println(e.getMessage()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index 7e8937a6..d74a0b16 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -39,10 +39,14 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; /** * Unit tests for KinesisDataFetcher. */ +@RunWith(MockitoJUnitRunner.class) public class KinesisDataFetcherTest { private static final int MAX_RECORDS = 1; @@ -55,6 +59,9 @@ public class KinesisDataFetcherTest { InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); private static final InitialPositionInStreamExtended INITIAL_POSITION_AT_TIMESTAMP = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1000)); + + @Mock + private KinesisClientLibConfiguration configuration; /** * @throws java.lang.Exception @@ -115,8 +122,9 @@ public class KinesisDataFetcherTest { public void testadvanceIteratorTo() throws KinesisClientLibException { IKinesisProxy kinesis = mock(IKinesisProxy.class); ICheckpoint checkpoint = mock(ICheckpoint.class); + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); + KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher); String iteratorA = "foo"; @@ -148,8 +156,9 @@ public class KinesisDataFetcherTest { @Test public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() { IKinesisProxy kinesis = mock(IKinesisProxy.class); + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); + KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration); String iteratorHorizon = "horizon"; when(kinesis.getIterator(SHARD_ID, ShardIteratorType.TRIM_HORIZON.toString())).thenReturn(iteratorHorizon); @@ -178,9 +187,10 @@ public class KinesisDataFetcherTest { KinesisProxy mockProxy = mock(KinesisProxy.class); doReturn(nextIterator).when(mockProxy).getIterator(SHARD_ID, ShardIteratorType.LATEST.toString()); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); // Create data fectcher and initialize it with latest type checkpoint - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(dataFetcher); // Call getRecords of dataFetcher which will throw an exception @@ -197,8 +207,9 @@ public class KinesisDataFetcherTest { KinesisProxy mockProxy = mock(KinesisProxy.class); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); GetRecordsResult getRecordsResult = dataFetcher.getRecords(maxRecords); @@ -223,8 +234,9 @@ public class KinesisDataFetcherTest { ICheckpoint checkpoint = mock(ICheckpoint.class); when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo)); + when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); + KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher); fetcher.initialize(seqNo, initialPositionInStream); List actualRecords = getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).getRecords(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java new file mode 100644 index 00000000..bda02538 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java @@ -0,0 +1,194 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.model.Record; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.model.GetRecordsResult; + +/** + * + */ +@RunWith(MockitoJUnitRunner.class) +public class PrefetchGetRecordsCacheIntegrationTest { + private static final int MAX_SIZE = 3; + private static final int MAX_BYTE_SIZE = 5 * 1024 * 1024; + private static final int MAX_RECORDS_COUNT = 30_000; + private static final int MAX_RECORDS_PER_CALL = 10_000; + private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L; + + private PrefetchGetRecordsCache getRecordsCache; + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private KinesisDataFetcher dataFetcher; + private ExecutorService executorService; + private List records; + + @Mock + private IKinesisProxy proxy; + + @Mock + private ShardInfo shardInfo; + + @Mock + private KinesisClientLibConfiguration configuration; + + @Before + public void setup() { + when(configuration.getIdleMillisBetweenCalls()).thenReturn(IDLE_MILLIS_BETWEEN_CALLS); + + records = new ArrayList<>(); + dataFetcher = new KinesisDataFetcherForTest(proxy, shardInfo, configuration); + getRecordsRetrievalStrategy = spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); + executorService = spy(Executors.newFixedThreadPool(1)); + + getRecordsCache = new PrefetchGetRecordsCache(MAX_SIZE, + MAX_BYTE_SIZE, + MAX_RECORDS_COUNT, + MAX_RECORDS_PER_CALL, + getRecordsRetrievalStrategy, + executorService); + } + + @Test + public void testRollingCache() { + getRecordsCache.start(); + sleep(IDLE_MILLIS_BETWEEN_CALLS); + + ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult(); + + assertTrue(processRecordsInput1.getRecords().isEmpty()); + assertEquals(processRecordsInput1.getMillisBehindLatest(), new Long(1000)); + assertNotNull(processRecordsInput1.getCacheEntryTime()); + + ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult(); + + assertNotEquals(processRecordsInput1, processRecordsInput2); + } + + @Test + public void testFullCache() { + getRecordsCache.start(); + sleep(MAX_SIZE * IDLE_MILLIS_BETWEEN_CALLS); + + assertEquals(getRecordsCache.getRecordsResultQueue.size(), MAX_SIZE); + + ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult(); + ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult(); + + assertNotEquals(processRecordsInput1, processRecordsInput2); + } + + @Test + public void testDifferentShardCaches() { + ExecutorService executorService2 = spy(Executors.newFixedThreadPool(1)); + KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcherForTest(proxy, shardInfo, configuration)); + GetRecordsRetrievalStrategy getRecordsRetrievalStrategy2 = spy(new AsynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher, 5 , 5, "Test-shard")); + GetRecordsCache getRecordsCache2 = new PrefetchGetRecordsCache( + MAX_SIZE, + MAX_BYTE_SIZE, + MAX_RECORDS_COUNT, + MAX_RECORDS_PER_CALL, + getRecordsRetrievalStrategy2, + executorService2 + ); + + getRecordsCache.start(); + sleep(IDLE_MILLIS_BETWEEN_CALLS); + + Record record = mock(Record.class); + ByteBuffer byteBuffer = ByteBuffer.allocate(512 * 1024); + when(record.getData()).thenReturn(byteBuffer); + + records.add(record); + records.add(record); + records.add(record); + records.add(record); + getRecordsCache2.start(); + + sleep(IDLE_MILLIS_BETWEEN_CALLS); + + ProcessRecordsInput p1 = getRecordsCache.getNextResult(); + + ProcessRecordsInput p2 = getRecordsCache2.getNextResult(); + + assertNotEquals(p1, p2); + assertTrue(p1.getRecords().isEmpty()); + assertFalse(p2.getRecords().isEmpty()); + assertEquals(p2.getRecords().size(), records.size()); + + getRecordsCache2.shutdown(); + verify(executorService2).shutdownNow(); + verify(getRecordsRetrievalStrategy2).shutdown(); + } + + @After + public void shutdown() { + getRecordsCache.shutdown(); + verify(executorService).shutdownNow(); + verify(getRecordsRetrievalStrategy).shutdown(); + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) {} + } + + private class KinesisDataFetcherForTest extends KinesisDataFetcher { + public KinesisDataFetcherForTest(final IKinesisProxy kinesisProxy, + final ShardInfo shardInfo, + final KinesisClientLibConfiguration configuration) { + super(kinesisProxy, shardInfo, configuration); + } + + @Override + public GetRecordsResult getRecords(final int maxRecords) { + GetRecordsResult getRecordsResult = new GetRecordsResult(); + getRecordsResult.setRecords(new ArrayList<>(records)); + getRecordsResult.setMillisBehindLatest(1000L); + + sleepBeforeNextCall(); + + return getRecordsResult; + } + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java index eeb8ff1d..8517138f 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -182,6 +182,12 @@ public class PrefetchGetRecordsCacheTest { verify(executorService, times(0)).execute(any()); getRecordsCache.getNextResult(); } + + @Test(expected = IllegalStateException.class) + public void testCallAfterShutdown() { + when(executorService.isShutdown()).thenReturn(true); + getRecordsCache.getNextResult(); + } @After public void shutdown() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index e55336d9..94d0918e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @@ -47,7 +48,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber import com.amazonaws.services.kinesis.clientlibrary.types.Messages.AggregatedRecord; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; -import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Record; import com.google.protobuf.ByteString; @@ -75,11 +75,7 @@ public class ProcessTaskTest { @Mock private ThrottlingReporter throttlingReporter; @Mock - private GetRecordsRetrievalStrategy mockGetRecordsRetrievalStrategy; - @Mock - private RecordsFetcherFactory mockRecordsFetcherFactory; - @Mock - private GetRecordsCache mockRecordsFetcher; + private GetRecordsCache getRecordsCache; private List processedRecords; private ExtendedSequenceNumber newLargestPermittedCheckpointValue; @@ -96,34 +92,40 @@ public class ProcessTaskTest { skipCheckpointValidationValue, INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); - when(mockRecordsFetcherFactory.createRecordsFetcher(mockGetRecordsRetrievalStrategy)).thenReturn(mockRecordsFetcher); processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockRecordsFetcherFactory, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrievalStrategy); + shardInfo, + config, + mockRecordProcessor, + mockCheckpointer, + mockDataFetcher, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + throttlingReporter, + getRecordsCache); } @Test public void testProcessTaskWithProvisionedThroughputExceededException() { // Set data fetcher to throw exception doReturn(false).when(mockDataFetcher).isShardEndReached(); - doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockRecordsFetcher) + doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(getRecordsCache) .getNextResult(); TaskResult result = processTask.call(); verify(throttlingReporter).throttled(); verify(throttlingReporter, never()).success(); - verify(mockRecordsFetcher).getNextResult(); + verify(getRecordsCache).getNextResult(); assertTrue("Result should contain ProvisionedThroughputExceededException", result.getException() instanceof ProvisionedThroughputExceededException); } @Test public void testProcessTaskWithNonExistentStream() { - // Data fetcher returns a null Result when the stream does not exist - doReturn(new GetRecordsResult().withRecords(Collections.emptyList())).when(mockRecordsFetcher).getNextResult(); + // Data fetcher returns a null Result ` the stream does not exist + doReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest((long) 0)).when(getRecordsCache).getNextResult(); TaskResult result = processTask.call(); - verify(mockRecordsFetcher).getNextResult(); + verify(getRecordsCache).getNextResult(); assertNull("Task should not throw an exception", result.getException()); } @@ -307,14 +309,13 @@ public class ProcessTaskTest { private void testWithRecords(List records, ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) { - when(mockRecordsFetcher.getNextResult()).thenReturn( - new GetRecordsResult().withRecords(records)); + when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(records).withMillisBehindLatest((long) 1000 * 50)); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); verify(throttlingReporter).success(); verify(throttlingReporter, never()).throttled(); - verify(mockRecordsFetcher).getNextResult(); + verify(getRecordsCache).getNextResult(); ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(priCaptor.capture()); processedRecords = priCaptor.getValue().getRecords(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 89c40121..33d613de 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -20,9 +20,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.atLeastOnce; @@ -48,12 +48,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -97,14 +98,16 @@ public class ShardConsumerTest { // Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is // ... a non-final public class, and so can be mocked and spied. private final ExecutorService executorService = Executors.newFixedThreadPool(1); - + private final int maxRecords = 500; + private RecordsFetcherFactory recordsFetcherFactory; + + private GetRecordsCache getRecordsCache; + @Mock private IRecordProcessor processor; @Mock private KinesisClientLibConfiguration config; @Mock - private RecordsFetcherFactory recordsFetcherFactory; - @Mock private IKinesisProxy streamProxy; @Mock private ILeaseManager leaseManager; @@ -112,6 +115,16 @@ public class ShardConsumerTest { private ICheckpoint checkpoint; @Mock private ShutdownNotification shutdownNotification; + + @Before + public void setup() { + getRecordsCache = null; + + recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords)); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + when(config.getIdleMillisBetweenCalls()).thenReturn(0l); + } + /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. */ @@ -136,15 +149,15 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); - + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); @@ -160,7 +173,6 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); } - /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. */ @@ -185,14 +197,14 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, spyExecutorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -214,7 +226,6 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testRecordProcessorThrowable() throws Exception { - when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); StreamConfig streamConfig = new StreamConfig(streamProxy, @@ -228,14 +239,14 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null; @@ -308,7 +319,6 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(maxRecords)); TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -319,19 +329,39 @@ public class ShardConsumerTest { skipCheckpointValidationValue, INITIAL_POSITION_LATEST); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); + + RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( + shardInfo, + checkpoint, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() + ) + ); + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config); + + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); + when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + ShardConsumer consumer = new ShardConsumer(shardInfo, streamConfig, checkpoint, processor, - config, + recordProcessorCheckpointer, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + dataFetcher, + Optional.empty(), + Optional.empty(), + config); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -340,6 +370,7 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); + verify(getRecordsCache).start(); // We expect to process all records in numRecs calls for (int i = 0; i < numRecs;) { @@ -352,6 +383,8 @@ public class ShardConsumerTest { } Thread.sleep(50L); } + + verify(getRecordsCache, times(5)).getNextResult(); assertThat(processor.getShutdownReason(), nullValue()); consumer.notifyShutdownRequested(shutdownNotification); @@ -375,6 +408,8 @@ public class ShardConsumerTest { verify(shutdownNotification, atLeastOnce()).shutdownComplete(); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); + + verify(getRecordsCache).shutdown(); executorService.shutdown(); executorService.awaitTermination(60, TimeUnit.SECONDS); @@ -411,7 +446,6 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2)); TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -423,19 +457,39 @@ public class ShardConsumerTest { atTimestamp); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); + + RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( + shardInfo, + checkpoint, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() + ) + ); + + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config); + + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); + when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + ShardConsumer consumer = new ShardConsumer(shardInfo, streamConfig, checkpoint, processor, - config, + recordProcessorCheckpointer, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + dataFetcher, + Optional.empty(), + Optional.empty(), + config); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -444,6 +498,8 @@ public class ShardConsumerTest { assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); + + verify(getRecordsCache).start(); // We expect to process all records in numRecs calls for (int i = 0; i < numRecs;) { @@ -456,6 +512,8 @@ public class ShardConsumerTest { } Thread.sleep(50L); } + + verify(getRecordsCache, times(4)).getNextResult(); assertThat(processor.getShutdownReason(), nullValue()); consumer.beginShutdown(); @@ -468,8 +526,11 @@ public class ShardConsumerTest { executorService.shutdown(); executorService.awaitTermination(60, TimeUnit.SECONDS); + verify(getRecordsCache).shutdown(); + String iterator = fileBasedProxy.getIterator(streamShardId, timestamp); List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); + verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); assertEquals(4, processor.getProcessedRecords().size()); file.delete(); @@ -491,14 +552,16 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, - config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + config); + + GetRecordsCache getRecordsCache = spy(consumer.getGetRecordsCache()); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); @@ -548,9 +611,11 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, Optional.empty(), - Optional.empty()); + Optional.empty(), + config); - assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), SynchronousGetRecordsRetrievalStrategy.class); + assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), + SynchronousGetRecordsRetrievalStrategy.class); } @Test @@ -576,9 +641,11 @@ public class ShardConsumerTest { taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, Optional.of(1), - Optional.of(2)); + Optional.of(2), + config); - assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class); + assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(), + AsynchronousGetRecordsRetrievalStrategy.class); } //@formatter:off (gets the formatting wrong) diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 5d91c698..17a53137 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -59,7 +59,7 @@ public class ShutdownTaskTest { IRecordProcessor defaultRecordProcessor = new TestStreamlet(); @Mock - private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private GetRecordsCache getRecordsCache; /** * @throws java.lang.Exception @@ -80,7 +80,7 @@ public class ShutdownTaskTest { */ @Before public void setUp() throws Exception { - doNothing().when(getRecordsRetrievalStrategy).shutdown(); + doNothing().when(getRecordsCache).shutdown(); } /** @@ -109,7 +109,7 @@ public class ShutdownTaskTest { cleanupLeasesOfCompletedShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, - getRecordsRetrievalStrategy); + getRecordsCache); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); @@ -135,11 +135,11 @@ public class ShutdownTaskTest { cleanupLeasesOfCompletedShards, leaseManager, TASK_BACKOFF_TIME_MILLIS, - getRecordsRetrievalStrategy); + getRecordsCache); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); - verify(getRecordsRetrievalStrategy).shutdown(); + verify(getRecordsCache).shutdown(); } /** @@ -147,7 +147,7 @@ public class ShutdownTaskTest { */ @Test public final void testGetTaskType() { - ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsRetrievalStrategy); + ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsCache); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 9f5bcbee..bf30c510 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -60,6 +60,7 @@ import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Matchers; @@ -129,6 +130,8 @@ public class WorkerTest { private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; + + private RecordsFetcherFactory recordsFetcherFactory; @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; @@ -156,6 +159,13 @@ public class WorkerTest { private Future taskFuture; @Mock private TaskResult taskResult; + + @Before + public void setup() { + recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500)); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + when(config.getIdleMillisBetweenCalls()).thenReturn(500L); + } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = @@ -283,10 +293,22 @@ public class WorkerTest { when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint) .thenReturn(secondCheckpoint); - Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST, - parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, - leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); + Worker worker = new Worker(stageName, + streamletFactory, + config, + streamConfig, + INITIAL_POSITION_LATEST, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + checkpoint, + leaseCoordinator, + execService, + nullMetricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + shardPrioritization); Worker workerSpy = spy(worker); @@ -759,10 +781,22 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -909,10 +943,22 @@ public class WorkerTest { when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { + Worker worker = new InjectableWorker("testRequestShutdown", + recordProcessorFactory, + config, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization) { @Override void postConstruct() { this.gracefulShutdownCoordinator = coordinator; @@ -973,10 +1019,22 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + clientConfig, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1045,10 +1103,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + clientConfig, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1148,10 +1218,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + clientConfig, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1255,10 +1337,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + clientConfig, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1329,10 +1423,22 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, - INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, - cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); + Worker worker = new Worker("testRequestShutdown", + recordProcessorFactory, + clientConfig, + streamConfig, + INITIAL_POSITION_TRIM_HORIZON, + parentShardPollIntervalMillis, + shardSyncIntervalMillis, + cleanupLeasesUponShardCompletion, + leaseCoordinator, + leaseCoordinator, + executorService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + false, + shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1374,10 +1480,22 @@ public class WorkerTest { KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, - parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, - checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, - failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); + super(applicationName, + recordProcessorFactory, + config, + streamConfig, + initialPositionInStream, + parentShardPollIntervalMillis, + shardSyncIdleTimeMillis, + cleanupLeasesUponShardCompletion, + checkpoint, + leaseCoordinator, + execService, + metricsFactory, + taskBackoffTimeMillis, + failoverTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + shardPrioritization); postConstruct(); } @@ -1681,8 +1799,10 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration("app", null, null, null); + KinesisClientLibConfiguration clientConfig = spy(new KinesisClientLibConfiguration("app", null, null, null)); + + when(clientConfig.getIdleMillisBetweenCalls()).thenReturn(0L); + Worker worker = new Worker(stageName, recordProcessorFactory,