added fatory methods

This commit is contained in:
Wei 2017-09-20 15:34:36 -07:00
parent 024d86da76
commit 248605ed91
3 changed files with 166 additions and 12 deletions

View file

@ -0,0 +1,37 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard.
* Clients may choose to create separate instantiations, or re-use instantiations.
*/
public interface RecordsFetcherFactory {
/**
* Returns a records fetcher processor to be used for processing data records for a (assigned) shard.
*
* @return Returns a record fetcher object
*/
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy);
void setMaxSize(int maxSize);
void setMaxByteSize(int maxByteSize);
void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy);
}

View file

@ -0,0 +1,62 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import lombok.Setter;
import lombok.extern.apachecommons.CommonsLog;
import java.util.concurrent.Executors;
@CommonsLog
public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private final int maxRecords;
private int maxSize = 10;
private int maxByteSize = 15 * 1024 * 1024;
private int maxRecordsCount = 30000;
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
public SimpleRecordsFetcherFactory(int maxRecords) {
this.maxRecords = maxRecords;
}
public SimpleRecordsFetcherFactory(int maxRecords, int maxSize, int maxByteSize, int maxRecordsCount) {
this.maxRecords = maxRecords;
this.maxSize = maxSize;
this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount;
}
@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
} else {
return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, dataFetchingStrategy,
getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1));
}
}
public void setMaxSize(int maxSize){
this.maxSize = maxSize;
}
public void setMaxByteSize(int maxByteSize){
this.maxByteSize = maxByteSize;
}
public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){
this.dataFetchingStrategy = dataFetchingStrategy;
}
}

View file

@ -57,8 +57,6 @@ public class ConsumerStatesTest {
@Mock
private IRecordProcessor recordProcessor;
@Mock
private RecordsFetcherFactory recordsFetcherFactory;
@Mock
private RecordProcessorCheckpointer recordProcessorCheckpointer;
@Mock
private ExecutorService executorService;
@ -78,10 +76,6 @@ public class ConsumerStatesTest {
private IKinesisProxy kinesisProxy;
@Mock
private InitialPositionInStreamExtended initialPositionInStream;
@Mock
private SynchronousGetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Mock
private GetRecordsCache recordsFetcher;
private long parentShardPollIntervalMillis = 0xCAFE;
private boolean cleanupLeasesOfCompletedShards = true;
@ -92,7 +86,6 @@ public class ConsumerStatesTest {
public void setup() {
when(consumer.getStreamConfig()).thenReturn(streamConfig);
when(consumer.getRecordProcessor()).thenReturn(recordProcessor);
when(consumer.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(consumer.getRecordProcessorCheckpointer()).thenReturn(recordProcessorCheckpointer);
when(consumer.getExecutorService()).thenReturn(executorService);
when(consumer.getShardInfo()).thenReturn(shardInfo);
@ -160,6 +153,68 @@ public class ConsumerStatesTest {
assertThat(state.getTaskType(), equalTo(TaskType.INITIALIZE));
}
@Test
public void processingStateTestSynchronous() {
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.empty());
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.empty());
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(SynchronousGetRecordsRetrievalStrategy.class) ));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING));
assertThat(state.getTaskType(), equalTo(TaskType.PROCESS));
}
@Test
public void processingStateTestAsynchronous() {
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1));
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2));
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(AsynchronousGetRecordsRetrievalStrategy.class) ));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING));
assertThat(state.getTaskType(), equalTo(TaskType.PROCESS));
}
@Test
public void shutdownRequestState() {
ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();