Merging changes into the branch and updating the tests.

This commit is contained in:
Sahil Palvia 2017-09-25 11:25:31 -07:00
commit 6321dcafd7
21 changed files with 736 additions and 182 deletions

View file

@ -50,10 +50,13 @@ public class BlockingGetRecordsCache implements GetRecordsCache {
return processRecordsInput; return processRecordsInput;
} }
@Override
public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() {
return getRecordsRetrievalStrategy;
}
@Override @Override
public void shutdown() { public void shutdown() {
// getRecordsRetrievalStrategy.shutdown();
// Nothing to do here.
//
} }
} }

View file

@ -14,8 +14,6 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Optional;
/** /**
* Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks,
* and state transitions is contained within the {@link ConsumerState} objects. * and state transitions is contained within the {@link ConsumerState} objects.
@ -253,9 +251,14 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(), return new InitializeTask(consumer.getShardInfo(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), consumer.getRecordProcessor(),
consumer.getTaskBackoffTimeMillis(), consumer.getStreamConfig()); consumer.getCheckpoint(),
consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(),
consumer.getStreamConfig(),
consumer.getGetRecordsCache());
} }
@Override @Override
@ -309,10 +312,14 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), return new ProcessTask(consumer.getShardInfo(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), consumer.getStreamConfig(),
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), consumer.getRecordProcessor(),
consumer.getGetRecordsRetrievalStrategy()); consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(),
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.getGetRecordsCache());
} }
@Override @Override
@ -371,8 +378,10 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), return new ShutdownNotificationTask(consumer.getRecordProcessor(),
consumer.getShutdownNotification(), consumer.getShardInfo()); consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownNotification(),
consumer.getShardInfo());
} }
@Override @Override
@ -511,13 +520,16 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new ShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(), return new ShutdownTask(consumer.getShardInfo(),
consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownReason(),
consumer.getStreamConfig().getStreamProxy(), consumer.getStreamConfig().getStreamProxy(),
consumer.getStreamConfig().getInitialPositionInStream(), consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(), consumer.isCleanupLeasesOfCompletedShards(),
consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis(), consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsRetrievalStrategy()); consumer.getGetRecordsCache());
} }
@Override @Override

View file

@ -34,6 +34,8 @@ public interface GetRecordsCache {
*/ */
ProcessRecordsInput getNextResult(); ProcessRecordsInput getNextResult();
GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy();
/** /**
* This method calls the shutdown behavior on the cache, if available. * This method calls the shutdown behavior on the cache, if available.
*/ */

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
@ -43,6 +43,7 @@ class InitializeTask implements ITask {
// Back off for this interval if we encounter a problem (exception) // Back off for this interval if we encounter a problem (exception)
private final long backoffTimeMillis; private final long backoffTimeMillis;
private final StreamConfig streamConfig; private final StreamConfig streamConfig;
private final GetRecordsCache getRecordsCache;
/** /**
* Constructor. * Constructor.
@ -53,7 +54,8 @@ class InitializeTask implements ITask {
RecordProcessorCheckpointer recordProcessorCheckpointer, RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, long backoffTimeMillis,
StreamConfig streamConfig) { StreamConfig streamConfig,
GetRecordsCache getRecordsCache) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
@ -61,6 +63,7 @@ class InitializeTask implements ITask {
this.dataFetcher = dataFetcher; this.dataFetcher = dataFetcher;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
this.getRecordsCache = getRecordsCache;
} }
/* /*
@ -80,6 +83,7 @@ class InitializeTask implements ITask {
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint();
dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream());
getRecordsCache.start();
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint); recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint); recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);

View file

@ -232,6 +232,9 @@ public class KinesisClientLibConfiguration {
@Getter @Getter
private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS; private int maxLeaseRenewalThreads = DEFAULT_MAX_LEASE_RENEWAL_THREADS;
@Getter
private RecordsFetcherFactory recordsFetcherFactory;
/** /**
* Constructor. * Constructor.
* *
@ -455,6 +458,117 @@ public class KinesisClientLibConfiguration {
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords);
}
/**
* @param applicationName Name of the Kinesis application
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamName Name of the Kinesis stream
* @param kinesisEndpoint Kinesis endpoint
* @param dynamoDBEndpoint DynamoDB endpoint
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
* records from that location in the stream when an application starts up for the first time and there
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
* @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
* @param workerId Used to distinguish different workers/processes of a Kinesis application
* @param maxRecords Max records to read per Kinesis getRecords() call
* @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
* @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
* GetRecords returned an empty record list.
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
* @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
* @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
public KinesisClientLibConfiguration(String applicationName,
String streamName,
String kinesisEndpoint,
String dynamoDBEndpoint,
InitialPositionInStream initialPositionInStream,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
long failoverTimeMillis,
String workerId,
int maxRecords,
long idleTimeBetweenReadsInMillis,
boolean callProcessRecordsEvenForEmptyRecordList,
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
long taskBackoffTimeMillis,
long metricsBufferTimeMillis,
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName,
RecordsFetcherFactory recordsFetcherFactory) {
// Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis);
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis);
checkIsValuePositive("MaxRecords", (long) maxRecords);
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis);
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsRegionNameValid(regionName);
this.applicationName = applicationName;
this.tableName = applicationName;
this.streamName = streamName;
this.kinesisEndpoint = kinesisEndpoint;
this.dynamoDBEndpoint = dynamoDBEndpoint;
this.initialPositionInStream = initialPositionInStream;
this.kinesisCredentialsProvider = kinesisCredentialsProvider;
this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider;
this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider;
this.failoverTimeMillis = failoverTimeMillis;
this.maxRecords = maxRecords;
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
this.workerIdentifier = workerId;
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.metricsBufferTimeMillis = metricsBufferTimeMillis;
this.metricsMaxQueueSize = metricsMaxQueueSize;
this.metricsLevel = DEFAULT_METRICS_LEVEL;
this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS;
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
this.regionName = regionName;
this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER;
this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME;
this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = recordsFetcherFactory;
this.shutdownGraceMillis = shutdownGraceMillis;
this.shutdownGraceMillis = shutdownGraceMillis; this.shutdownGraceMillis = shutdownGraceMillis;
} }
@ -1158,6 +1272,34 @@ public class KinesisClientLibConfiguration {
return this; return this;
} }
/**
*
* @param maxCacheSize the max number of records stored in the getRecordsCache
* @return this configuration object
*/
public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) {
checkIsValuePositive("maxCacheSize", maxCacheSize);
recordsFetcherFactory.setMaxSize(maxCacheSize);
return this;
}
public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) {
checkIsValuePositive("maxCacheByteSize", maxCacheByteSize);
recordsFetcherFactory.setMaxByteSize(maxCacheByteSize);
return this;
}
public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) {
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy));
return this;
}
public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) {
checkIsValuePositive("maxRecordsCount", maxRecordsCount);
recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount);
return this;
}
/** /**
* @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for * @param timeoutInSeconds The timeout in seconds to wait for the MultiLangProtocol to wait for
*/ */

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.

View file

@ -63,6 +63,10 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
@Override @Override
public void start() { public void start() {
if (executorService.isShutdown()) {
throw new IllegalStateException("ExecutorService has been shutdown.");
}
if (!started) { if (!started) {
log.info("Starting prefetching thread."); log.info("Starting prefetching thread.");
executorService.execute(new DefaultGetRecordsCacheDaemon()); executorService.execute(new DefaultGetRecordsCacheDaemon());
@ -72,8 +76,12 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
@Override @Override
public ProcessRecordsInput getNextResult() { public ProcessRecordsInput getNextResult() {
if (executorService.isShutdown()) {
throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
}
if (!started) { if (!started) {
throw new IllegalStateException("Threadpool in the cache was not started, make sure to call start on the cache"); throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
} }
ProcessRecordsInput result = null; ProcessRecordsInput result = null;
try { try {
@ -85,9 +93,16 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
return result; return result;
} }
@Override
public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() {
return getRecordsRetrievalStrategy;
}
@Override @Override
public void shutdown() { public void shutdown() {
getRecordsRetrievalStrategy.shutdown();
executorService.shutdownNow(); executorService.shutdownNow();
started = false;
} }
private class DefaultGetRecordsCacheDaemon implements Runnable { private class DefaultGetRecordsCacheDaemon implements Runnable {

View file

@ -18,7 +18,6 @@ import java.math.BigInteger;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Optional;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -63,7 +62,7 @@ class ProcessTask implements ITask {
private final Shard shard; private final Shard shard;
private final ThrottlingReporter throttlingReporter; private final ThrottlingReporter throttlingReporter;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsCache getRecordsCache;
/** /**
* @param shardInfo * @param shardInfo
@ -78,17 +77,17 @@ class ProcessTask implements ITask {
* Kinesis data fetcher (used to fetch records from Kinesis) * Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis * @param backoffTimeMillis
* backoff time when catching exceptions * backoff time when catching exceptions
* @param getRecordsRetrievalStrategy * @param getRecordsCache
* The retrieval strategy for fetching records from kinesis * The retrieval strategy for fetching records from kinesis
*/ */
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { GetRecordsCache getRecordsCache) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()),
getRecordsRetrievalStrategy); getRecordsCache);
} }
/** /**
@ -110,7 +109,7 @@ class ProcessTask implements ITask {
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) {
super(); super();
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
@ -120,7 +119,7 @@ class ProcessTask implements ITask {
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter; this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.getRecordsCache = getRecordsCache;
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
@ -158,9 +157,9 @@ class ProcessTask implements ITask {
return new TaskResult(null, true); return new TaskResult(null, true);
} }
final GetRecordsResult getRecordsResult = getRecordsResult(); final ProcessRecordsInput processRecordsInput = getRecordsResult();
throttlingReporter.success(); throttlingReporter.success();
List<Record> records = getRecordsResult.getRecords(); List<Record> records = processRecordsInput.getRecords();
if (!records.isEmpty()) { if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
@ -175,7 +174,7 @@ class ProcessTask implements ITask {
recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
if (shouldCallProcessRecords(records)) { if (shouldCallProcessRecords(records)) {
callProcessRecords(getRecordsResult, records); callProcessRecords(processRecordsInput, records);
} }
} catch (ProvisionedThroughputExceededException pte) { } catch (ProvisionedThroughputExceededException pte) {
throttlingReporter.throttled(); throttlingReporter.throttled();
@ -206,17 +205,17 @@ class ProcessTask implements ITask {
/** /**
* Dispatches a batch of records to the record processor, and handles any fallout from that. * Dispatches a batch of records to the record processor, and handles any fallout from that.
* *
* @param getRecordsResult * @param input
* the result of the last call to Kinesis * the result of the last call to Kinesis
* @param records * @param records
* the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation. * the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation.
*/ */
private void callProcessRecords(GetRecordsResult getRecordsResult, List<Record> records) { private void callProcessRecords(ProcessRecordsInput input, List<Record> records) {
LOG.debug("Calling application processRecords() with " + records.size() + " records from " LOG.debug("Calling application processRecords() with " + records.size() + " records from "
+ shardInfo.getShardId()); + shardInfo.getShardId());
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records)
.withCheckpointer(recordProcessorCheckpointer) .withCheckpointer(recordProcessorCheckpointer)
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); .withMillisBehindLatest(input.getMillisBehindLatest());
final long recordProcessorStartTimeMillis = System.currentTimeMillis(); final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try { try {
@ -339,7 +338,7 @@ class ProcessTask implements ITask {
* *
* @return list of data records from Kinesis * @return list of data records from Kinesis
*/ */
private GetRecordsResult getRecordsResult() { private ProcessRecordsInput getRecordsResult() {
try { try {
return getRecordsResultAndRecordMillisBehindLatest(); return getRecordsResultAndRecordMillisBehindLatest();
} catch (ExpiredIteratorException e) { } catch (ExpiredIteratorException e) {
@ -375,22 +374,17 @@ class ProcessTask implements ITask {
* *
* @return list of data records from Kinesis * @return list of data records from Kinesis
*/ */
private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() { private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() {
final GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(streamConfig.getMaxRecords()); final ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult();
if (getRecordsResult == null) { if (processRecordsInput.getMillisBehindLatest() != null) {
// Stream no longer exists
return new GetRecordsResult().withRecords(Collections.<Record>emptyList());
}
if (getRecordsResult.getMillisBehindLatest() != null) {
MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC,
getRecordsResult.getMillisBehindLatest(), processRecordsInput.getMillisBehindLatest(),
StandardUnit.Milliseconds, StandardUnit.Milliseconds,
MetricsLevel.SUMMARY); MetricsLevel.SUMMARY);
} }
return getRecordsResult; return processRecordsInput;
} }
} }

View file

@ -0,0 +1,39 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
/**
* The Amazon Kinesis Client Library will use this to instantiate a record fetcher per shard.
* Clients may choose to create separate instantiations, or re-use instantiations.
*/
public interface RecordsFetcherFactory {
/**
* Returns a records fetcher processor to be used for processing data records for a (assigned) shard.
*
* @return Returns a record fetcher object
*/
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy);
void setMaxSize(int maxSize);
void setMaxByteSize(int maxByteSize);
void setMaxRecordsCount(int maxRecordsCount);
void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy);
}

View file

@ -43,6 +43,7 @@ class ShardConsumer {
private final StreamConfig streamConfig; private final StreamConfig streamConfig;
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
private final KinesisClientLibConfiguration config;
private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final ExecutorService executorService; private final ExecutorService executorService;
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
@ -61,7 +62,7 @@ class ShardConsumer {
private Future<TaskResult> future; private Future<TaskResult> future;
@Getter @Getter
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsCache getRecordsCache;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
@ -91,6 +92,7 @@ class ShardConsumer {
* @param streamConfig Stream configuration to use * @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker * @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard * @param recordProcessor Record processor used to process the data records for the shard
* @param config Kinesis library configuration
* @param leaseManager Used to create leases for new shards * @param leaseManager Used to create leases for new shards
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard * @param executorService ExecutorService used to execute process tasks for this shard
@ -108,10 +110,11 @@ class ShardConsumer {
ExecutorService executorService, ExecutorService executorService,
IMetricsFactory metricsFactory, IMetricsFactory metricsFactory,
long backoffTimeMillis, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager, parentShardPollIntervalMillis, KinesisClientLibConfiguration config) {
cleanupLeasesOfCompletedShards, executorService, metricsFactory, backoffTimeMillis, this(shardInfo, streamConfig, checkpoint,recordProcessor, leaseManager,
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty(), config);
} }
/** /**
@ -126,6 +129,7 @@ class ShardConsumer {
* @param backoffTimeMillis backoff interval when we encounter exceptions * @param backoffTimeMillis backoff interval when we encounter exceptions
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record. * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool. * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool.
* @param config Kinesis library configuration
*/ */
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShardConsumer(ShardInfo shardInfo, ShardConsumer(ShardInfo shardInfo,
@ -140,26 +144,85 @@ class ShardConsumer {
long backoffTimeMillis, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool) { Optional<Integer> maxGetRecordsThreadPool,
this.streamConfig = streamConfig; KinesisClientLibConfiguration config) {
this.recordProcessor = recordProcessor;
this.executorService = executorService; this(
this.shardInfo = shardInfo; shardInfo,
this.checkpoint = checkpoint; streamConfig,
this.recordProcessorCheckpointer =
new RecordProcessorCheckpointer(shardInfo,
checkpoint, checkpoint,
new SequenceNumberValidator(streamConfig.getStreamProxy(), recordProcessor,
new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())); streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())),
this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config
);
}
/**
* @param shardInfo Shard information
* @param streamConfig Stream Config to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress
* @param leaseManager Used to create leases for new shards
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param cleanupLeasesOfCompletedShards clean up the leases of completed shards
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
* @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists
* @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams.
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
* @param config Kinesis library configuration
*/
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config) {
this.shardInfo = shardInfo;
this.streamConfig = streamConfig;
this.checkpoint = checkpoint;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.executorService = executorService;
this.metricsFactory = metricsFactory;
this.taskBackoffTimeMillis = backoffTimeMillis; this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo); this.config = config;
this.dataFetcher = kinesisDataFetcher;
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo));
} }
/** /**

View file

@ -46,7 +46,7 @@ class ShutdownTask implements ITask {
private final boolean cleanupLeasesOfCompletedShards; private final boolean cleanupLeasesOfCompletedShards;
private final TaskType taskType = TaskType.SHUTDOWN; private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis; private final long backoffTimeMillis;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsCache getRecordsCache;
/** /**
* Constructor. * Constructor.
@ -61,7 +61,7 @@ class ShutdownTask implements ITask {
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis, long backoffTimeMillis,
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { GetRecordsCache getRecordsCache) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -71,7 +71,7 @@ class ShutdownTask implements ITask {
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.getRecordsCache = getRecordsCache;
} }
/* /*
@ -111,7 +111,7 @@ class ShutdownTask implements ITask {
} }
} }
LOG.debug("Shutting down retrieval strategy."); LOG.debug("Shutting down retrieval strategy.");
getRecordsRetrievalStrategy.shutdown(); getRecordsCache.shutdown();
LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
} catch (Exception e) { } catch (Exception e) {
applicationException = true; applicationException = true;

View file

@ -0,0 +1,62 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.concurrent.Executors;
import lombok.extern.apachecommons.CommonsLog;
@CommonsLog
public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private final int maxRecords;
private int maxSize = 10;
private int maxByteSize = 15 * 1024 * 1024;
private int maxRecordsCount = 30000;
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
public SimpleRecordsFetcherFactory(int maxRecords) {
this.maxRecords = maxRecords;
}
@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
} else {
return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords,
getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1));
}
}
@Override
public void setMaxSize(int maxSize){
this.maxSize = maxSize;
}
@Override
public void setMaxByteSize(int maxByteSize){
this.maxByteSize = maxByteSize;
}
@Override
public void setMaxRecordsCount(int maxRecordsCount) {
this.maxRecordsCount = maxRecordsCount;
}
@Override
public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){
this.dataFetchingStrategy = dataFetchingStrategy;
}
}

View file

@ -73,6 +73,7 @@ public class Worker implements Runnable {
private final String applicationName; private final String applicationName;
private final IRecordProcessorFactory recordProcessorFactory; private final IRecordProcessorFactory recordProcessorFactory;
private final KinesisClientLibConfiguration config;
private final StreamConfig streamConfig; private final StreamConfig streamConfig;
private final InitialPositionInStreamExtended initialPosition; private final InitialPositionInStreamExtended initialPosition;
private final ICheckpoint checkpointTracker; private final ICheckpoint checkpointTracker;
@ -245,6 +246,7 @@ public class Worker implements Runnable {
KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient, KinesisClientLibConfiguration config, AmazonKinesis kinesisClient, AmazonDynamoDB dynamoDBClient,
IMetricsFactory metricsFactory, ExecutorService execService) { IMetricsFactory metricsFactory, ExecutorService execService) {
this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
config,
new StreamConfig( new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName()), .getProxy(config.getStreamName()),
@ -306,6 +308,8 @@ public class Worker implements Runnable {
* Name of the Kinesis application * Name of the Kinesis application
* @param recordProcessorFactory * @param recordProcessorFactory
* Used to get record processor instances for processing data from shards * Used to get record processor instances for processing data from shards
* @paran config
* Kinesis Library configuration
* @param streamConfig * @param streamConfig
* Stream configuration * Stream configuration
* @param initialPositionInStream * @param initialPositionInStream
@ -333,24 +337,25 @@ public class Worker implements Runnable {
*/ */
// NOTE: This has package level access solely for testing // NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
this(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty()); shardPrioritization, Optional.empty(), Optional.empty());
} }
/** /**
* @param applicationName * @param applicationName
* Name of the Kinesis application * Name of the Kinesis application
* @param recordProcessorFactory * @param recordProcessorFactory
* Used to get record processor instances for processing data from shards * Used to get record processor instances for processing data from shards
* @param config
* Kinesis Library Configuration
* @param streamConfig * @param streamConfig
* Stream configuration * Stream configuration
* @param initialPositionInStream * @param initialPositionInStream
@ -382,7 +387,7 @@ public class Worker implements Runnable {
*/ */
// NOTE: This has package level access solely for testing // NOTE: This has package level access solely for testing
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis, InitialPositionInStreamExtended initialPositionInStream, long parentShardPollIntervalMillis,
long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, long shardSyncIdleTimeMillis, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
@ -391,6 +396,7 @@ public class Worker implements Runnable {
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) { Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
this.applicationName = applicationName; this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory; this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
this.initialPosition = initialPositionInStream; this.initialPosition = initialPositionInStream;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
@ -411,7 +417,6 @@ public class Worker implements Runnable {
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
} }
/** /**
* @return the applicationName * @return the applicationName
*/ */
@ -819,11 +824,11 @@ public class Worker implements Runnable {
* *
* @param shardInfo * @param shardInfo
* Kinesis shard info * Kinesis shard info
* @param factory * @param processorFactory
* RecordProcessor factory * RecordProcessor factory
* @return ShardConsumer for the shard * @return ShardConsumer for the shard
*/ */
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
// Instantiate a new consumer if we don't have one, or the one we // Instantiate a new consumer if we don't have one, or the one we
// had was from an earlier // had was from an earlier
@ -832,20 +837,30 @@ public class Worker implements Runnable {
// completely processed (shutdown reason terminate). // completely processed (shutdown reason terminate).
if ((consumer == null) if ((consumer == null)
|| (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) { || (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) {
consumer = buildConsumer(shardInfo, factory); consumer = buildConsumer(shardInfo, processorFactory);
shardInfoShardConsumerMap.put(shardInfo, consumer); shardInfoShardConsumerMap.put(shardInfo, consumer);
wlog.infoForce("Created new shardConsumer for : " + shardInfo); wlog.infoForce("Created new shardConsumer for : " + shardInfo);
} }
return consumer; return consumer;
} }
protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
IRecordProcessor recordProcessor = factory.createProcessor(); IRecordProcessor recordProcessor = processorFactory.createProcessor();
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, return new ShardConsumer(shardInfo,
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, streamConfig,
executorService, metricsFactory, taskBackoffTimeMillis, checkpointTracker,
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool); recordProcessor,
leaseCoordinator.getLeaseManager(),
parentShardPollIntervalMillis,
cleanupLeasesUponShardCompletion,
executorService,
metricsFactory,
taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config);
} }
@ -1049,6 +1064,7 @@ public class Worker implements Runnable {
public static class Builder { public static class Builder {
private IRecordProcessorFactory recordProcessorFactory; private IRecordProcessorFactory recordProcessorFactory;
private RecordsFetcherFactory recordsFetcherFactory;
private KinesisClientLibConfiguration config; private KinesisClientLibConfiguration config;
private AmazonKinesis kinesisClient; private AmazonKinesis kinesisClient;
private AmazonDynamoDB dynamoDBClient; private AmazonDynamoDB dynamoDBClient;
@ -1244,6 +1260,7 @@ public class Worker implements Runnable {
return new Worker(config.getApplicationName(), return new Worker(config.getApplicationName(),
recordProcessorFactory, recordProcessorFactory,
config,
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
kinesisClient).getProxy(config.getStreamName()), kinesisClient).getProxy(config.getStreamName()),
config.getMaxRecords(), config.getMaxRecords(),

View file

@ -110,7 +110,6 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Test @Test
@Ignore @Ignore
public void testInterrupted() throws InterruptedException, ExecutionException { public void testInterrupted() throws InterruptedException, ExecutionException {
Future<GetRecordsResult> mockFuture = mock(Future.class); Future<GetRecordsResult> mockFuture = mock(Future.class);
when(completionService.submit(any())).thenReturn(mockFuture); when(completionService.submit(any())).thenReturn(mockFuture);
when(completionService.poll()).thenReturn(mockFuture); when(completionService.poll()).thenReturn(mockFuture);

View file

@ -55,6 +55,8 @@ public class ConsumerStatesTest {
@Mock @Mock
private IRecordProcessor recordProcessor; private IRecordProcessor recordProcessor;
@Mock @Mock
private KinesisClientLibConfiguration config;
@Mock
private RecordProcessorCheckpointer recordProcessorCheckpointer; private RecordProcessorCheckpointer recordProcessorCheckpointer;
@Mock @Mock
private ExecutorService executorService; private ExecutorService executorService;
@ -75,7 +77,7 @@ public class ConsumerStatesTest {
@Mock @Mock
private InitialPositionInStreamExtended initialPositionInStream; private InitialPositionInStreamExtended initialPositionInStream;
@Mock @Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private GetRecordsCache getRecordsCache;
private long parentShardPollIntervalMillis = 0xCAFE; private long parentShardPollIntervalMillis = 0xCAFE;
private boolean cleanupLeasesOfCompletedShards = true; private boolean cleanupLeasesOfCompletedShards = true;
@ -98,7 +100,7 @@ public class ConsumerStatesTest {
when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards); when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards);
when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis); when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis);
when(consumer.getShutdownReason()).thenReturn(reason); when(consumer.getShutdownReason()).thenReturn(reason);
when(consumer.getGetRecordsRetrievalStrategy()).thenReturn(getRecordsRetrievalStrategy); when(consumer.getGetRecordsCache()).thenReturn(getRecordsCache);
} }
private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class; private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class;
@ -207,6 +209,33 @@ public class ConsumerStatesTest {
} }
@Test
public void processingStateRecordsFetcher() {
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer);
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, procTask(IRecordProcessor.class, "recordProcessor", equalTo(recordProcessor)));
assertThat(task, procTask(RecordProcessorCheckpointer.class, "recordProcessorCheckpointer",
equalTo(recordProcessorCheckpointer)));
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.TERMINATE),
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
assertThat(state.shutdownTransition(ShutdownReason.REQUESTED),
equalTo(ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState()));
assertThat(state.getState(), equalTo(ShardConsumerState.PROCESSING));
assertThat(state.getTaskType(), equalTo(TaskType.PROCESS));
}
@Test @Test
public void shutdownRequestState() { public void shutdownRequestState() {
ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState(); ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.getConsumerState();

View file

@ -183,6 +183,12 @@ public class PrefetchGetRecordsCacheTest {
getRecordsCache.getNextResult(); getRecordsCache.getNextResult();
} }
@Test(expected = IllegalStateException.class)
public void testCallAfterShutdown() {
when(executorService.isShutdown()).thenReturn(true);
getRecordsCache.getNextResult();
}
@After @After
public void shutdown() { public void shutdown() {
getRecordsCache.shutdown(); getRecordsCache.shutdown();

View file

@ -18,8 +18,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -49,7 +48,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.clientlibrary.types.Messages.AggregatedRecord; import com.amazonaws.services.kinesis.clientlibrary.types.Messages.AggregatedRecord;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
@ -77,7 +75,7 @@ public class ProcessTaskTest {
@Mock @Mock
private ThrottlingReporter throttlingReporter; private ThrottlingReporter throttlingReporter;
@Mock @Mock
private GetRecordsRetrievalStrategy mockGetRecordsRetrievalStrategy; private GetRecordsCache getRecordsCache;
private List<Record> processedRecords; private List<Record> processedRecords;
private ExtendedSequenceNumber newLargestPermittedCheckpointValue; private ExtendedSequenceNumber newLargestPermittedCheckpointValue;
@ -95,32 +93,39 @@ public class ProcessTaskTest {
INITIAL_POSITION_LATEST); INITIAL_POSITION_LATEST);
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
processTask = new ProcessTask( processTask = new ProcessTask(
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, shardInfo,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrievalStrategy); config,
mockRecordProcessor,
mockCheckpointer,
mockDataFetcher,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
throttlingReporter,
getRecordsCache);
} }
@Test @Test
public void testProcessTaskWithProvisionedThroughputExceededException() { public void testProcessTaskWithProvisionedThroughputExceededException() {
// Set data fetcher to throw exception // Set data fetcher to throw exception
doReturn(false).when(mockDataFetcher).isShardEndReached(); doReturn(false).when(mockDataFetcher).isShardEndReached();
doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockGetRecordsRetrievalStrategy) doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(getRecordsCache)
.getRecords(maxRecords); .getNextResult();
TaskResult result = processTask.call(); TaskResult result = processTask.call();
verify(throttlingReporter).throttled(); verify(throttlingReporter).throttled();
verify(throttlingReporter, never()).success(); verify(throttlingReporter, never()).success();
verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); verify(getRecordsCache).getNextResult();
assertTrue("Result should contain ProvisionedThroughputExceededException", assertTrue("Result should contain ProvisionedThroughputExceededException",
result.getException() instanceof ProvisionedThroughputExceededException); result.getException() instanceof ProvisionedThroughputExceededException);
} }
@Test @Test
public void testProcessTaskWithNonExistentStream() { public void testProcessTaskWithNonExistentStream() {
// Data fetcher returns a null Result when the stream does not exist // Data fetcher returns a null Result ` the stream does not exist
doReturn(null).when(mockGetRecordsRetrievalStrategy).getRecords(maxRecords); doReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest((long) 0)).when(getRecordsCache).getNextResult();
TaskResult result = processTask.call(); TaskResult result = processTask.call();
verify(mockGetRecordsRetrievalStrategy).getRecords(eq(maxRecords)); verify(getRecordsCache).getNextResult();
assertNull("Task should not throw an exception", result.getException()); assertNull("Task should not throw an exception", result.getException());
} }
@ -304,14 +309,13 @@ public class ProcessTaskTest {
private void testWithRecords(List<Record> records, private void testWithRecords(List<Record> records,
ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber lastCheckpointValue,
ExtendedSequenceNumber largestPermittedCheckpointValue) { ExtendedSequenceNumber largestPermittedCheckpointValue) {
when(mockGetRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn( when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(records).withMillisBehindLatest((long) 1000 * 50));
new GetRecordsResult().withRecords(records));
when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
processTask.call(); processTask.call();
verify(throttlingReporter).success(); verify(throttlingReporter).success();
verify(throttlingReporter, never()).throttled(); verify(throttlingReporter, never()).throttled();
verify(mockGetRecordsRetrievalStrategy).getRecords(anyInt()); verify(getRecordsCache).getNextResult();
ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
verify(mockRecordProcessor).processRecords(priCaptor.capture()); verify(mockRecordProcessor).processRecords(priCaptor.capture());
processedRecords = priCaptor.getValue().getRecords(); processedRecords = priCaptor.getValue().getRecords();

View file

@ -0,0 +1,41 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
public class RecordsFetcherFactoryTest {
private RecordsFetcherFactory recordsFetcherFactory;
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
recordsFetcherFactory = new SimpleRecordsFetcherFactory(1);
}
@Test
public void createDefaultRecordsFetcherTest() {
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy);
assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class));
}
@Test
public void createPrefetchRecordsFetcherTest() {
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy);
assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class));
}
}

View file

@ -20,9 +20,9 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
@ -36,6 +36,7 @@ import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
@ -47,11 +48,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.hamcrest.Description; import org.hamcrest.Description;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher; import org.hamcrest.TypeSafeMatcher;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -95,10 +98,16 @@ public class ShardConsumerTest {
// Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is // Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is
// ... a non-final public class, and so can be mocked and spied. // ... a non-final public class, and so can be mocked and spied.
private final ExecutorService executorService = Executors.newFixedThreadPool(1); private final ExecutorService executorService = Executors.newFixedThreadPool(1);
private final int maxRecords = 500;
private RecordsFetcherFactory recordsFetcherFactory;
private GetRecordsCache getRecordsCache;
@Mock @Mock
private IRecordProcessor processor; private IRecordProcessor processor;
@Mock @Mock
private KinesisClientLibConfiguration config;
@Mock
private IKinesisProxy streamProxy; private IKinesisProxy streamProxy;
@Mock @Mock
private ILeaseManager<KinesisClientLease> leaseManager; private ILeaseManager<KinesisClientLease> leaseManager;
@ -107,6 +116,14 @@ public class ShardConsumerTest {
@Mock @Mock
private ShutdownNotification shutdownNotification; private ShutdownNotification shutdownNotification;
@Before
public void setup() {
getRecordsCache = null;
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords));
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
}
/** /**
* Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails.
*/ */
@ -137,7 +154,8 @@ public class ShardConsumerTest {
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
@ -154,7 +172,6 @@ public class ShardConsumerTest {
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
} }
/** /**
* Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails.
*/ */
@ -185,7 +202,8 @@ public class ShardConsumerTest {
spyExecutorService, spyExecutorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
@ -226,7 +244,8 @@ public class ShardConsumerTest {
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null; final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
@ -299,7 +318,6 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
TestStreamlet processor = new TestStreamlet(); TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig = StreamConfig streamConfig =
@ -310,18 +328,39 @@ public class ShardConsumerTest {
skipCheckpointValidationValue, INITIAL_POSITION_LATEST); skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null);
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
)
);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache);
ShardConsumer consumer = ShardConsumer consumer =
new ShardConsumer(shardInfo, new ShardConsumer(shardInfo,
streamConfig, streamConfig,
checkpoint, checkpoint,
processor, processor,
recordProcessorCheckpointer,
leaseManager, leaseManager,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
dataFetcher,
Optional.empty(),
Optional.empty(),
config);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards consumer.consumeShard(); // check on parent shards
@ -330,6 +369,7 @@ public class ShardConsumerTest {
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
processor.getInitializeLatch().await(5, TimeUnit.SECONDS); processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
verify(getRecordsCache).start();
// We expect to process all records in numRecs calls // We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) { for (int i = 0; i < numRecs;) {
@ -343,6 +383,8 @@ public class ShardConsumerTest {
Thread.sleep(50L); Thread.sleep(50L);
} }
verify(getRecordsCache, times(5)).getNextResult();
assertThat(processor.getShutdownReason(), nullValue()); assertThat(processor.getShutdownReason(), nullValue());
consumer.notifyShutdownRequested(shutdownNotification); consumer.notifyShutdownRequested(shutdownNotification);
consumer.consumeShard(); consumer.consumeShard();
@ -366,6 +408,8 @@ public class ShardConsumerTest {
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
verify(getRecordsCache).shutdown();
executorService.shutdown(); executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS); executorService.awaitTermination(60, TimeUnit.SECONDS);
@ -401,7 +445,6 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
TestStreamlet processor = new TestStreamlet(); TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig = StreamConfig streamConfig =
@ -413,18 +456,39 @@ public class ShardConsumerTest {
atTimestamp); atTimestamp);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
)
);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache);
ShardConsumer consumer = ShardConsumer consumer =
new ShardConsumer(shardInfo, new ShardConsumer(shardInfo,
streamConfig, streamConfig,
checkpoint, checkpoint,
processor, processor,
recordProcessorCheckpointer,
leaseManager, leaseManager,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
dataFetcher,
Optional.empty(),
Optional.empty(),
config);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards consumer.consumeShard(); // check on parent shards
@ -434,6 +498,8 @@ public class ShardConsumerTest {
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
Thread.sleep(50L); Thread.sleep(50L);
verify(getRecordsCache).start();
// We expect to process all records in numRecs calls // We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) { for (int i = 0; i < numRecs;) {
boolean newTaskSubmitted = consumer.consumeShard(); boolean newTaskSubmitted = consumer.consumeShard();
@ -446,6 +512,8 @@ public class ShardConsumerTest {
Thread.sleep(50L); Thread.sleep(50L);
} }
verify(getRecordsCache, times(4)).getNextResult();
assertThat(processor.getShutdownReason(), nullValue()); assertThat(processor.getShutdownReason(), nullValue());
consumer.beginShutdown(); consumer.beginShutdown();
Thread.sleep(50L); Thread.sleep(50L);
@ -457,8 +525,11 @@ public class ShardConsumerTest {
executorService.shutdown(); executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS); executorService.awaitTermination(60, TimeUnit.SECONDS);
verify(getRecordsCache).shutdown();
String iterator = fileBasedProxy.getIterator(streamShardId, timestamp); String iterator = fileBasedProxy.getIterator(streamShardId, timestamp);
List<Record> expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); List<Record> expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
assertEquals(4, processor.getProcessedRecords().size()); assertEquals(4, processor.getProcessedRecords().size());
file.delete(); file.delete();
@ -486,11 +557,15 @@ public class ShardConsumerTest {
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
GetRecordsCache getRecordsCache = spy(consumer.getGetRecordsCache());
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999");
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2));
when(checkpoint.getCheckpointObject(anyString())).thenReturn( when(checkpoint.getCheckpointObject(anyString())).thenReturn(
new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber));
@ -535,9 +610,11 @@ public class ShardConsumerTest {
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.empty(), Optional.empty(),
Optional.empty()); Optional.empty(),
config);
assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), SynchronousGetRecordsRetrievalStrategy.class); assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
SynchronousGetRecordsRetrievalStrategy.class);
} }
@Test @Test
@ -563,9 +640,11 @@ public class ShardConsumerTest {
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.of(1), Optional.of(1),
Optional.of(2)); Optional.of(2),
config);
assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class); assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
AsynchronousGetRecordsRetrievalStrategy.class);
} }
//@formatter:off (gets the formatting wrong) //@formatter:off (gets the formatting wrong)

View file

@ -59,7 +59,7 @@ public class ShutdownTaskTest {
IRecordProcessor defaultRecordProcessor = new TestStreamlet(); IRecordProcessor defaultRecordProcessor = new TestStreamlet();
@Mock @Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private GetRecordsCache getRecordsCache;
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
@ -80,7 +80,7 @@ public class ShutdownTaskTest {
*/ */
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
doNothing().when(getRecordsRetrievalStrategy).shutdown(); doNothing().when(getRecordsCache).shutdown();
} }
/** /**
@ -109,7 +109,7 @@ public class ShutdownTaskTest {
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
leaseManager, leaseManager,
TASK_BACKOFF_TIME_MILLIS, TASK_BACKOFF_TIME_MILLIS,
getRecordsRetrievalStrategy); getRecordsCache);
TaskResult result = task.call(); TaskResult result = task.call();
Assert.assertNotNull(result.getException()); Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException); Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
@ -135,11 +135,11 @@ public class ShutdownTaskTest {
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
leaseManager, leaseManager,
TASK_BACKOFF_TIME_MILLIS, TASK_BACKOFF_TIME_MILLIS,
getRecordsRetrievalStrategy); getRecordsCache);
TaskResult result = task.call(); TaskResult result = task.call();
Assert.assertNotNull(result.getException()); Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
verify(getRecordsRetrievalStrategy).shutdown(); verify(getRecordsCache).shutdown();
} }
/** /**
@ -147,7 +147,7 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testGetTaskType() { public final void testGetTaskType() {
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsRetrievalStrategy); ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsCache);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
} }

View file

@ -60,6 +60,7 @@ import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.hamcrest.TypeSafeMatcher; import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Matchers; import org.mockito.Matchers;
@ -130,9 +131,13 @@ public class WorkerTest {
private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d";
private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d";
private RecordsFetcherFactory recordsFetcherFactory;
@Mock @Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator; private KinesisClientLibLeaseCoordinator leaseCoordinator;
@Mock @Mock
private KinesisClientLibConfiguration config;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager; private ILeaseManager<KinesisClientLease> leaseManager;
@Mock @Mock
private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory v1RecordProcessorFactory; private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory v1RecordProcessorFactory;
@ -155,6 +160,12 @@ public class WorkerTest {
@Mock @Mock
private TaskResult taskResult; private TaskResult taskResult;
@Before
public void setup() {
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500));
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
}
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() { new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
@ -210,6 +221,8 @@ public class WorkerTest {
public final void testCreateOrGetShardConsumer() { public final void testCreateOrGetShardConsumer() {
final String stageName = "testStageName"; final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null; IKinesisProxy proxy = null;
ICheckpoint checkpoint = null; ICheckpoint checkpoint = null;
int maxRecords = 1; int maxRecords = 1;
@ -228,7 +241,9 @@ public class WorkerTest {
Worker worker = Worker worker =
new Worker(stageName, new Worker(stageName,
streamletFactory, streamConfig, INITIAL_POSITION_LATEST, streamletFactory,
clientConfig,
streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
shardSyncIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
@ -257,6 +272,8 @@ public class WorkerTest {
public void testWorkerLoopWithCheckpoint() { public void testWorkerLoopWithCheckpoint() {
final String stageName = "testStageName"; final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null; IKinesisProxy proxy = null;
ICheckpoint checkpoint = null; ICheckpoint checkpoint = null;
int maxRecords = 1; int maxRecords = 1;
@ -275,7 +292,7 @@ public class WorkerTest {
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint) when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint)
.thenReturn(secondCheckpoint); .thenReturn(secondCheckpoint);
Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization);
@ -314,6 +331,8 @@ public class WorkerTest {
public final void testCleanupShardConsumers() { public final void testCleanupShardConsumers() {
final String stageName = "testStageName"; final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null; IKinesisProxy proxy = null;
ICheckpoint checkpoint = null; ICheckpoint checkpoint = null;
int maxRecords = 1; int maxRecords = 1;
@ -332,7 +351,9 @@ public class WorkerTest {
Worker worker = Worker worker =
new Worker(stageName, new Worker(stageName,
streamletFactory, streamConfig, INITIAL_POSITION_LATEST, streamletFactory,
clientConfig,
streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
shardSyncIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, cleanupLeasesUponShardCompletion,
@ -371,6 +392,8 @@ public class WorkerTest {
public final void testInitializationFailureWithRetries() { public final void testInitializationFailureWithRetries() {
String stageName = "testInitializationWorker"; String stageName = "testInitializationWorker";
IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null); IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
int count = 0; int count = 0;
when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++))); when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++)));
int maxRecords = 2; int maxRecords = 2;
@ -386,6 +409,7 @@ public class WorkerTest {
Worker worker = Worker worker =
new Worker(stageName, new Worker(stageName,
recordProcessorFactory, recordProcessorFactory,
clientConfig,
streamConfig, INITIAL_POSITION_TRIM_HORIZON, streamConfig, INITIAL_POSITION_TRIM_HORIZON,
shardPollInterval, shardPollInterval,
shardSyncIntervalMillis, shardSyncIntervalMillis,
@ -709,6 +733,8 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class); StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -742,7 +768,7 @@ public class WorkerTest {
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -785,6 +811,8 @@ public class WorkerTest {
public void testShutdownCallableNotAllowedTwice() throws Exception { public void testShutdownCallableNotAllowedTwice() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class); StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -816,7 +844,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@ -850,6 +878,8 @@ public class WorkerTest {
public void testGracefulShutdownSingleFuture() throws Exception { public void testGracefulShutdownSingleFuture() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class); StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -888,7 +918,7 @@ public class WorkerTest {
when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) {
@ -926,6 +956,8 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class); StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -950,7 +982,7 @@ public class WorkerTest {
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -988,6 +1020,8 @@ public class WorkerTest {
public void testRequestShutdownWithLostLease() throws Exception { public void testRequestShutdownWithLostLease() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class); StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1020,7 +1054,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1089,6 +1123,8 @@ public class WorkerTest {
public void testRequestShutdownWithAllLeasesLost() throws Exception { public void testRequestShutdownWithAllLeasesLost() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class); StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1121,7 +1157,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1195,6 +1231,8 @@ public class WorkerTest {
public void testLeaseCancelledAfterShutdownRequest() throws Exception { public void testLeaseCancelledAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class); StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1226,7 +1264,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1267,6 +1305,8 @@ public class WorkerTest {
public void testEndOfShardAfterShutdownRequest() throws Exception { public void testEndOfShardAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class); StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1298,7 +1338,7 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
@ -1336,13 +1376,14 @@ public class WorkerTest {
private abstract class InjectableWorker extends Worker { private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream, KinesisClientLibConfiguration config, StreamConfig streamConfig,
InitialPositionInStreamExtended initialPositionInStream,
long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis, long parentShardPollIntervalMillis, long shardSyncIdleTimeMillis,
boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint, boolean cleanupLeasesUponShardCompletion, ICheckpoint checkpoint,
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
super(applicationName, recordProcessorFactory, streamConfig, initialPositionInStream, super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion,
checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis,
failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization);
@ -1649,10 +1690,12 @@ public class WorkerTest {
idleTimeInMilliseconds, idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList, callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
Worker worker = Worker worker =
new Worker(stageName, new Worker(stageName,
recordProcessorFactory, recordProcessorFactory,
clientConfig,
streamConfig, INITIAL_POSITION_TRIM_HORIZON, streamConfig, INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
shardSyncIntervalMillis, shardSyncIntervalMillis,