Integrated the changes

This commit is contained in:
Wei 2017-09-21 16:09:26 -07:00
parent 211b0d9daa
commit 30f99fc34c
5 changed files with 46 additions and 75 deletions

View file

@ -308,10 +308,10 @@ class ConsumerStates {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
consumer.getConfig().getRecordsFetcherFactory(), consumer.getRecordProcessorCheckpointer(),
consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(), consumer.getTaskBackoffTimeMillis(),
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), consumer.getRetryGetRecordsInSeconds(),
consumer.getMaxGetRecordsThreadPool());
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.getGetRecordsCache());
}
@Override

View file

@ -19,7 +19,6 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import com.sun.org.apache.regexp.internal.RE;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,7 +33,6 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
@ -55,7 +53,7 @@ class ProcessTask implements ITask {
private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor;
private final GetRecordsCache recordsFetcher;
private final GetRecordsCache getRecordsCache;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final KinesisDataFetcher dataFetcher;
private final TaskType taskType = TaskType.PROCESS;
@ -63,7 +61,6 @@ class ProcessTask implements ITask {
private final long backoffTimeMillis;
private final Shard shard;
private final ThrottlingReporter throttlingReporter;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
@ -83,52 +80,23 @@ class ProcessTask implements ITask {
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordsFetcherFactory
* Record processor factory to create recordFetcher object
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordsFetcherFactory recordsFetcherFactory,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this(shardInfo, streamConfig, recordProcessor, recordsFetcherFactory, recordProcessorCheckpointer, dataFetcher,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
}
/**
* @param shardInfo
* contains information about the shard
* @param streamConfig
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordsFetcherFactory
* @param getRecordsCache
* Record processor factory to create recordFetcher object
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
* @param retryGetRecordsInSeconds
* time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool
* max number of threads in the getRecords thread pool.
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordsFetcherFactory recordsFetcherFactory, RecordProcessorCheckpointer recordProcessorCheckpointer,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool) {
this(shardInfo, streamConfig, recordProcessor, recordsFetcherFactory, recordProcessorCheckpointer, dataFetcher,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
GetRecordsCache getRecordsCache) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()),
makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo));
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), getRecordsCache);
}
/**
@ -138,7 +106,7 @@ class ProcessTask implements ITask {
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordsFetcherFactory
* @param getRecordsCache
* RecordFetcher factory used to create recordFetcher object
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
@ -150,9 +118,9 @@ class ProcessTask implements ITask {
* determines how throttling events should be reported in the log.
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordsFetcherFactory recordsFetcherFactory, RecordProcessorCheckpointer recordProcessorCheckpointer,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) {
super();
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
@ -162,8 +130,7 @@ class ProcessTask implements ITask {
this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.recordsFetcher = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy);
this.getRecordsCache = getRecordsCache;
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
@ -201,9 +168,9 @@ class ProcessTask implements ITask {
return new TaskResult(null, true);
}
final GetRecordsResult getRecordsResult = getRecordsResult();
final ProcessRecordsInput processRecordsInput = getRecordsResult();
throttlingReporter.success();
List<Record> records = getRecordsResult.getRecords();
List<Record> records = processRecordsInput.getRecords();
if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
@ -218,7 +185,7 @@ class ProcessTask implements ITask {
recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
if (shouldCallProcessRecords(records)) {
callProcessRecords(getRecordsResult, records);
callProcessRecords(processRecordsInput);
}
} catch (ProvisionedThroughputExceededException pte) {
throttlingReporter.throttled();
@ -249,17 +216,14 @@ class ProcessTask implements ITask {
/**
* Dispatches a batch of records to the record processor, and handles any fallout from that.
*
* @param getRecordsResult
* the result of the last call to Kinesis
* @param records
* the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation.
* @param processRecordsInput
* the ProcessRecordsInput result of the last call to Kinesis
*/
private void callProcessRecords(GetRecordsResult getRecordsResult, List<Record> records) {
private void callProcessRecords(ProcessRecordsInput processRecordsInput) {
List<Record> records = processRecordsInput.getRecords();
LOG.debug("Calling application processRecords() with " + records.size() + " records from "
+ shardInfo.getShardId());
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records)
.withCheckpointer(recordProcessorCheckpointer)
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
processRecordsInput.withCheckpointer(recordProcessorCheckpointer);
final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try {
@ -382,7 +346,7 @@ class ProcessTask implements ITask {
*
* @return list of data records from Kinesis
*/
private GetRecordsResult getRecordsResult() {
private ProcessRecordsInput getRecordsResult() {
try {
return getRecordsResultAndRecordMillisBehindLatest();
} catch (ExpiredIteratorException e) {
@ -418,17 +382,17 @@ class ProcessTask implements ITask {
*
* @return list of data records from Kinesis
*/
private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() {
final GetRecordsResult getRecordsResult = recordsFetcher.getNextResult();
private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() {
final ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult();
if (getRecordsResult.getMillisBehindLatest() != null) {
if (processRecordsInput.getMillisBehindLatest() != null) {
MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC,
getRecordsResult.getMillisBehindLatest(),
processRecordsInput.getMillisBehindLatest(),
StandardUnit.Milliseconds,
MetricsLevel.SUMMARY);
}
return getRecordsResult;
return processRecordsInput;
}
}

View file

@ -43,7 +43,6 @@ class ShardConsumer {
private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor;
@Getter
private final KinesisClientLibConfiguration config;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final ExecutorService executorService;
@ -65,6 +64,19 @@ class ShardConsumer {
private ITask currentTask;
private long currentTaskSubmitTime;
private Future<TaskResult> future;
@Getter
private final GetRecordsCache getRecordsCache;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
ShardInfo shardInfo) {
Optional<GetRecordsRetrievalStrategy> getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry ->
maxGetRecordsThreadPool.map(max ->
new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId())));
return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
}
/*
* Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do
@ -158,6 +170,9 @@ class ShardConsumer {
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
makeStrategy(this.dataFetcher, this.retryGetRecordsInSeconds,
this.maxGetRecordsThreadPool, this.shardInfo));
}
/**

View file

@ -14,6 +14,7 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import lombok.Setter;
import lombok.extern.apachecommons.CommonsLog;
import java.util.concurrent.Executors;
@ -30,19 +31,12 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
this.maxRecords = maxRecords;
}
public SimpleRecordsFetcherFactory(int maxRecords, int maxSize, int maxByteSize, int maxRecordsCount) {
this.maxRecords = maxRecords;
this.maxSize = maxSize;
this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount;
}
@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
} else {
return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, dataFetchingStrategy,
return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords,
getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1));
}
}

View file

@ -18,8 +18,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;