diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java new file mode 100644 index 00000000..86e78ac4 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard. + * Clients may choose to create separate instantiations, or re-use instantiations. + */ + +public interface RecordsFetcherFactory { + + /** + * Returns a records fetcher processor to be used for processing data records for a (assigned) shard. + * + * @return Returns a record fetcher object + */ + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy); + + void setMaxSize(int maxSize); + + void setMaxByteSize(int maxByteSize); + + void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java new file mode 100644 index 00000000..465110c1 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -0,0 +1,62 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import lombok.Setter; +import lombok.extern.apachecommons.CommonsLog; + +import java.util.concurrent.Executors; + +@CommonsLog +public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { + private final int maxRecords; + private int maxSize = 10; + private int maxByteSize = 15 * 1024 * 1024; + private int maxRecordsCount = 30000; + private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; + + public SimpleRecordsFetcherFactory(int maxRecords) { + this.maxRecords = maxRecords; + } + + public SimpleRecordsFetcherFactory(int maxRecords, int maxSize, int maxByteSize, int maxRecordsCount) { + this.maxRecords = maxRecords; + this.maxSize = maxSize; + this.maxByteSize = maxByteSize; + this.maxRecordsCount = maxRecordsCount; + } + + @Override + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { + return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); + } else { + return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, dataFetchingStrategy, + getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1)); + } + } + + public void setMaxSize(int maxSize){ + this.maxSize = maxSize; + } + + public void setMaxByteSize(int maxByteSize){ + this.maxByteSize = maxByteSize; + } + + public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){ + this.dataFetchingStrategy = dataFetchingStrategy; + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 89a582a4..aa38d447 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -57,8 +57,6 @@ public class ConsumerStatesTest { @Mock private IRecordProcessor recordProcessor; @Mock - private RecordsFetcherFactory recordsFetcherFactory; - @Mock private RecordProcessorCheckpointer recordProcessorCheckpointer; @Mock private ExecutorService executorService; @@ -78,10 +76,6 @@ public class ConsumerStatesTest { private IKinesisProxy kinesisProxy; @Mock private InitialPositionInStreamExtended initialPositionInStream; - @Mock - private SynchronousGetRecordsRetrievalStrategy getRecordsRetrievalStrategy; - @Mock - private GetRecordsCache recordsFetcher; private long parentShardPollIntervalMillis = 0xCAFE; private boolean cleanupLeasesOfCompletedShards = true; @@ -92,7 +86,6 @@ public class ConsumerStatesTest { public void setup() { when(consumer.getStreamConfig()).thenReturn(streamConfig); when(consumer.getRecordProcessor()).thenReturn(recordProcessor); - when(consumer.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(consumer.getRecordProcessorCheckpointer()).thenReturn(recordProcessorCheckpointer); when(consumer.getExecutorService()).thenReturn(executorService); when(consumer.getShardInfo()).thenReturn(shardInfo); @@ -160,6 +153,68 @@ public class ConsumerStatesTest { assertThat(state.getTaskType(), equalTo(TaskType.INITIALIZE)); } + @Test + public void processingStateTestSynchronous() { + when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.empty()); + when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.empty()); + + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(SynchronousGetRecordsRetrievalStrategy.class) )); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING)); + assertThat(state.getTaskType(), equalTo(TaskType.PROCESS)); + + } + + @Test + public void processingStateTestAsynchronous() { + when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1)); + when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2)); + + ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); + ITask task = state.createTask(consumer); + + assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo))); + assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor))); + assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer", + equalTo(recordProcessorCheckpointer))); + assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); + assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); + assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); + assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(AsynchronousGetRecordsRetrievalStrategy.class) )); + + assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); + + assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); + assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), + equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState())); + + assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING)); + assertThat(state.getTaskType(), equalTo(TaskType.PROCESS)); + + } + @Test public void shutdownRequestState() { ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); @@ -266,7 +321,7 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); } @@ -276,17 +331,17 @@ public class ConsumerStatesTest { } static ReflectionPropertyMatcher procTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(ProcessTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher initTask(Class valueTypeClass, - String propertyName, Matcher matcher) { + String propertyName, Matcher matcher) { return taskWith(InitializeTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher taskWith(Class taskTypeClass, - Class valueTypeClass, String propertyName, Matcher matcher) { + Class valueTypeClass, String propertyName, Matcher matcher) { return new ReflectionPropertyMatcher<>(taskTypeClass, valueTypeClass, matcher, propertyName); } @@ -299,7 +354,7 @@ public class ConsumerStatesTest { private final Field matchingField; private ReflectionPropertyMatcher(Class taskTypeClass, Class valueTypeClass, - Matcher matcher, String propertyName) { + Matcher matcher, String propertyName) { this.taskTypeClass = taskTypeClass; this.valueTypeClazz = valueTypeClass; this.matcher = matcher;