From 9249f280929880a1eeab076ad666348c10898355 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 22 Sep 2017 13:32:49 -0700 Subject: [PATCH 1/2] Merging master into prefetch (#223) * MultiLangDaemon: Make shutdown grace configurable (#204) Allow configuring the amount of time that the graceful shutdown process will wait for the client to complete its shutdown. * Release 1.8.2 of the Amazon Kinesis Client for Java (#218) * Add support for two phase checkpoints Applications can now set a pending checkpoint, before completing the checkpoint operation. Once the application has completed its checkpoint steps, the final checkpoint will clear the pending checkpoint. Should the checkpoint fail the attempted sequence number is provided in the InitializationInput#getPendingCheckpointSequenceNumber otherwise the value will be null. * PR #188 * Support timeouts, and retry for GetRecords calls. Applications can now set timeouts for GetRecord calls to Kinesis. As part of setting the timeout, the application must also provide a thread pool size for concurrent requests. * PR #214 * Notification when the lease table is throttled When writes, or reads, to the lease table are throttled a warning will be emitted. If you're seeing this warning you should increase the IOPs for your lease table to prevent processing delays. * PR #212 * Support configuring the graceful shutdown timeout for MultiLang Clients This adds support for setting the timeout that the Java process will wait for the MutliLang client to complete graceful shutdown. The timeout can be configured by adding shutdownGraceMillis to the properties file set to the number of milliseconds to wait. * PR #204 * Calling shutdown on the RetrievalStrategy (#222) Fixes a bug where the retriever wasn't being shutdown when a record processor was being shutdown. * Release 1.8.3 of the Amazon Kinesis Client for Java (#224) * Call shutdown on the retriever when the record processor is being shutdown This fixes a bug that could leak threads if using the AsynchronousGetRecordsRetrievalStrategy is being used. The asynchronous retriever is only used when KinesisClientLibConfiguration#retryGetRecordsInSeconds, and KinesisClientLibConfiguration#maxGetRecordsThreadPool are set. * PR #222 --- META-INF/MANIFEST.MF | 2 +- README.md | 10 ++- pom.xml | 2 +- .../lib/worker/ConsumerStates.java | 5 +- .../worker/KinesisClientLibConfiguration.java | 2 +- .../clientlibrary/lib/worker/ProcessTask.java | 42 +--------- .../lib/worker/ShardConsumer.java | 21 +++-- .../lib/worker/ShutdownTask.java | 41 +++++----- .../lib/worker/ConsumerStatesTest.java | 14 +--- .../lib/worker/ShardConsumerTest.java | 76 ++++++++++++++++--- .../lib/worker/ShutdownTaskTest.java | 37 ++++++--- 11 files changed, 151 insertions(+), 101 deletions(-) diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index 9665aebd..3a8282e4 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.8.2 +Bundle-Version: 1.8.3 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", diff --git a/README.md b/README.md index fa566946..ddaa6194 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,12 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.8.3 (September 22, 2017) +* Call shutdown on the retriever when the record processor is being shutdown + This fixes a bug that could leak threads if using the [`AsynchronousGetRecordsRetrievalStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/9a82b6bd05b3c9c5f8581af007141fa6d5f0fc4e/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java#L42) is being used. + The asynchronous retriever is only used when [`KinesisClientLibConfiguration#retryGetRecordsInSeconds`](https://github.com/awslabs/amazon-kinesis-client/blob/01d2688bc6e68fd3fe5cb698cb65299d67ac930d/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L227), and [`KinesisClientLibConfiguration#maxGetRecordsThreadPool`](https://github.com/awslabs/amazon-kinesis-client/blob/01d2688bc6e68fd3fe5cb698cb65299d67ac930d/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L230) are set. + * [PR #222](https://github.com/awslabs/amazon-kinesis-client/pull/222) + ### Release 1.8.2 (September 20, 2017) * Add support for two phase checkpoints Applications can now set a pending checkpoint, before completing the checkpoint operation. Once the application has completed its checkpoint steps, the final checkpoint will clear the pending checkpoint. @@ -36,10 +42,10 @@ To make it easier for developers to write record processors in other languages, * [PR #188](https://github.com/awslabs/amazon-kinesis-client/pull/188) * Support timeouts, and retry for GetRecords calls. Applications can now set timeouts for GetRecord calls to Kinesis. As part of setting the timeout, the application must also provide a thread pool size for concurrent requests. - * [PR #214](https://github.com/awslabs/amazon-kinesis-client/pulls/214) + * [PR #214](https://github.com/awslabs/amazon-kinesis-client/pull/214) * Notification when the lease table is throttled When writes, or reads, to the lease table are throttled a warning will be emitted. If you're seeing this warning you should increase the IOPs for your lease table to prevent processing delays. - * [PR #212](https://github.com/awslabs/amazon-kinesis-client/pulls/212) + * [PR #212](https://github.com/awslabs/amazon-kinesis-client/pull/212) * Support configuring the graceful shutdown timeout for MultiLang Clients This adds support for setting the timeout that the Java process will wait for the MutliLang client to complete graceful shutdown. The timeout can be configured by adding `shutdownGraceMillis` to the properties file set to the number of milliseconds to wait. * [PR #204](https://github.com/awslabs/amazon-kinesis-client/pull/204) diff --git a/pom.xml b/pom.xml index e34edaec..61c8c6cf 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.2 + 1.8.3 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index f6d96b4d..d3ccb911 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -312,7 +312,7 @@ class ConsumerStates { return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), - consumer.getRetryGetRecordsInSeconds(), consumer.getMaxGetRecordsThreadPool()); + consumer.getGetRecordsRetrievalStrategy()); } @Override @@ -516,7 +516,8 @@ class ConsumerStates { consumer.getStreamConfig().getStreamProxy(), consumer.getStreamConfig().getInitialPositionInStream(), consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(), - consumer.getTaskBackoffTimeMillis()); + consumer.getTaskBackoffTimeMillis(), + consumer.getGetRecordsRetrievalStrategy()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 1bfd0fc0..c970daa0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.2"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.3"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 02fc4d70..90ac2c09 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -65,17 +65,6 @@ class ProcessTask implements ITask { private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; - private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - ShardInfo shardInfo) { - Optional getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry -> - maxGetRecordsThreadPool.map(max -> - new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId()))); - - return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); - } - /** * @param shardInfo * contains information about the shard @@ -89,40 +78,17 @@ class ProcessTask implements ITask { * Kinesis data fetcher (used to fetch records from Kinesis) * @param backoffTimeMillis * backoff time when catching exceptions - */ - public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { - this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); - } - - /** - * @param shardInfo - * contains information about the shard - * @param streamConfig - * Stream configuration - * @param recordProcessor - * Record processor used to process the data records for the shard - * @param recordProcessorCheckpointer - * Passed to the RecordProcessor so it can checkpoint progress - * @param dataFetcher - * Kinesis data fetcher (used to fetch records from Kinesis) - * @param backoffTimeMillis - * backoff time when catching exceptions - * @param retryGetRecordsInSeconds - * time in seconds to wait before the worker retries to get a record. - * @param maxGetRecordsThreadPool - * max number of threads in the getRecords thread pool. + * @param getRecordsRetrievalStrategy + * The retrieval strategy for fetching records from kinesis */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { + GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), - makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo)); + getRecordsRetrievalStrategy); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 69057b38..4bbe1939 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -55,14 +55,24 @@ class ShardConsumer { private final boolean cleanupLeasesOfCompletedShards; private final long taskBackoffTimeMillis; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; - @Getter - private final Optional retryGetRecordsInSeconds; - @Getter - private final Optional maxGetRecordsThreadPool; private ITask currentTask; private long currentTaskSubmitTime; private Future future; + + @Getter + private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + ShardInfo shardInfo) { + Optional getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry -> + maxGetRecordsThreadPool.map(max -> + new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId()))); + + return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); + } /* * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do @@ -149,8 +159,7 @@ class ShardConsumer { this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.taskBackoffTimeMillis = backoffTimeMillis; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; - this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; - this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; + this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index d40fbb0e..f56033a8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -46,20 +46,22 @@ class ShutdownTask implements ITask { private final boolean cleanupLeasesOfCompletedShards; private final TaskType taskType = TaskType.SHUTDOWN; private final long backoffTimeMillis; + private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; /** * Constructor. */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES ShutdownTask(ShardInfo shardInfo, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - ShutdownReason reason, - IKinesisProxy kinesisProxy, - InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesOfCompletedShards, - ILeaseManager leaseManager, - long backoffTimeMillis) { + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + ShutdownReason reason, + IKinesisProxy kinesisProxy, + InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, + ILeaseManager leaseManager, + long backoffTimeMillis, + GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -69,6 +71,7 @@ class ShutdownTask implements ITask { this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.leaseManager = leaseManager; this.backoffTimeMillis = backoffTimeMillis; + this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; } /* @@ -79,7 +82,7 @@ class ShutdownTask implements ITask { */ @Override public TaskResult call() { - Exception exception = null; + Exception exception; boolean applicationException = false; try { @@ -107,6 +110,8 @@ class ShutdownTask implements ITask { + shardInfo.getShardId()); } } + LOG.debug("Shutting down retrieval strategy."); + getRecordsRetrievalStrategy.shutdown(); LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); } catch (Exception e) { applicationException = true; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 307aa6b8..63f20a72 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -17,7 +17,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.never; @@ -26,7 +25,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.lang.reflect.Field; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -76,6 +74,8 @@ public class ConsumerStatesTest { private IKinesisProxy kinesisProxy; @Mock private InitialPositionInStreamExtended initialPositionInStream; + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private long parentShardPollIntervalMillis = 0xCAFE; private boolean cleanupLeasesOfCompletedShards = true; @@ -98,7 +98,7 @@ public class ConsumerStatesTest { when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards); when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis); when(consumer.getShutdownReason()).thenReturn(reason); - + when(consumer.getGetRecordsRetrievalStrategy()).thenReturn(getRecordsRetrievalStrategy); } private static final Class> LEASE_MANAGER_CLASS = (Class>) (Class) ILeaseManager.class; @@ -155,9 +155,6 @@ public class ConsumerStatesTest { @Test public void processingStateTestSynchronous() { - when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.empty()); - when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.empty()); - ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); ITask task = state.createTask(consumer); @@ -168,7 +165,6 @@ public class ConsumerStatesTest { assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); - assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(SynchronousGetRecordsRetrievalStrategy.class) )); assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); @@ -186,9 +182,6 @@ public class ConsumerStatesTest { @Test public void processingStateTestAsynchronous() { - when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1)); - when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2)); - ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); ITask task = state.createTask(consumer); @@ -199,7 +192,6 @@ public class ConsumerStatesTest { assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); - assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(AsynchronousGetRecordsRetrievalStrategy.class) )); assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 8073d0df..a3f786a6 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -20,6 +20,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -39,6 +40,7 @@ import java.util.Date; import java.util.List; import java.util.ListIterator; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -509,6 +511,62 @@ public class ShardConsumerTest { Thread.sleep(50L); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); } + + @Test + public void testCreateSynchronousGetRecordsRetrieval() { + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); + StreamConfig streamConfig = + new StreamConfig(streamProxy, + 1, + 10, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + ShardConsumer shardConsumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpoint, + processor, + null, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + Optional.empty(), + Optional.empty()); + + assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), SynchronousGetRecordsRetrievalStrategy.class); + } + + @Test + public void testCreateAsynchronousGetRecordsRetrieval() { + ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); + StreamConfig streamConfig = + new StreamConfig(streamProxy, + 1, + 10, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + ShardConsumer shardConsumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpoint, + processor, + null, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + Optional.of(1), + Optional.of(2)); + + assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class); + } //@formatter:off (gets the formatting wrong) private void verifyConsumedRecords(List expectedRecords, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 9eaf7e8e..5d91c698 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -1,20 +1,22 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.HashSet; @@ -34,10 +36,14 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; /** * */ +@RunWith(MockitoJUnitRunner.class) public class ShutdownTaskTest { private static final long TASK_BACKOFF_TIME_MILLIS = 1L; private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = @@ -51,6 +57,9 @@ public class ShutdownTaskTest { defaultParentShardIds, ExtendedSequenceNumber.LATEST); IRecordProcessor defaultRecordProcessor = new TestStreamlet(); + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; /** * @throws java.lang.Exception @@ -71,6 +80,7 @@ public class ShutdownTaskTest { */ @Before public void setUp() throws Exception { + doNothing().when(getRecordsRetrievalStrategy).shutdown(); } /** @@ -98,7 +108,8 @@ public class ShutdownTaskTest { INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, leaseManager, - TASK_BACKOFF_TIME_MILLIS); + TASK_BACKOFF_TIME_MILLIS, + getRecordsRetrievalStrategy); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); @@ -123,10 +134,12 @@ public class ShutdownTaskTest { INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, leaseManager, - TASK_BACKOFF_TIME_MILLIS); + TASK_BACKOFF_TIME_MILLIS, + getRecordsRetrievalStrategy); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); + verify(getRecordsRetrievalStrategy).shutdown(); } /** @@ -134,7 +147,7 @@ public class ShutdownTaskTest { */ @Test public final void testGetTaskType() { - ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0); + ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsRetrievalStrategy); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } From 49b761c5e2439963363f03eae1eb9565c296f423 Mon Sep 17 00:00:00 2001 From: BtXin Date: Fri, 22 Sep 2017 14:20:08 -0700 Subject: [PATCH 2/2] Merging changes (#225) * integrated prefetch with shardconsumer * fixed tests * added fatory methods * added tests and fixed broken tests * Resolved conflicts * Addressed comments * Integrated the changes --- .../lib/worker/ConsumerStates.java | 2 - .../worker/KinesisClientLibConfiguration.java | 142 ++++++++++++++++++ .../clientlibrary/lib/worker/ProcessTask.java | 22 +-- .../lib/worker/RecordsFetcherFactory.java | 39 +++++ .../lib/worker/ShardConsumer.java | 14 +- .../worker/SimpleRecordsFetcherFactory.java | 63 ++++++++ .../clientlibrary/lib/worker/Worker.java | 31 ++-- .../lib/worker/ConsumerStatesTest.java | 39 ++++- .../lib/worker/ProcessTaskTest.java | 23 +-- .../lib/worker/RecordsFetcherFactoryTest.java | 41 +++++ .../lib/worker/ShardConsumerTest.java | 19 ++- .../clientlibrary/lib/worker/WorkerTest.java | 62 ++++++-- 12 files changed, 436 insertions(+), 61 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index d3ccb911..84e234b9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -14,8 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.Optional; - /** * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, * and state transitions is contained within the {@link ConsumerState} objects. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index c970daa0..ebc4b559 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -232,6 +232,9 @@ public class KinesisClientLibConfiguration { @Getter private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; + @Getter + private RecordsFetcherFactory recordsFetcherFactory; + /** * Constructor. * @@ -455,6 +458,117 @@ public class KinesisClientLibConfiguration { InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); + } + + /** + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamName Name of the Kinesis stream + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + */ + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + public KinesisClientLibConfiguration(String applicationName, + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + RecordsFetcherFactory recordsFetcherFactory) { + // Check following values are greater than zero + checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); + checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); + checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis); + checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis); + checkIsValuePositive("MaxRecords", (long) maxRecords); + checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); + checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); + checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); + checkIsRegionNameValid(regionName); + this.applicationName = applicationName; + this.tableName = applicationName; + this.streamName = streamName; + this.kinesisEndpoint = kinesisEndpoint; + this.dynamoDBEndpoint = dynamoDBEndpoint; + this.initialPositionInStream = initialPositionInStream; + this.kinesisCredentialsProvider = kinesisCredentialsProvider; + this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; + this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider; + this.failoverTimeMillis = failoverTimeMillis; + this.maxRecords = maxRecords; + this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis; + this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList; + this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; + this.shardSyncIntervalMillis = shardSyncIntervalMillis; + this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry; + this.workerIdentifier = workerId; + this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig); + this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig); + this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig); + this.taskBackoffTimeMillis = taskBackoffTimeMillis; + this.metricsBufferTimeMillis = metricsBufferTimeMillis; + this.metricsMaxQueueSize = metricsMaxQueueSize; + this.metricsLevel = DEFAULT_METRICS_LEVEL; + this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS; + this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing; + this.regionName = regionName; + this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER; + this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME; + this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; + this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; + this.initialPositionInStreamExtended = + InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; + this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; + this.recordsFetcherFactory = recordsFetcherFactory; + this.shutdownGraceMillis = shutdownGraceMillis; this.shutdownGraceMillis = shutdownGraceMillis; } @@ -1158,6 +1272,34 @@ public class KinesisClientLibConfiguration { return this; } + /** + * + * @param maxCacheSize the max number of records stored in the getRecordsCache + * @return this configuration object + */ + public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) { + checkIsValuePositive("maxCacheSize", maxCacheSize); + recordsFetcherFactory.setMaxSize(maxCacheSize); + return this; + } + + public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) { + checkIsValuePositive("maxCacheByteSize", maxCacheByteSize); + recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); + return this; + } + + public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy)); + return this; + } + + public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) { + checkIsValuePositive("maxRecordsCount", maxRecordsCount); + recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); + return this; + } + /** * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index 90ac2c09..9dac442c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -108,9 +108,9 @@ class ProcessTask implements ITask { * determines how throttling events should be reported in the log. */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -161,7 +161,7 @@ class ProcessTask implements ITask { final GetRecordsResult getRecordsResult = getRecordsResult(); throttlingReporter.success(); List records = getRecordsResult.getRecords(); - + if (!records.isEmpty()) { scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); } else { @@ -205,7 +205,7 @@ class ProcessTask implements ITask { /** * Dispatches a batch of records to the record processor, and handles any fallout from that. - * + * * @param getRecordsResult * the result of the last call to Kinesis * @param records @@ -233,7 +233,7 @@ class ProcessTask implements ITask { /** * Whether we should call process records or not - * + * * @param records * the records returned from the call to Kinesis, and/or deaggregation * @return true if the set of records should be dispatched to the record process, false if they should not. @@ -244,7 +244,7 @@ class ProcessTask implements ITask { /** * Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation - * + * * @param records * the records to deaggregate is deaggregation is required. * @return returns either the deaggregated records, or the original records @@ -267,7 +267,7 @@ class ProcessTask implements ITask { /** * Emits metrics, and sleeps if there are no records available - * + * * @param startTimeMillis * the time when the task started */ @@ -304,8 +304,8 @@ class ProcessTask implements ITask { * @return the largest extended sequence number among the retained records */ private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List records, - final ExtendedSequenceNumber lastCheckpointValue, - final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { + final ExtendedSequenceNumber lastCheckpointValue, + final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue; ListIterator recordIterator = records.listIterator(); while (recordIterator.hasNext()) { @@ -393,4 +393,4 @@ class ProcessTask implements ITask { return getRecordsResult; } -} +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java new file mode 100644 index 00000000..cdd80e49 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard. + * Clients may choose to create separate instantiations, or re-use instantiations. + */ + +public interface RecordsFetcherFactory { + + /** + * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. + * + * @return Returns a record fetcher object + */ + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy); + + void setMaxSize(int maxSize); + + void setMaxByteSize(int maxByteSize); + + void setMaxRecordsCount(int maxRecordsCount); + + void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 4bbe1939..f4be0fc5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -43,6 +43,7 @@ class ShardConsumer { private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; + private final KinesisClientLibConfiguration config; private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final ExecutorService executorService; private final ShardInfo shardInfo; @@ -59,10 +60,10 @@ class ShardConsumer { private ITask currentTask; private long currentTaskSubmitTime; private Future future; - @Getter private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, @@ -91,6 +92,7 @@ class ShardConsumer { * @param streamConfig Stream configuration to use * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard + * @param config Kinesis library configuration * @param leaseManager Used to create leases for new shards * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard @@ -102,6 +104,7 @@ class ShardConsumer { StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, + KinesisClientLibConfiguration config, ILeaseManager leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, @@ -109,9 +112,9 @@ class ShardConsumer { IMetricsFactory metricsFactory, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { - this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis, - cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, - skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); + this(shardInfo, streamConfig, checkpoint,recordProcessor, config, leaseManager, + parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, + backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); } /** @@ -119,6 +122,7 @@ class ShardConsumer { * @param streamConfig Stream configuration to use * @param checkpoint Checkpoint tracker * @param recordProcessor Record processor used to process the data records for the shard + * @param config Kinesis library configuration * @param leaseManager Used to create leases for new shards * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param executorService ExecutorService used to execute process tasks for this shard @@ -132,6 +136,7 @@ class ShardConsumer { StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, + KinesisClientLibConfiguration config, ILeaseManager leaseManager, long parentShardPollIntervalMillis, boolean cleanupLeasesOfCompletedShards, @@ -143,6 +148,7 @@ class ShardConsumer { Optional maxGetRecordsThreadPool) { this.streamConfig = streamConfig; this.recordProcessor = recordProcessor; + this.config = config; this.executorService = executorService; this.shardInfo = shardInfo; this.checkpoint = checkpoint; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java new file mode 100644 index 00000000..2ad61f16 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -0,0 +1,63 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import lombok.Setter; +import lombok.extern.apachecommons.CommonsLog; + +import java.util.concurrent.Executors; + +@CommonsLog +public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { + private final int maxRecords; + private int maxSize = 10; + private int maxByteSize = 15 * 1024 * 1024; + private int maxRecordsCount = 30000; + private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; + + public SimpleRecordsFetcherFactory(int maxRecords) { + this.maxRecords = maxRecords; + } + + @Override + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { + return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); + } else { + return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, + getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1)); + } + } + + @Override + public void setMaxSize(int maxSize){ + this.maxSize = maxSize; + } + + @Override + public void setMaxByteSize(int maxByteSize){ + this.maxByteSize = maxByteSize; + } + + @Override + public void setMaxRecordsCount(int maxRecordsCount) { + this.maxRecordsCount = maxRecordsCount; + } + + @Override + public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){ + this.dataFetchingStrategy = dataFetchingStrategy; + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 3cfb9f2f..494d1c50 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -73,6 +73,7 @@ public class Worker implements Runnable { private final String applicationName; private final IRecordProcessorFactory recordProcessorFactory; + private final KinesisClientLibConfiguration config; private final StreamConfig streamConfig; private final InitialPositionInStreamExtended initialPosition; private final ICheckpoint checkpointTracker; @@ -245,6 +246,7 @@ public class Worker implements Runnable { KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + config, new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) .getProxy(config.getStreamName()), @@ -306,6 +308,8 @@ public class Worker implements Runnable { * Name of the Kinesis application * @param recordProcessorFactory * Used to get record processor instances for processing data from shards + * @paran config + * Kinesis Library configuration * @param streamConfig * Stream configuration * @param initialPositionInStream @@ -333,24 +337,25 @@ public class Worker implements Runnable { */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, - InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, + StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, + this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, Optional.empty(), Optional.empty()); } - /** * @param applicationName * Name of the Kinesis application * @param recordProcessorFactory * Used to get record processor instances for processing data from shards + * @param config + * Kinesis Library Configuration * @param streamConfig * Stream configuration * @param initialPositionInStream @@ -382,7 +387,7 @@ public class Worker implements Runnable { */ // NOTE: This has package level access solely for testing // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, + Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, @@ -391,6 +396,7 @@ public class Worker implements Runnable { Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; + this.config = config; this.streamConfig = streamConfig; this.initialPosition = initialPositionInStream; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; @@ -411,7 +417,6 @@ public class Worker implements Runnable { this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; } - /** * @return the applicationName */ @@ -819,11 +824,11 @@ public class Worker implements Runnable { * * @param shardInfo * Kinesis shard info - * @param factory + * @param processorFactory * RecordProcessor factory * @return ShardConsumer for the shard */ - ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { + ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier @@ -832,17 +837,17 @@ public class Worker implements Runnable { // completely processed (shutdown reason terminate). if ((consumer == null) || (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) { - consumer = buildConsumer(shardInfo, factory); + consumer = buildConsumer(shardInfo, processorFactory); shardInfoShardConsumerMap.put(shardInfo, consumer); wlog.infoForce("Created new shardConsumer for : " + shardInfo); } return consumer; } - protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { - IRecordProcessor recordProcessor = factory.createProcessor(); + protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + IRecordProcessor recordProcessor = processorFactory.createProcessor(); - return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, + return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, config, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool); @@ -1049,6 +1054,7 @@ public class Worker implements Runnable { public static class Builder { private IRecordProcessorFactory recordProcessorFactory; + private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; private AmazonKinesis kinesisClient; private AmazonDynamoDB dynamoDBClient; @@ -1244,6 +1250,7 @@ public class Worker implements Runnable { return new Worker(config.getApplicationName(), recordProcessorFactory, + config, new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient).getProxy(config.getStreamName()), config.getMaxRecords(), diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 63f20a72..77c40cc9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -55,6 +55,8 @@ public class ConsumerStatesTest { @Mock private IRecordProcessor recordProcessor; @Mock + private KinesisClientLibConfiguration config; + @Mock private RecordProcessorCheckpointer recordProcessorCheckpointer; @Mock private ExecutorService executorService; @@ -207,6 +209,33 @@ public class ConsumerStatesTest { } + @Test + public void processingStateRecordsFetcher() { + + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING)); + assertThat(state.getTaskType(), equalTo(TaskType.PROCESS)); + } + @Test public void shutdownRequestState() { ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); @@ -313,7 +342,7 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); } @@ -323,17 +352,17 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher procTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher initTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher taskWith(Class taskTypeClass, - Class valueTypeClass, String propertyName, Matcher matcher) { + Class valueTypeClass, String propertyName, Matcher matcher) { return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName); } @@ -346,7 +375,7 @@ public class ConsumerStatesTest { private final Field matchingField; private ReflectionPropertyMatcher(Class taskTypeClass, Class valueTypeClass, - Matcher matcher, String propertyName) { + Matcher matcher, String propertyName) { this.taskTypeClass = taskTypeClass; this.valueTypeClazz = valueTypeClass; this.matcher = matcher; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index b24bf3ec..e55336d9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -18,8 +18,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @@ -78,6 +76,10 @@ public class ProcessTaskTest { private ThrottlingReporter throttlingReporter; @Mock private GetRecordsRetrievalStrategy mockGetRecordsRetrievalStrategy; + @Mock + private RecordsFetcherFactory mockRecordsFetcherFactory; + @Mock + private GetRecordsCache mockRecordsFetcher; private List processedRecords; private ExtendedSequenceNumber newLargestPermittedCheckpointValue; @@ -94,8 +96,9 @@ public class ProcessTaskTest { skipCheckpointValidationValue, INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); + when(mockRecordsFetcherFactory.createRecordsFetcher(mockGetRecordsRetrievalStrategy)).thenReturn(mockRecordsFetcher); processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, + shardInfo, config, mockRecordProcessor, mockRecordsFetcherFactory, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrievalStrategy); } @@ -103,13 +106,13 @@ public class ProcessTaskTest { public void testProcessTaskWithProvisionedThroughputExceededException() { // Set data fetcher to throw exception doReturn(false).when(mockDataFetcher).isShardEndReached(); - doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsRetrievalStrategy) - .getRecords(maxRecords); + doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockRecordsFetcher) + .getNextResult(); TaskResult result = processTask.call(); verify(throttlingReporter).throttled(); verify(throttlingReporter, never()).success(); - verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); + verify(mockRecordsFetcher).getNextResult(); assertTrue("Result should contain ProvisionedThroughputExceededException", result.getException() instanceof ProvisionedThroughputExceededException); } @@ -117,10 +120,10 @@ public class ProcessTaskTest { @Test public void testProcessTaskWithNonExistentStream() { // Data fetcher returns a null Result when the stream does not exist - doReturn(null).when(mockGetRecordsRetrievalStrategy).getRecords(maxRecords); + doReturn(new GetRecordsResult().withRecords(Collections.emptyList())).when(mockRecordsFetcher).getNextResult(); TaskResult result = processTask.call(); - verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); + verify(mockRecordsFetcher).getNextResult(); assertNull("Task should not throw an exception", result.getException()); } @@ -304,14 +307,14 @@ public class ProcessTaskTest { private void testWithRecords(List records, ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber largestPermittedCheckpointValue) { - when(mockGetRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn( + when(mockRecordsFetcher.getNextResult()).thenReturn( new GetRecordsResult().withRecords(records)); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); processTask.call(); verify(throttlingReporter).success(); verify(throttlingReporter, never()).throttled(); - verify(mockGetRecordsRetrievalStrategy).getRecords(anyInt()); + verify(mockRecordsFetcher).getNextResult(); ArgumentCaptor priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); verify(mockRecordProcessor).processRecords(priCaptor.capture()); processedRecords = priCaptor.getValue().getRecords(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java new file mode 100644 index 00000000..17a77123 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -0,0 +1,41 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +public class RecordsFetcherFactoryTest { + + private RecordsFetcherFactory recordsFetcherFactory; + + @Mock + private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + recordsFetcherFactory = new SimpleRecordsFetcherFactory(1); + } + + @Test + public void createDefaultRecordsFetcherTest() { + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); + } + + @Test + public void createPrefetchRecordsFetcherTest() { + recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); + GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy); + assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); + } + +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index a3f786a6..89c40121 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -36,6 +36,7 @@ import static org.mockito.Mockito.when; import java.io.File; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.ListIterator; @@ -47,6 +48,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.model.GetRecordsResult; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Description; @@ -99,6 +101,10 @@ public class ShardConsumerTest { @Mock private IRecordProcessor processor; @Mock + private KinesisClientLibConfiguration config; + @Mock + private RecordsFetcherFactory recordsFetcherFactory; + @Mock private IKinesisProxy streamProxy; @Mock private ILeaseManager leaseManager; @@ -106,7 +112,6 @@ public class ShardConsumerTest { private ICheckpoint checkpoint; @Mock private ShutdownNotification shutdownNotification; - /** * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. */ @@ -131,6 +136,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -179,6 +185,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -207,6 +214,7 @@ public class ShardConsumerTest { @SuppressWarnings("unchecked") @Test public final void testRecordProcessorThrowable() throws Exception { + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); StreamConfig streamConfig = new StreamConfig(streamProxy, @@ -220,6 +228,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -299,7 +308,7 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(maxRecords)); TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -315,6 +324,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -401,7 +411,7 @@ public class ShardConsumerTest { ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); when(leaseManager.getLease(anyString())).thenReturn(null); - + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2)); TestStreamlet processor = new TestStreamlet(); StreamConfig streamConfig = @@ -418,6 +428,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, leaseManager, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -480,6 +491,7 @@ public class ShardConsumerTest { streamConfig, checkpoint, processor, + config, null, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, @@ -491,6 +503,7 @@ public class ShardConsumerTest { final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); when(leaseManager.getLease(anyString())).thenReturn(null); + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2)); when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 5913bf0d..9f5bcbee 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -133,6 +133,8 @@ public class WorkerTest { @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; @Mock + private KinesisClientLibConfiguration config; + @Mock private ILeaseManager leaseManager; @Mock private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory v1RecordProcessorFactory; @@ -210,6 +212,8 @@ public class WorkerTest { public final void testCreateOrGetShardConsumer() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -228,7 +232,9 @@ public class WorkerTest { Worker worker = new Worker(stageName, - streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + streamletFactory, + clientConfig, + streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -257,6 +263,8 @@ public class WorkerTest { public void testWorkerLoopWithCheckpoint() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -275,7 +283,7 @@ public class WorkerTest { when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint) .thenReturn(secondCheckpoint); - Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); @@ -314,6 +322,8 @@ public class WorkerTest { public final void testCleanupShardConsumers() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -332,7 +342,9 @@ public class WorkerTest { Worker worker = new Worker(stageName, - streamletFactory, streamConfig, INITIAL_POSITION_LATEST, + streamletFactory, + clientConfig, + streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, @@ -371,6 +383,8 @@ public class WorkerTest { public final void testInitializationFailureWithRetries() { String stageName = "testInitializationWorker"; IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration(stageName, null, null, null); int count = 0; when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++))); int maxRecords = 2; @@ -386,6 +400,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, recordProcessorFactory, + clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, shardPollInterval, shardSyncIntervalMillis, @@ -709,6 +724,8 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -742,7 +759,7 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -785,6 +802,8 @@ public class WorkerTest { public void testShutdownCallableNotAllowedTwice() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -816,7 +835,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { @@ -850,6 +869,8 @@ public class WorkerTest { public void testGracefulShutdownSingleFuture() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -888,7 +909,7 @@ public class WorkerTest { when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); - Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { @@ -926,6 +947,8 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -950,7 +973,7 @@ public class WorkerTest { when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -988,6 +1011,8 @@ public class WorkerTest { public void testRequestShutdownWithLostLease() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1020,7 +1045,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1089,6 +1114,8 @@ public class WorkerTest { public void testRequestShutdownWithAllLeasesLost() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1121,7 +1148,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1195,6 +1222,8 @@ public class WorkerTest { public void testLeaseCancelledAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1226,7 +1255,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1267,6 +1296,8 @@ public class WorkerTest { public void testEndOfShardAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1298,7 +1329,7 @@ public class WorkerTest { IRecordProcessor processor = mock(IRecordProcessor.class); when(recordProcessorFactory.createProcessor()).thenReturn(processor); - Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, + Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); @@ -1336,13 +1367,14 @@ public class WorkerTest { private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, - StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, + KinesisClientLibConfiguration config, StreamConfig streamConfig, + InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { - super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, + super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); @@ -1649,10 +1681,12 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - + KinesisClientLibConfiguration clientConfig = + new KinesisClientLibConfiguration("app", null, null, null); Worker worker = new Worker(stageName, recordProcessorFactory, + clientConfig, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,