Merge remote-tracking branch 'upstream/prefetch' into prefetch

This commit is contained in:
Wei 2017-09-27 10:43:09 -07:00
commit 50ed982255
29 changed files with 1134 additions and 364 deletions

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2 Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.8.3 Bundle-Version: 1.8.5
Bundle-Vendor: Amazon Technologies, Inc Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6", Require-Bundle: org.apache.commons.codec;bundle-version="1.6",

View file

@ -29,6 +29,18 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
## Release Notes ## Release Notes
### Release 1.8.5 (September 26, 2017)
* Only advance the shard iterator for the accepted response.
This fixes a race condition in the `KinesisDataFetcher` when it's being used to make asynchronous requests. The shard iterator is now only advanced when the retriever calls `DataFetcherResult#accept()`.
* [PR #230](https://github.com/awslabs/amazon-kinesis-client/pull/230)
* [Issue #231](https://github.com/awslabs/amazon-kinesis-client/issues/231)
### Release 1.8.4 (September 22, 2017)
* Create a new completion service for each request.
This ensures that canceled tasks are discarded. This will prevent a cancellation exception causing issues processing records.
* [PR #227](https://github.com/awslabs/amazon-kinesis-client/pull/227)
* [Issue #226](https://github.com/awslabs/amazon-kinesis-client/issues/226)
### Release 1.8.3 (September 22, 2017) ### Release 1.8.3 (September 22, 2017)
* Call shutdown on the retriever when the record processor is being shutdown * Call shutdown on the retriever when the record processor is being shutdown
This fixes a bug that could leak threads if using the [`AsynchronousGetRecordsRetrievalStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/9a82b6bd05b3c9c5f8581af007141fa6d5f0fc4e/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java#L42) is being used. This fixes a bug that could leak threads if using the [`AsynchronousGetRecordsRetrievalStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/9a82b6bd05b3c9c5f8581af007141fa6d5f0fc4e/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java#L42) is being used.

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name> <name>Amazon Kinesis Client Library for Java</name>
<version>1.8.3</version> <version>1.8.6-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>

View file

@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -26,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope; import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
@ -47,7 +49,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
private final ExecutorService executorService; private final ExecutorService executorService;
private final int retryGetRecordsInSeconds; private final int retryGetRecordsInSeconds;
private final String shardId; private final String shardId;
final CompletionService<GetRecordsResult> completionService; final Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;
public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher, public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher,
final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) { final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) {
@ -56,16 +58,17 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher, public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher,
final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) { final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) {
this(dataFetcher, executorService, retryGetRecordsInSeconds, new ExecutorCompletionService<>(executorService), this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService),
shardId); shardId);
} }
AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
int retryGetRecordsInSeconds, CompletionService<GetRecordsResult> completionService, String shardId) { int retryGetRecordsInSeconds, Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier,
String shardId) {
this.dataFetcher = dataFetcher; this.dataFetcher = dataFetcher;
this.executorService = executorService; this.executorService = executorService;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.completionService = completionService; this.completionServiceSupplier = completionServiceSupplier;
this.shardId = shardId; this.shardId = shardId;
} }
@ -75,8 +78,9 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
throw new IllegalStateException("Strategy has been shutdown"); throw new IllegalStateException("Strategy has been shutdown");
} }
GetRecordsResult result = null; GetRecordsResult result = null;
Set<Future<GetRecordsResult>> futures = new HashSet<>(); CompletionService<DataFetcherResult> completionService = completionServiceSupplier.get();
Callable<GetRecordsResult> retrieverCall = createRetrieverCallable(maxRecords); Set<Future<DataFetcherResult>> futures = new HashSet<>();
Callable<DataFetcherResult> retrieverCall = createRetrieverCallable(maxRecords);
while (true) { while (true) {
try { try {
futures.add(completionService.submit(retrieverCall)); futures.add(completionService.submit(retrieverCall));
@ -85,10 +89,15 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
} }
try { try {
Future<GetRecordsResult> resultFuture = completionService.poll(retryGetRecordsInSeconds, Future<DataFetcherResult> resultFuture = completionService.poll(retryGetRecordsInSeconds,
TimeUnit.SECONDS); TimeUnit.SECONDS);
if (resultFuture != null) { if (resultFuture != null) {
result = resultFuture.get(); //
// Fix to ensure that we only let the shard iterator advance when we intend to return the result
// to the caller. This ensures that the shard iterator is consistently advance in step with
// what the caller sees.
//
result = resultFuture.get().accept();
break; break;
} }
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -98,17 +107,11 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
break; break;
} }
} }
futures.stream().peek(f -> f.cancel(true)).filter(Future::isCancelled).forEach(f -> { futures.forEach(f -> f.cancel(true));
try {
completionService.take();
} catch (InterruptedException e) {
log.error("Exception thrown while trying to empty the threadpool.");
}
});
return result; return result;
} }
private Callable<GetRecordsResult> createRetrieverCallable(int maxRecords) { private Callable<DataFetcherResult> createRetrieverCallable(int maxRecords) {
ThreadSafeMetricsDelegatingScope metricsScope = new ThreadSafeMetricsDelegatingScope(MetricsHelper.getMetricsScope()); ThreadSafeMetricsDelegatingScope metricsScope = new ThreadSafeMetricsDelegatingScope(MetricsHelper.getMetricsScope());
return () -> { return () -> {
try { try {

View file

@ -50,10 +50,13 @@ public class BlockingGetRecordsCache implements GetRecordsCache {
return processRecordsInput; return processRecordsInput;
} }
@Override
public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() {
return getRecordsRetrievalStrategy;
}
@Override @Override
public void shutdown() { public void shutdown() {
// getRecordsRetrievalStrategy.shutdown();
// Nothing to do here.
//
} }
} }

View file

@ -251,9 +251,14 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(), return new InitializeTask(consumer.getShardInfo(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), consumer.getRecordProcessor(),
consumer.getTaskBackoffTimeMillis(), consumer.getStreamConfig()); consumer.getCheckpoint(),
consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(),
consumer.getStreamConfig(),
consumer.getGetRecordsCache());
} }
@Override @Override
@ -307,10 +312,14 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), return new ProcessTask(consumer.getShardInfo(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), consumer.getStreamConfig(),
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), consumer.getRecordProcessor(),
consumer.getGetRecordsRetrievalStrategy()); consumer.getRecordProcessorCheckpointer(),
consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(),
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.getGetRecordsCache());
} }
@Override @Override
@ -369,8 +378,10 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), return new ShutdownNotificationTask(consumer.getRecordProcessor(),
consumer.getShutdownNotification(), consumer.getShardInfo()); consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownNotification(),
consumer.getShardInfo());
} }
@Override @Override
@ -509,13 +520,16 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new ShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(), return new ShutdownTask(consumer.getShardInfo(),
consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownReason(),
consumer.getStreamConfig().getStreamProxy(), consumer.getStreamConfig().getStreamProxy(),
consumer.getStreamConfig().getInitialPositionInStream(), consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(), consumer.isCleanupLeasesOfCompletedShards(),
consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis(), consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsRetrievalStrategy()); consumer.getGetRecordsCache());
} }
@Override @Override

View file

@ -0,0 +1,37 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Amazon Software License
* (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
* http://aws.amazon.com/asl/ or in the "license" file accompanying this file. This file is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
* language governing permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
/**
* Represents the result from the DataFetcher, and allows the receiver to accept a result
*/
public interface DataFetcherResult {
/**
* The result of the request to Kinesis
*
* @return The result of the request, this can be null if the request failed.
*/
GetRecordsResult getResult();
/**
* Accepts the result, and advances the shard iterator. A result from the data fetcher must be accepted before any
* further progress can be made.
*
* @return the result of the request, this can be null if the request failed.
*/
GetRecordsResult accept();
/**
* Indicates whether this result is at the end of the shard or not
*
* @return true if the result is at the end of a shard, false otherwise
*/
boolean isShardEnd();
}

View file

@ -34,6 +34,8 @@ public interface GetRecordsCache {
*/ */
ProcessRecordsInput getNextResult(); ProcessRecordsInput getNextResult();
GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy();
/** /**
* This method calls the shutdown behavior on the cache, if available. * This method calls the shutdown behavior on the cache, if available.
*/ */

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
@ -43,6 +43,7 @@ class InitializeTask implements ITask {
// Back off for this interval if we encounter a problem (exception) // Back off for this interval if we encounter a problem (exception)
private final long backoffTimeMillis; private final long backoffTimeMillis;
private final StreamConfig streamConfig; private final StreamConfig streamConfig;
private final GetRecordsCache getRecordsCache;
/** /**
* Constructor. * Constructor.
@ -53,7 +54,8 @@ class InitializeTask implements ITask {
RecordProcessorCheckpointer recordProcessorCheckpointer, RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, long backoffTimeMillis,
StreamConfig streamConfig) { StreamConfig streamConfig,
GetRecordsCache getRecordsCache) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.checkpoint = checkpoint; this.checkpoint = checkpoint;
@ -61,6 +63,7 @@ class InitializeTask implements ITask {
this.dataFetcher = dataFetcher; this.dataFetcher = dataFetcher;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
this.getRecordsCache = getRecordsCache;
} }
/* /*
@ -80,6 +83,7 @@ class InitializeTask implements ITask {
ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint(); ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.getCheckpoint();
dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream()); dataFetcher.initialize(initialCheckpoint.getSequenceNumber(), streamConfig.getInitialPositionInStream());
getRecordsCache.start();
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint); recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint); recordProcessorCheckpointer.setInitialCheckpointValue(initialCheckpoint);

View file

@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration {
/** /**
* User agent set when Amazon Kinesis Client Library makes AWS requests. * User agent set when Amazon Kinesis Client Library makes AWS requests.
*/ */
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.3"; public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.5";
/** /**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
@ -182,6 +182,11 @@ public class KinesisClientLibConfiguration {
*/ */
public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20;
/**
* The amount of time to sleep in between 2 get calls from the data fetcher.
*/
public static final long DEFAULT_IDLE_MILLIS_BETWEEN_CALLS = 1500L;
private String applicationName; private String applicationName;
private String tableName; private String tableName;
private String streamName; private String streamName;
@ -235,6 +240,9 @@ public class KinesisClientLibConfiguration {
@Getter @Getter
private RecordsFetcherFactory recordsFetcherFactory; private RecordsFetcherFactory recordsFetcherFactory;
@Getter
private long idleMillisBetweenCalls;
/** /**
* Constructor. * Constructor.
* *
@ -270,15 +278,32 @@ public class KinesisClientLibConfiguration {
AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) { String workerId) {
this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, this(applicationName,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, streamName,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, null,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, null,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, DEFAULT_INITIAL_POSITION_IN_STREAM,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), kinesisCredentialsProvider,
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, dynamoDBCredentialsProvider,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, cloudWatchCredentialsProvider,
DEFAULT_SHUTDOWN_GRACE_MILLIS); DEFAULT_FAILOVER_TIME_MILLIS,
workerId,
DEFAULT_MAX_RECORDS,
DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS,
DEFAULT_METRICS_BUFFER_TIME_MILLIS,
DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
null,
DEFAULT_SHUTDOWN_GRACE_MILLIS,
DEFAULT_IDLE_MILLIS_BETWEEN_CALLS);
} }
/** /**
@ -340,7 +365,8 @@ public class KinesisClientLibConfiguration {
int metricsMaxQueueSize, int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing, boolean validateSequenceNumberBeforeCheckpointing,
String regionName, String regionName,
long shutdownGraceMillis) { long shutdownGraceMillis,
long idleMillisBetweenCalls) {
this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
maxRecords, idleTimeBetweenReadsInMillis, maxRecords, idleTimeBetweenReadsInMillis,
@ -348,7 +374,7 @@ public class KinesisClientLibConfiguration {
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, idleMillisBetweenCalls);
} }
/** /**
@ -411,7 +437,8 @@ public class KinesisClientLibConfiguration {
int metricsMaxQueueSize, int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing, boolean validateSequenceNumberBeforeCheckpointing,
String regionName, String regionName,
long shutdownGraceMillis) { long shutdownGraceMillis,
long idleMillisBetweenCalls) {
// Check following values are greater than zero // Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -422,6 +449,7 @@ public class KinesisClientLibConfiguration {
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis);
checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls);
checkIsRegionNameValid(regionName); checkIsRegionNameValid(regionName);
this.applicationName = applicationName; this.applicationName = applicationName;
this.tableName = applicationName; this.tableName = applicationName;
@ -459,6 +487,7 @@ public class KinesisClientLibConfiguration {
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
} }
/** /**
@ -1290,7 +1319,13 @@ public class KinesisClientLibConfiguration {
} }
public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) {
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy)); switch (dataFetchingStrategy.toUpperCase()) {
case "PREFETCH_CACHED":
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
break;
default:
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.DEFAULT);
}
return this; return this;
} }
@ -1316,4 +1351,14 @@ public class KinesisClientLibConfiguration {
this.shutdownGraceMillis = shutdownGraceMillis; this.shutdownGraceMillis = shutdownGraceMillis;
return this; return this;
} }
/**
* @param idleMillisBetweenCalls Idle time between 2 getcalls from the data fetcher.
* @return
*/
public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) {
checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
return this;
}
} }

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
@ -14,6 +14,7 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Instant;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
@ -28,6 +29,8 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.ShardIteratorType;
import lombok.Data;
/** /**
* Used to get data from Amazon Kinesis. Tracks iterator state internally. * Used to get data from Amazon Kinesis. Tracks iterator state internally.
*/ */
@ -40,16 +43,18 @@ class KinesisDataFetcher {
private final String shardId; private final String shardId;
private boolean isShardEndReached; private boolean isShardEndReached;
private boolean isInitialized; private boolean isInitialized;
private Instant lastResponseTime;
private long idleMillisBetweenCalls;
/** /**
* *
* @param kinesisProxy Kinesis proxy * @param kinesisProxy Kinesis proxy
* @param shardInfo The shardInfo object. * @param shardInfo The shardInfo object.
*/ */
public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) { public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo, KinesisClientLibConfiguration configuration) {
this.shardId = shardInfo.getShardId(); this.shardId = shardInfo.getShardId();
this.kinesisProxy = this.kinesisProxy = new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId);
new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId); this.idleMillisBetweenCalls = configuration.getIdleMillisBetweenCalls();
} }
/** /**
@ -58,32 +63,65 @@ class KinesisDataFetcher {
* @param maxRecords Max records to fetch * @param maxRecords Max records to fetch
* @return list of records of up to maxRecords size * @return list of records of up to maxRecords size
*/ */
public GetRecordsResult getRecords(int maxRecords) { public DataFetcherResult getRecords(int maxRecords) {
if (!isInitialized) { if (!isInitialized) {
throw new IllegalArgumentException("KinesisDataFetcher.getRecords called before initialization."); throw new IllegalArgumentException("KinesisDataFetcher.getRecords called before initialization.");
} }
GetRecordsResult response = null;
if (nextIterator != null) { if (nextIterator != null) {
try { try {
response = kinesisProxy.get(nextIterator, maxRecords); return new AdvancingResult(kinesisProxy.get(nextIterator, maxRecords));
nextIterator = response.getNextShardIterator();
} catch (ResourceNotFoundException e) { } catch (ResourceNotFoundException e) {
LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId); LOG.info("Caught ResourceNotFoundException when fetching records for shard " + shardId);
nextIterator = null; return TERMINAL_RESULT;
} }
} else {
return TERMINAL_RESULT;
}
}
final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() {
@Override
public GetRecordsResult getResult() {
return new GetRecordsResult().withMillisBehindLatest(null).withRecords(Collections.emptyList())
.withNextShardIterator(null);
}
@Override
public GetRecordsResult accept() {
isShardEndReached = true;
return getResult();
}
@Override
public boolean isShardEnd() {
return isShardEndReached;
}
};
@Data
class AdvancingResult implements DataFetcherResult {
final GetRecordsResult result;
@Override
public GetRecordsResult getResult() {
return result;
}
@Override
public GetRecordsResult accept() {
nextIterator = result.getNextShardIterator();
if (nextIterator == null) { if (nextIterator == null) {
isShardEndReached = true; isShardEndReached = true;
} }
} else { return getResult();
isShardEndReached = true;
} }
if (response == null) { @Override
response = new GetRecordsResult().withRecords(Collections.emptyList()); public boolean isShardEnd() {
return isShardEndReached;
} }
return response;
} }
/** /**

View file

@ -63,6 +63,10 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
@Override @Override
public void start() { public void start() {
if (executorService.isShutdown()) {
throw new IllegalStateException("ExecutorService has been shutdown.");
}
if (!started) { if (!started) {
log.info("Starting prefetching thread."); log.info("Starting prefetching thread.");
executorService.execute(new DefaultGetRecordsCacheDaemon()); executorService.execute(new DefaultGetRecordsCacheDaemon());
@ -72,8 +76,12 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
@Override @Override
public ProcessRecordsInput getNextResult() { public ProcessRecordsInput getNextResult() {
if (executorService.isShutdown()) {
throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
}
if (!started) { if (!started) {
throw new IllegalStateException("Threadpool in the cache was not started, make sure to call start on the cache"); throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
} }
ProcessRecordsInput result = null; ProcessRecordsInput result = null;
try { try {
@ -85,9 +93,16 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
return result; return result;
} }
@Override
public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() {
return getRecordsRetrievalStrategy;
}
@Override @Override
public void shutdown() { public void shutdown() {
getRecordsRetrievalStrategy.shutdown();
executorService.shutdownNow(); executorService.shutdownNow();
started = false;
} }
private class DefaultGetRecordsCacheDaemon implements Runnable { private class DefaultGetRecordsCacheDaemon implements Runnable {
@ -108,7 +123,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
getRecordsResultQueue.put(processRecordsInput); getRecordsResultQueue.put(processRecordsInput);
prefetchCounters.added(processRecordsInput); prefetchCounters.added(processRecordsInput);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache", e); log.info("Thread was interrupted, indicating shutdown was called on the cache");
} }
} }
} }

View file

@ -18,7 +18,6 @@ import java.math.BigInteger;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Optional;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -63,7 +62,7 @@ class ProcessTask implements ITask {
private final Shard shard; private final Shard shard;
private final ThrottlingReporter throttlingReporter; private final ThrottlingReporter throttlingReporter;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsCache getRecordsCache;
/** /**
* @param shardInfo * @param shardInfo
@ -78,17 +77,17 @@ class ProcessTask implements ITask {
* Kinesis data fetcher (used to fetch records from Kinesis) * Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis * @param backoffTimeMillis
* backoff time when catching exceptions * backoff time when catching exceptions
* @param getRecordsRetrievalStrategy * @param getRecordsCache
* The retrieval strategy for fetching records from kinesis * The retrieval strategy for fetching records from kinesis
*/ */
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { GetRecordsCache getRecordsCache) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()),
getRecordsRetrievalStrategy); getRecordsCache);
} }
/** /**
@ -110,7 +109,7 @@ class ProcessTask implements ITask {
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) {
super(); super();
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
@ -120,7 +119,7 @@ class ProcessTask implements ITask {
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter; this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.getRecordsCache = getRecordsCache;
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
@ -158,9 +157,9 @@ class ProcessTask implements ITask {
return new TaskResult(null, true); return new TaskResult(null, true);
} }
final GetRecordsResult getRecordsResult = getRecordsResult(); final ProcessRecordsInput processRecordsInput = getRecordsResult();
throttlingReporter.success(); throttlingReporter.success();
List<Record> records = getRecordsResult.getRecords(); List<Record> records = processRecordsInput.getRecords();
if (!records.isEmpty()) { if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
@ -175,7 +174,7 @@ class ProcessTask implements ITask {
recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
if (shouldCallProcessRecords(records)) { if (shouldCallProcessRecords(records)) {
callProcessRecords(getRecordsResult, records); callProcessRecords(processRecordsInput, records);
} }
} catch (ProvisionedThroughputExceededException pte) { } catch (ProvisionedThroughputExceededException pte) {
throttlingReporter.throttled(); throttlingReporter.throttled();
@ -206,17 +205,17 @@ class ProcessTask implements ITask {
/** /**
* Dispatches a batch of records to the record processor, and handles any fallout from that. * Dispatches a batch of records to the record processor, and handles any fallout from that.
* *
* @param getRecordsResult * @param input
* the result of the last call to Kinesis * the result of the last call to Kinesis
* @param records * @param records
* the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation. * the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation.
*/ */
private void callProcessRecords(GetRecordsResult getRecordsResult, List<Record> records) { private void callProcessRecords(ProcessRecordsInput input, List<Record> records) {
LOG.debug("Calling application processRecords() with " + records.size() + " records from " LOG.debug("Calling application processRecords() with " + records.size() + " records from "
+ shardInfo.getShardId()); + shardInfo.getShardId());
final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records) final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records)
.withCheckpointer(recordProcessorCheckpointer) .withCheckpointer(recordProcessorCheckpointer)
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); .withMillisBehindLatest(input.getMillisBehindLatest());
final long recordProcessorStartTimeMillis = System.currentTimeMillis(); final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try { try {
@ -339,7 +338,7 @@ class ProcessTask implements ITask {
* *
* @return list of data records from Kinesis * @return list of data records from Kinesis
*/ */
private GetRecordsResult getRecordsResult() { private ProcessRecordsInput getRecordsResult() {
try { try {
return getRecordsResultAndRecordMillisBehindLatest(); return getRecordsResultAndRecordMillisBehindLatest();
} catch (ExpiredIteratorException e) { } catch (ExpiredIteratorException e) {
@ -375,22 +374,17 @@ class ProcessTask implements ITask {
* *
* @return list of data records from Kinesis * @return list of data records from Kinesis
*/ */
private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() { private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() {
final GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(streamConfig.getMaxRecords()); final ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult();
if (getRecordsResult == null) { if (processRecordsInput.getMillisBehindLatest() != null) {
// Stream no longer exists
return new GetRecordsResult().withRecords(Collections.<Record>emptyList());
}
if (getRecordsResult.getMillisBehindLatest() != null) {
MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC,
getRecordsResult.getMillisBehindLatest(), processRecordsInput.getMillisBehindLatest(),
StandardUnit.Milliseconds, StandardUnit.Milliseconds,
MetricsLevel.SUMMARY); MetricsLevel.SUMMARY);
} }
return getRecordsResult; return processRecordsInput;
} }
} }

View file

@ -60,9 +60,9 @@ class ShardConsumer {
private ITask currentTask; private ITask currentTask;
private long currentTaskSubmitTime; private long currentTaskSubmitTime;
private Future<TaskResult> future; private Future<TaskResult> future;
@Getter
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Getter
private final GetRecordsCache getRecordsCache;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
@ -104,17 +104,28 @@ class ShardConsumer {
StreamConfig streamConfig, StreamConfig streamConfig,
ICheckpoint checkpoint, ICheckpoint checkpoint,
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
KinesisClientLibConfiguration config,
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis, long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService, ExecutorService executorService,
IMetricsFactory metricsFactory, IMetricsFactory metricsFactory,
long backoffTimeMillis, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
this(shardInfo, streamConfig, checkpoint,recordProcessor, config, leaseManager, KinesisClientLibConfiguration config) {
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, this(shardInfo,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty()); streamConfig,
checkpoint,
recordProcessor,
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional.empty(),
Optional.empty(),
config);
} }
/** /**
@ -122,7 +133,6 @@ class ShardConsumer {
* @param streamConfig Stream configuration to use * @param streamConfig Stream configuration to use
* @param checkpoint Checkpoint tracker * @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard * @param recordProcessor Record processor used to process the data records for the shard
* @param config Kinesis library configuration
* @param leaseManager Used to create leases for new shards * @param leaseManager Used to create leases for new shards
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param executorService ExecutorService used to execute process tasks for this shard * @param executorService ExecutorService used to execute process tasks for this shard
@ -130,13 +140,13 @@ class ShardConsumer {
* @param backoffTimeMillis backoff interval when we encounter exceptions * @param backoffTimeMillis backoff interval when we encounter exceptions
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record. * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool. * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool.
* @param config Kinesis library configuration
*/ */
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShardConsumer(ShardInfo shardInfo, ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig, StreamConfig streamConfig,
ICheckpoint checkpoint, ICheckpoint checkpoint,
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
KinesisClientLibConfiguration config,
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis, long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
@ -145,27 +155,85 @@ class ShardConsumer {
long backoffTimeMillis, long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool) { Optional<Integer> maxGetRecordsThreadPool,
this.streamConfig = streamConfig; KinesisClientLibConfiguration config) {
this.recordProcessor = recordProcessor;
this.config = config; this(
this.executorService = executorService; shardInfo,
this.shardInfo = shardInfo; streamConfig,
this.checkpoint = checkpoint;
this.recordProcessorCheckpointer =
new RecordProcessorCheckpointer(shardInfo,
checkpoint, checkpoint,
new SequenceNumberValidator(streamConfig.getStreamProxy(), recordProcessor,
new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(), shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())); streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())),
this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config
);
}
/**
* @param shardInfo Shard information
* @param streamConfig Stream Config to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress
* @param leaseManager Used to create leases for new shards
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param cleanupLeasesOfCompletedShards clean up the leases of completed shards
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
* @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists
* @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams.
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
* @param config Kinesis library configuration
*/
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config) {
this.shardInfo = shardInfo;
this.streamConfig = streamConfig;
this.checkpoint = checkpoint;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.executorService = executorService;
this.metricsFactory = metricsFactory;
this.taskBackoffTimeMillis = backoffTimeMillis; this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo); this.config = config;
this.dataFetcher = kinesisDataFetcher;
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo));
} }
/** /**

View file

@ -46,7 +46,7 @@ class ShutdownTask implements ITask {
private final boolean cleanupLeasesOfCompletedShards; private final boolean cleanupLeasesOfCompletedShards;
private final TaskType taskType = TaskType.SHUTDOWN; private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis; private final long backoffTimeMillis;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final GetRecordsCache getRecordsCache;
/** /**
* Constructor. * Constructor.
@ -61,7 +61,7 @@ class ShutdownTask implements ITask {
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis, long backoffTimeMillis,
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { GetRecordsCache getRecordsCache) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -71,7 +71,7 @@ class ShutdownTask implements ITask {
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.getRecordsCache = getRecordsCache;
} }
/* /*
@ -111,7 +111,7 @@ class ShutdownTask implements ITask {
} }
} }
LOG.debug("Shutting down retrieval strategy."); LOG.debug("Shutting down retrieval strategy.");
getRecordsRetrievalStrategy.shutdown(); getRecordsCache.shutdown();
LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
} catch (Exception e) { } catch (Exception e) {
applicationException = true; applicationException = true;

View file

@ -14,16 +14,15 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import lombok.Setter;
import lombok.extern.apachecommons.CommonsLog;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import lombok.extern.apachecommons.CommonsLog;
@CommonsLog @CommonsLog
public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private final int maxRecords; private final int maxRecords;
private int maxSize = 10; private int maxSize = 3;
private int maxByteSize = 15 * 1024 * 1024; private int maxByteSize = 8 * 1024 * 1024;
private int maxRecordsCount = 30000; private int maxRecordsCount = 30000;
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;

View file

@ -28,7 +28,7 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev
@Override @Override
public GetRecordsResult getRecords(final int maxRecords) { public GetRecordsResult getRecords(final int maxRecords) {
return dataFetcher.getRecords(maxRecords); return dataFetcher.getRecords(maxRecords).accept();
} }
@Override @Override

View file

@ -847,10 +847,20 @@ public class Worker implements Runnable {
protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
IRecordProcessor recordProcessor = processorFactory.createProcessor(); IRecordProcessor recordProcessor = processorFactory.createProcessor();
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, config, return new ShardConsumer(shardInfo,
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, streamConfig,
executorService, metricsFactory, taskBackoffTimeMillis, checkpointTracker,
skipShardSyncAtWorkerInitializationIfLeasesExist, retryGetRecordsInSeconds, maxGetRecordsThreadPool); recordProcessor,
leaseCoordinator.getLeaseManager(),
parentShardPollIntervalMillis,
cleanupLeasesUponShardCompletion,
executorService,
metricsFactory,
taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config);
} }

View file

@ -27,14 +27,19 @@ import org.mockito.runners.MockitoJUnitRunner;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -56,21 +61,29 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Mock @Mock
private IKinesisProxy mockKinesisProxy; private IKinesisProxy mockKinesisProxy;
@Mock @Mock
private ShardInfo mockShardInfo; private ShardInfo mockShardInfo;
@Mock
private KinesisClientLibConfiguration configuration;
@Mock
private Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;
@Mock
private DataFetcherResult result;
@Mock
private GetRecordsResult recordsResult;
private CompletionService<DataFetcherResult> completionService;
private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy; private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy;
private KinesisDataFetcher dataFetcher; private KinesisDataFetcher dataFetcher;
private GetRecordsResult result;
private ExecutorService executorService; private ExecutorService executorService;
private RejectedExecutionHandler rejectedExecutionHandler; private RejectedExecutionHandler rejectedExecutionHandler;
private int numberOfRecords = 10; private int numberOfRecords = 10;
private CompletionService<GetRecordsResult> completionService;
@Before @Before
public void setup() { public void setup() {
dataFetcher = spy(new KinesisDataFetcherForTests(mockKinesisProxy, mockShardInfo)); dataFetcher = spy(new KinesisDataFetcherForTests(mockKinesisProxy, mockShardInfo, configuration));
rejectedExecutionHandler = spy(new ThreadPoolExecutor.AbortPolicy()); rejectedExecutionHandler = spy(new ThreadPoolExecutor.AbortPolicy());
executorService = spy(new ThreadPoolExecutor( executorService = spy(new ThreadPoolExecutor(
CORE_POOL_SIZE, CORE_POOL_SIZE,
@ -80,13 +93,15 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
new LinkedBlockingQueue<>(1), new LinkedBlockingQueue<>(1),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("getrecords-worker-%d").build(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("getrecords-worker-%d").build(),
rejectedExecutionHandler)); rejectedExecutionHandler));
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, "shardId-0001"); completionService = spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
completionService = spy(getRecordsRetrivalStrategy.completionService); when(completionServiceSupplier.get()).thenReturn(completionService);
result = null; getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
when(result.accept()).thenReturn(recordsResult);
} }
@Test @Test
public void oneRequestMultithreadTest() { public void oneRequestMultithreadTest() {
when(result.accept()).thenReturn(null);
GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(eq(numberOfRecords)); verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(eq(numberOfRecords));
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any()); verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
@ -95,23 +110,24 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Test @Test
public void multiRequestTest() { public void multiRequestTest() {
result = mock(GetRecordsResult.class); ExecutorCompletionService<DataFetcherResult> completionService1 = spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService1);
GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(numberOfRecords); verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(numberOfRecords);
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any()); verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
assertEquals(result, getRecordsResult); assertThat(getRecordsResult, equalTo(recordsResult));
result = null; when(result.accept()).thenReturn(null);
ExecutorCompletionService<DataFetcherResult> completionService2 = spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService2);
getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
assertNull(getRecordsResult); assertThat(getRecordsResult, nullValue(GetRecordsResult.class));
} }
@Test @Test
@Ignore @Ignore
public void testInterrupted() throws InterruptedException, ExecutionException { public void testInterrupted() throws InterruptedException, ExecutionException {
Future<DataFetcherResult> mockFuture = mock(Future.class);
Future<GetRecordsResult> mockFuture = mock(Future.class);
when(completionService.submit(any())).thenReturn(mockFuture); when(completionService.submit(any())).thenReturn(mockFuture);
when(completionService.poll()).thenReturn(mockFuture); when(completionService.poll()).thenReturn(mockFuture);
doThrow(InterruptedException.class).when(mockFuture).get(); doThrow(InterruptedException.class).when(mockFuture).get();
@ -138,12 +154,13 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
} }
private class KinesisDataFetcherForTests extends KinesisDataFetcher { private class KinesisDataFetcherForTests extends KinesisDataFetcher {
public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo) { public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo,
super(kinesisProxy, shardInfo); final KinesisClientLibConfiguration configuration) {
super(kinesisProxy, shardInfo, configuration);
} }
@Override @Override
public GetRecordsResult getRecords(final int maxRecords) { public DataFetcherResult getRecords(final int maxRecords) {
try { try {
Thread.sleep(SLEEP_GET_RECORDS_IN_SECONDS * 1000); Thread.sleep(SLEEP_GET_RECORDS_IN_SECONDS * 1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {

View file

@ -30,7 +30,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -51,23 +53,34 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock @Mock
private ExecutorService executorService; private ExecutorService executorService;
@Mock @Mock
private CompletionService<GetRecordsResult> completionService; private Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;
@Mock @Mock
private Future<GetRecordsResult> successfulFuture; private CompletionService<DataFetcherResult> completionService;
@Mock @Mock
private Future<GetRecordsResult> blockedFuture; private Future<DataFetcherResult> successfulFuture;
@Mock
private Future<DataFetcherResult> blockedFuture;
@Mock
private DataFetcherResult dataFetcherResult;
@Mock @Mock
private GetRecordsResult expectedResults; private GetRecordsResult expectedResults;
@Before
public void before() {
when(completionServiceSupplier.get()).thenReturn(completionService);
when(dataFetcherResult.getResult()).thenReturn(expectedResults);
when(dataFetcherResult.accept()).thenReturn(expectedResults);
}
@Test @Test
public void testSingleSuccessfulRequestFuture() throws Exception { public void testSingleSuccessfulRequestFuture() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false); when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(successfulFuture); when(completionService.submit(any())).thenReturn(successfulFuture);
when(completionService.poll(anyLong(), any())).thenReturn(successfulFuture); when(completionService.poll(anyLong(), any())).thenReturn(successfulFuture);
when(successfulFuture.get()).thenReturn(expectedResults); when(successfulFuture.get()).thenReturn(dataFetcherResult);
GetRecordsResult result = strategy.getRecords(10); GetRecordsResult result = strategy.getRecords(10);
@ -76,8 +89,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(completionService).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS)); verify(completionService).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).get(); verify(successfulFuture).get();
verify(successfulFuture).cancel(eq(true)); verify(successfulFuture).cancel(eq(true));
verify(successfulFuture).isCancelled();
verify(completionService, never()).take();
assertThat(result, equalTo(expectedResults)); assertThat(result, equalTo(expectedResults));
} }
@ -85,12 +96,12 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test @Test
public void testBlockedAndSuccessfulFuture() throws Exception { public void testBlockedAndSuccessfulFuture() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false); when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenReturn(successfulFuture); when(completionService.submit(any())).thenReturn(blockedFuture).thenReturn(successfulFuture);
when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(successfulFuture); when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(successfulFuture);
when(successfulFuture.get()).thenReturn(expectedResults); when(successfulFuture.get()).thenReturn(dataFetcherResult);
when(successfulFuture.cancel(anyBoolean())).thenReturn(false); when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
when(blockedFuture.cancel(anyBoolean())).thenReturn(true); when(blockedFuture.cancel(anyBoolean())).thenReturn(true);
when(successfulFuture.isCancelled()).thenReturn(false); when(successfulFuture.isCancelled()).thenReturn(false);
@ -104,9 +115,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(blockedFuture, never()).get(); verify(blockedFuture, never()).get();
verify(successfulFuture).cancel(eq(true)); verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true)); verify(blockedFuture).cancel(eq(true));
verify(successfulFuture).isCancelled();
verify(blockedFuture).isCancelled();
verify(completionService).take();
assertThat(actualResults, equalTo(expectedResults)); assertThat(actualResults, equalTo(expectedResults));
} }
@ -114,7 +122,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testStrategyIsShutdown() throws Exception { public void testStrategyIsShutdown() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(true); when(executorService.isShutdown()).thenReturn(true);
@ -124,12 +132,12 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test @Test
public void testPoolOutOfResources() throws Exception { public void testPoolOutOfResources() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false); when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture); when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(null).thenReturn(successfulFuture); when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(null).thenReturn(successfulFuture);
when(successfulFuture.get()).thenReturn(expectedResults); when(successfulFuture.get()).thenReturn(dataFetcherResult);
when(successfulFuture.cancel(anyBoolean())).thenReturn(false); when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
when(blockedFuture.cancel(anyBoolean())).thenReturn(true); when(blockedFuture.cancel(anyBoolean())).thenReturn(true);
when(successfulFuture.isCancelled()).thenReturn(false); when(successfulFuture.isCancelled()).thenReturn(false);
@ -141,9 +149,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS)); verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).cancel(eq(true)); verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true)); verify(blockedFuture).cancel(eq(true));
verify(successfulFuture).isCancelled();
verify(blockedFuture).isCancelled();
verify(completionService).take();
assertThat(actualResult, equalTo(expectedResults)); assertThat(actualResult, equalTo(expectedResults));
} }

View file

@ -77,7 +77,7 @@ public class ConsumerStatesTest {
@Mock @Mock
private InitialPositionInStreamExtended initialPositionInStream; private InitialPositionInStreamExtended initialPositionInStream;
@Mock @Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private GetRecordsCache getRecordsCache;
private long parentShardPollIntervalMillis = 0xCAFE; private long parentShardPollIntervalMillis = 0xCAFE;
private boolean cleanupLeasesOfCompletedShards = true; private boolean cleanupLeasesOfCompletedShards = true;
@ -100,7 +100,7 @@ public class ConsumerStatesTest {
when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards); when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards);
when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis); when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis);
when(consumer.getShutdownReason()).thenReturn(reason); when(consumer.getShutdownReason()).thenReturn(reason);
when(consumer.getGetRecordsRetrievalStrategy()).thenReturn(getRecordsRetrievalStrategy); when(consumer.getGetRecordsCache()).thenReturn(getRecordsCache);
} }
private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class; private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class;

View file

@ -85,6 +85,7 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_INT, TEST_VALUE_INT,
skipCheckpointValidationValue, skipCheckpointValidationValue,
null, null,
TEST_VALUE_LONG,
TEST_VALUE_LONG); TEST_VALUE_LONG);
} }
@ -95,7 +96,8 @@ public class KinesisClientLibConfigurationTest {
// Try each argument at one time. // Try each argument at one time.
KinesisClientLibConfiguration config = null; KinesisClientLibConfiguration config = null;
long[] longValues = long[] longValues =
{ TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG }; { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG,
TEST_VALUE_LONG, TEST_VALUE_LONG };
for (int i = 0; i < PARAMETER_COUNT; i++) { for (int i = 0; i < PARAMETER_COUNT; i++) {
longValues[i] = INVALID_LONG; longValues[i] = INVALID_LONG;
try { try {
@ -124,7 +126,8 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_INT, TEST_VALUE_INT,
skipCheckpointValidationValue, skipCheckpointValidationValue,
null, null,
longValues[6]); longValues[6],
longValues[7]);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());
} }
@ -159,6 +162,7 @@ public class KinesisClientLibConfigurationTest {
intValues[1], intValues[1],
skipCheckpointValidationValue, skipCheckpointValidationValue,
null, null,
TEST_VALUE_LONG,
TEST_VALUE_LONG); TEST_VALUE_LONG);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());
@ -323,6 +327,7 @@ public class KinesisClientLibConfigurationTest {
1, 1,
skipCheckpointValidationValue, skipCheckpointValidationValue,
"abcd", "abcd",
TEST_VALUE_LONG,
TEST_VALUE_LONG); TEST_VALUE_LONG);
Assert.fail("No expected Exception is thrown."); Assert.fail("No expected Exception is thrown.");
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {

View file

@ -14,18 +14,36 @@
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
@ -43,8 +61,14 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType;
/** /**
* Unit tests for KinesisDataFetcher. * Unit tests for KinesisDataFetcher.
*/ */
@RunWith(MockitoJUnitRunner.class)
public class KinesisDataFetcherTest { public class KinesisDataFetcherTest {
@Mock
private KinesisProxy kinesisProxy;
@Mock
private KinesisClientLibConfiguration configuration;
private static final int MAX_RECORDS = 1; private static final int MAX_RECORDS = 1;
private static final String SHARD_ID = "shardId-1"; private static final String SHARD_ID = "shardId-1";
private static final String AT_SEQUENCE_NUMBER = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); private static final String AT_SEQUENCE_NUMBER = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
@ -115,8 +139,9 @@ public class KinesisDataFetcherTest {
public void testadvanceIteratorTo() throws KinesisClientLibException { public void testadvanceIteratorTo() throws KinesisClientLibException {
IKinesisProxy kinesis = mock(IKinesisProxy.class); IKinesisProxy kinesis = mock(IKinesisProxy.class);
ICheckpoint checkpoint = mock(ICheckpoint.class); ICheckpoint checkpoint = mock(ICheckpoint.class);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration);
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher);
String iteratorA = "foo"; String iteratorA = "foo";
@ -148,8 +173,9 @@ public class KinesisDataFetcherTest {
@Test @Test
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() { public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() {
IKinesisProxy kinesis = mock(IKinesisProxy.class); IKinesisProxy kinesis = mock(IKinesisProxy.class);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration);
String iteratorHorizon = "horizon"; String iteratorHorizon = "horizon";
when(kinesis.getIterator(SHARD_ID, ShardIteratorType.TRIM_HORIZON.toString())).thenReturn(iteratorHorizon); when(kinesis.getIterator(SHARD_ID, ShardIteratorType.TRIM_HORIZON.toString())).thenReturn(iteratorHorizon);
@ -178,9 +204,10 @@ public class KinesisDataFetcherTest {
KinesisProxy mockProxy = mock(KinesisProxy.class); KinesisProxy mockProxy = mock(KinesisProxy.class);
doReturn(nextIterator).when(mockProxy).getIterator(SHARD_ID, ShardIteratorType.LATEST.toString()); doReturn(nextIterator).when(mockProxy).getIterator(SHARD_ID, ShardIteratorType.LATEST.toString());
doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
// Create data fectcher and initialize it with latest type checkpoint // Create data fectcher and initialize it with latest type checkpoint
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration);
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(dataFetcher); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(dataFetcher);
// Call getRecords of dataFetcher which will throw an exception // Call getRecords of dataFetcher which will throw an exception
@ -197,14 +224,95 @@ public class KinesisDataFetcherTest {
KinesisProxy mockProxy = mock(KinesisProxy.class); KinesisProxy mockProxy = mock(KinesisProxy.class);
doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration);
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
GetRecordsResult getRecordsResult = dataFetcher.getRecords(maxRecords); DataFetcherResult dataFetcherResult = dataFetcher.getRecords(maxRecords);
Assert.assertNotNull(getRecordsResult); assertThat(dataFetcherResult, notNullValue());
Assert.assertTrue(getRecordsResult.getRecords().isEmpty()); }
@Test
public void testFetcherDoesNotAdvanceWithoutAccept() {
final String INITIAL_ITERATOR = "InitialIterator";
final String NEXT_ITERATOR_ONE = "NextIteratorOne";
final String NEXT_ITERATOR_TWO = "NextIteratorTwo";
when(kinesisProxy.getIterator(anyString(), anyString())).thenReturn(INITIAL_ITERATOR);
GetRecordsResult iteratorOneResults = mock(GetRecordsResult.class);
when(iteratorOneResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_ONE);
when(kinesisProxy.get(eq(INITIAL_ITERATOR), anyInt())).thenReturn(iteratorOneResults);
GetRecordsResult iteratorTwoResults = mock(GetRecordsResult.class);
when(kinesisProxy.get(eq(NEXT_ITERATOR_ONE), anyInt())).thenReturn(iteratorTwoResults);
when(iteratorTwoResults.getNextShardIterator()).thenReturn(NEXT_ITERATOR_TWO);
GetRecordsResult finalResult = mock(GetRecordsResult.class);
when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult);
when(finalResult.getNextShardIterator()).thenReturn(null);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO, configuration);
dataFetcher.initialize("TRIM_HORIZON",
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
assertNoAdvance(dataFetcher, iteratorOneResults, INITIAL_ITERATOR);
assertAdvanced(dataFetcher, iteratorOneResults, INITIAL_ITERATOR, NEXT_ITERATOR_ONE);
assertNoAdvance(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE);
assertAdvanced(dataFetcher, iteratorTwoResults, NEXT_ITERATOR_ONE, NEXT_ITERATOR_TWO);
assertNoAdvance(dataFetcher, finalResult, NEXT_ITERATOR_TWO);
assertAdvanced(dataFetcher, finalResult, NEXT_ITERATOR_TWO, null);
verify(kinesisProxy, times(2)).get(eq(INITIAL_ITERATOR), anyInt());
verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_ONE), anyInt());
verify(kinesisProxy, times(2)).get(eq(NEXT_ITERATOR_TWO), anyInt());
reset(kinesisProxy);
DataFetcherResult terminal = dataFetcher.getRecords(100);
assertThat(terminal.isShardEnd(), equalTo(true));
assertThat(terminal.getResult(), notNullValue());
GetRecordsResult terminalResult = terminal.getResult();
assertThat(terminalResult.getRecords(), notNullValue());
assertThat(terminalResult.getRecords(), empty());
assertThat(terminalResult.getNextShardIterator(), nullValue());
assertThat(terminal, equalTo(dataFetcher.TERMINAL_RESULT));
verify(kinesisProxy, never()).get(anyString(), anyInt());
}
private DataFetcherResult assertAdvanced(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult,
String previousValue, String nextValue) {
DataFetcherResult acceptResult = dataFetcher.getRecords(100);
assertThat(acceptResult.getResult(), equalTo(expectedResult));
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
assertThat(dataFetcher.isShardEndReached(), equalTo(false));
assertThat(acceptResult.accept(), equalTo(expectedResult));
assertThat(dataFetcher.getNextIterator(), equalTo(nextValue));
if (nextValue == null) {
assertThat(dataFetcher.isShardEndReached(), equalTo(true));
}
verify(kinesisProxy, times(2)).get(eq(previousValue), anyInt());
return acceptResult;
}
private DataFetcherResult assertNoAdvance(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult,
String previousValue) {
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
DataFetcherResult noAcceptResult = dataFetcher.getRecords(100);
assertThat(noAcceptResult.getResult(), equalTo(expectedResult));
assertThat(dataFetcher.getNextIterator(), equalTo(previousValue));
verify(kinesisProxy).get(eq(previousValue), anyInt());
return noAcceptResult;
} }
private void testInitializeAndFetch(String iteratorType, private void testInitializeAndFetch(String iteratorType,
@ -223,8 +331,9 @@ public class KinesisDataFetcherTest {
ICheckpoint checkpoint = mock(ICheckpoint.class); ICheckpoint checkpoint = mock(ICheckpoint.class);
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo)); when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo));
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration);
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher);
fetcher.initialize(seqNo, initialPositionInStream); fetcher.initialize(seqNo, initialPositionInStream);
List<Record> actualRecords = getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).getRecords(); List<Record> actualRecords = getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).getRecords();

View file

@ -0,0 +1,195 @@
/*
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.Record;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
/**
*
*/
@RunWith(MockitoJUnitRunner.class)
public class PrefetchGetRecordsCacheIntegrationTest {
private static final int MAX_SIZE = 3;
private static final int MAX_BYTE_SIZE = 5 * 1024 * 1024;
private static final int MAX_RECORDS_COUNT = 30_000;
private static final int MAX_RECORDS_PER_CALL = 10_000;
private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L;
private PrefetchGetRecordsCache getRecordsCache;
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private KinesisDataFetcher dataFetcher;
private ExecutorService executorService;
private List<Record> records;
@Mock
private IKinesisProxy proxy;
@Mock
private ShardInfo shardInfo;
@Mock
private KinesisClientLibConfiguration configuration;
@Before
public void setup() {
when(configuration.getIdleMillisBetweenCalls()).thenReturn(IDLE_MILLIS_BETWEEN_CALLS);
records = new ArrayList<>();
dataFetcher = new KinesisDataFetcherForTest(proxy, shardInfo, configuration);
getRecordsRetrievalStrategy = spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
executorService = spy(Executors.newFixedThreadPool(1));
getRecordsCache = new PrefetchGetRecordsCache(MAX_SIZE,
MAX_BYTE_SIZE,
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy,
executorService);
}
@Test
public void testRollingCache() {
getRecordsCache.start();
sleep(IDLE_MILLIS_BETWEEN_CALLS);
ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult();
assertTrue(processRecordsInput1.getRecords().isEmpty());
assertEquals(processRecordsInput1.getMillisBehindLatest(), new Long(1000));
assertNotNull(processRecordsInput1.getCacheEntryTime());
ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult();
assertNotEquals(processRecordsInput1, processRecordsInput2);
}
@Test
public void testFullCache() {
getRecordsCache.start();
sleep(MAX_SIZE * IDLE_MILLIS_BETWEEN_CALLS);
assertEquals(getRecordsCache.getRecordsResultQueue.size(), MAX_SIZE);
ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult();
ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult();
assertNotEquals(processRecordsInput1, processRecordsInput2);
}
@Test
public void testDifferentShardCaches() {
ExecutorService executorService2 = spy(Executors.newFixedThreadPool(1));
KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcherForTest(proxy, shardInfo, configuration));
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy2 = spy(new AsynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher, 5 , 5, "Test-shard"));
GetRecordsCache getRecordsCache2 = new PrefetchGetRecordsCache(
MAX_SIZE,
MAX_BYTE_SIZE,
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy2,
executorService2
);
getRecordsCache.start();
sleep(IDLE_MILLIS_BETWEEN_CALLS);
Record record = mock(Record.class);
ByteBuffer byteBuffer = ByteBuffer.allocate(512 * 1024);
when(record.getData()).thenReturn(byteBuffer);
records.add(record);
records.add(record);
records.add(record);
records.add(record);
getRecordsCache2.start();
sleep(IDLE_MILLIS_BETWEEN_CALLS);
ProcessRecordsInput p1 = getRecordsCache.getNextResult();
ProcessRecordsInput p2 = getRecordsCache2.getNextResult();
assertNotEquals(p1, p2);
assertTrue(p1.getRecords().isEmpty());
assertFalse(p2.getRecords().isEmpty());
assertEquals(p2.getRecords().size(), records.size());
getRecordsCache2.shutdown();
verify(executorService2).shutdownNow();
verify(getRecordsRetrievalStrategy2).shutdown();
}
@After
public void shutdown() {
getRecordsCache.shutdown();
verify(executorService).shutdownNow();
verify(getRecordsRetrievalStrategy).shutdown();
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {}
}
private class KinesisDataFetcherForTest extends KinesisDataFetcher {
public KinesisDataFetcherForTest(final IKinesisProxy kinesisProxy,
final ShardInfo shardInfo,
final KinesisClientLibConfiguration configuration) {
super(kinesisProxy, shardInfo, configuration);
}
@Override
public DataFetcherResult getRecords(final int maxRecords) {
GetRecordsResult getRecordsResult = new GetRecordsResult();
getRecordsResult.setRecords(new ArrayList<>(records));
getRecordsResult.setMillisBehindLatest(1000L);
return new AdvancingResult(getRecordsResult);
}
}
}

View file

@ -183,6 +183,12 @@ public class PrefetchGetRecordsCacheTest {
getRecordsCache.getNextResult(); getRecordsCache.getNextResult();
} }
@Test(expected = IllegalStateException.class)
public void testCallAfterShutdown() {
when(executorService.isShutdown()).thenReturn(true);
getRecordsCache.getNextResult();
}
@After @After
public void shutdown() { public void shutdown() {
getRecordsCache.shutdown(); getRecordsCache.shutdown();

View file

@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -47,7 +48,6 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.clientlibrary.types.Messages.AggregatedRecord; import com.amazonaws.services.kinesis.clientlibrary.types.Messages.AggregatedRecord;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
@ -75,11 +75,7 @@ public class ProcessTaskTest {
@Mock @Mock
private ThrottlingReporter throttlingReporter; private ThrottlingReporter throttlingReporter;
@Mock @Mock
private GetRecordsRetrievalStrategy mockGetRecordsRetrievalStrategy; private GetRecordsCache getRecordsCache;
@Mock
private RecordsFetcherFactory mockRecordsFetcherFactory;
@Mock
private GetRecordsCache mockRecordsFetcher;
private List<Record> processedRecords; private List<Record> processedRecords;
private ExtendedSequenceNumber newLargestPermittedCheckpointValue; private ExtendedSequenceNumber newLargestPermittedCheckpointValue;
@ -96,34 +92,40 @@ public class ProcessTaskTest {
skipCheckpointValidationValue, skipCheckpointValidationValue,
INITIAL_POSITION_LATEST); INITIAL_POSITION_LATEST);
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
when(mockRecordsFetcherFactory.createRecordsFetcher(mockGetRecordsRetrievalStrategy)).thenReturn(mockRecordsFetcher);
processTask = new ProcessTask( processTask = new ProcessTask(
shardInfo, config, mockRecordProcessor, mockRecordsFetcherFactory, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, shardInfo,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, throttlingReporter, mockGetRecordsRetrievalStrategy); config,
mockRecordProcessor,
mockCheckpointer,
mockDataFetcher,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
throttlingReporter,
getRecordsCache);
} }
@Test @Test
public void testProcessTaskWithProvisionedThroughputExceededException() { public void testProcessTaskWithProvisionedThroughputExceededException() {
// Set data fetcher to throw exception // Set data fetcher to throw exception
doReturn(false).when(mockDataFetcher).isShardEndReached(); doReturn(false).when(mockDataFetcher).isShardEndReached();
doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(mockRecordsFetcher) doThrow(new ProvisionedThroughputExceededException("Test Exception")).when(getRecordsCache)
.getNextResult(); .getNextResult();
TaskResult result = processTask.call(); TaskResult result = processTask.call();
verify(throttlingReporter).throttled(); verify(throttlingReporter).throttled();
verify(throttlingReporter, never()).success(); verify(throttlingReporter, never()).success();
verify(mockRecordsFetcher).getNextResult(); verify(getRecordsCache).getNextResult();
assertTrue("Result should contain ProvisionedThroughputExceededException", assertTrue("Result should contain ProvisionedThroughputExceededException",
result.getException() instanceof ProvisionedThroughputExceededException); result.getException() instanceof ProvisionedThroughputExceededException);
} }
@Test @Test
public void testProcessTaskWithNonExistentStream() { public void testProcessTaskWithNonExistentStream() {
// Data fetcher returns a null Result when the stream does not exist // Data fetcher returns a null Result ` the stream does not exist
doReturn(new GetRecordsResult().withRecords(Collections.emptyList())).when(mockRecordsFetcher).getNextResult(); doReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest((long) 0)).when(getRecordsCache).getNextResult();
TaskResult result = processTask.call(); TaskResult result = processTask.call();
verify(mockRecordsFetcher).getNextResult(); verify(getRecordsCache).getNextResult();
assertNull("Task should not throw an exception", result.getException()); assertNull("Task should not throw an exception", result.getException());
} }
@ -307,14 +309,13 @@ public class ProcessTaskTest {
private void testWithRecords(List<Record> records, private void testWithRecords(List<Record> records,
ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber lastCheckpointValue,
ExtendedSequenceNumber largestPermittedCheckpointValue) { ExtendedSequenceNumber largestPermittedCheckpointValue) {
when(mockRecordsFetcher.getNextResult()).thenReturn( when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(records).withMillisBehindLatest((long) 1000 * 50));
new GetRecordsResult().withRecords(records));
when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue); when(mockCheckpointer.getLastCheckpointValue()).thenReturn(lastCheckpointValue);
when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue); when(mockCheckpointer.getLargestPermittedCheckpointValue()).thenReturn(largestPermittedCheckpointValue);
processTask.call(); processTask.call();
verify(throttlingReporter).success(); verify(throttlingReporter).success();
verify(throttlingReporter, never()).throttled(); verify(throttlingReporter, never()).throttled();
verify(mockRecordsFetcher).getNextResult(); verify(getRecordsCache).getNextResult();
ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class); ArgumentCaptor<ProcessRecordsInput> priCaptor = ArgumentCaptor.forClass(ProcessRecordsInput.class);
verify(mockRecordProcessor).processRecords(priCaptor.capture()); verify(mockRecordProcessor).processRecords(priCaptor.capture());
processedRecords = priCaptor.getValue().getRecords(); processedRecords = priCaptor.getValue().getRecords();

View file

@ -20,9 +20,9 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
@ -48,12 +48,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.hamcrest.Description; import org.hamcrest.Description;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher; import org.hamcrest.TypeSafeMatcher;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -97,14 +98,16 @@ public class ShardConsumerTest {
// Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is // Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is
// ... a non-final public class, and so can be mocked and spied. // ... a non-final public class, and so can be mocked and spied.
private final ExecutorService executorService = Executors.newFixedThreadPool(1); private final ExecutorService executorService = Executors.newFixedThreadPool(1);
private final int maxRecords = 500;
private RecordsFetcherFactory recordsFetcherFactory;
private GetRecordsCache getRecordsCache;
@Mock @Mock
private IRecordProcessor processor; private IRecordProcessor processor;
@Mock @Mock
private KinesisClientLibConfiguration config; private KinesisClientLibConfiguration config;
@Mock @Mock
private RecordsFetcherFactory recordsFetcherFactory;
@Mock
private IKinesisProxy streamProxy; private IKinesisProxy streamProxy;
@Mock @Mock
private ILeaseManager<KinesisClientLease> leaseManager; private ILeaseManager<KinesisClientLease> leaseManager;
@ -112,6 +115,16 @@ public class ShardConsumerTest {
private ICheckpoint checkpoint; private ICheckpoint checkpoint;
@Mock @Mock
private ShutdownNotification shutdownNotification; private ShutdownNotification shutdownNotification;
@Before
public void setup() {
getRecordsCache = null;
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords));
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(config.getIdleMillisBetweenCalls()).thenReturn(0l);
}
/** /**
* Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails.
*/ */
@ -136,14 +149,14 @@ public class ShardConsumerTest {
streamConfig, streamConfig,
checkpoint, checkpoint,
processor, processor,
config,
null, null,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
@ -160,7 +173,6 @@ public class ShardConsumerTest {
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
} }
/** /**
* Test method to verify consumer stays in INITIALIZING state when InitializationTask fails. * Test method to verify consumer stays in INITIALIZING state when InitializationTask fails.
*/ */
@ -185,14 +197,14 @@ public class ShardConsumerTest {
streamConfig, streamConfig,
checkpoint, checkpoint,
processor, processor,
config,
null, null,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
spyExecutorService, spyExecutorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
@ -214,7 +226,6 @@ public class ShardConsumerTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public final void testRecordProcessorThrowable() throws Exception { public final void testRecordProcessorThrowable() throws Exception {
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
StreamConfig streamConfig = StreamConfig streamConfig =
new StreamConfig(streamProxy, new StreamConfig(streamProxy,
@ -228,14 +239,14 @@ public class ShardConsumerTest {
streamConfig, streamConfig,
checkpoint, checkpoint,
processor, processor,
config,
null, null,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null; final ExtendedSequenceNumber pendingCheckpointSequenceNumber = null;
@ -308,7 +319,6 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(maxRecords));
TestStreamlet processor = new TestStreamlet(); TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig = StreamConfig streamConfig =
@ -319,19 +329,39 @@ public class ShardConsumerTest {
skipCheckpointValidationValue, INITIAL_POSITION_LATEST); skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null);
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
)
);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache);
ShardConsumer consumer = ShardConsumer consumer =
new ShardConsumer(shardInfo, new ShardConsumer(shardInfo,
streamConfig, streamConfig,
checkpoint, checkpoint,
processor, processor,
config, recordProcessorCheckpointer,
leaseManager, leaseManager,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
dataFetcher,
Optional.empty(),
Optional.empty(),
config);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards consumer.consumeShard(); // check on parent shards
@ -340,6 +370,7 @@ public class ShardConsumerTest {
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING)));
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
processor.getInitializeLatch().await(5, TimeUnit.SECONDS); processor.getInitializeLatch().await(5, TimeUnit.SECONDS);
verify(getRecordsCache).start();
// We expect to process all records in numRecs calls // We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) { for (int i = 0; i < numRecs;) {
@ -353,6 +384,8 @@ public class ShardConsumerTest {
Thread.sleep(50L); Thread.sleep(50L);
} }
verify(getRecordsCache, times(5)).getNextResult();
assertThat(processor.getShutdownReason(), nullValue()); assertThat(processor.getShutdownReason(), nullValue());
consumer.notifyShutdownRequested(shutdownNotification); consumer.notifyShutdownRequested(shutdownNotification);
consumer.consumeShard(); consumer.consumeShard();
@ -376,6 +409,8 @@ public class ShardConsumerTest {
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE)));
verify(getRecordsCache).shutdown();
executorService.shutdown(); executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS); executorService.awaitTermination(60, TimeUnit.SECONDS);
@ -411,7 +446,6 @@ public class ShardConsumerTest {
ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString());
checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken); checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.AT_TIMESTAMP, testConcurrencyToken);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2));
TestStreamlet processor = new TestStreamlet(); TestStreamlet processor = new TestStreamlet();
StreamConfig streamConfig = StreamConfig streamConfig =
@ -423,19 +457,39 @@ public class ShardConsumerTest {
atTimestamp); atTimestamp);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
checkpoint,
new SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()
)
);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache);
ShardConsumer consumer = ShardConsumer consumer =
new ShardConsumer(shardInfo, new ShardConsumer(shardInfo,
streamConfig, streamConfig,
checkpoint, checkpoint,
processor, processor,
config, recordProcessorCheckpointer,
leaseManager, leaseManager,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
dataFetcher,
Optional.empty(),
Optional.empty(),
config);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards consumer.consumeShard(); // check on parent shards
@ -445,6 +499,8 @@ public class ShardConsumerTest {
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
Thread.sleep(50L); Thread.sleep(50L);
verify(getRecordsCache).start();
// We expect to process all records in numRecs calls // We expect to process all records in numRecs calls
for (int i = 0; i < numRecs;) { for (int i = 0; i < numRecs;) {
boolean newTaskSubmitted = consumer.consumeShard(); boolean newTaskSubmitted = consumer.consumeShard();
@ -457,6 +513,8 @@ public class ShardConsumerTest {
Thread.sleep(50L); Thread.sleep(50L);
} }
verify(getRecordsCache, times(4)).getNextResult();
assertThat(processor.getShutdownReason(), nullValue()); assertThat(processor.getShutdownReason(), nullValue());
consumer.beginShutdown(); consumer.beginShutdown();
Thread.sleep(50L); Thread.sleep(50L);
@ -468,8 +526,11 @@ public class ShardConsumerTest {
executorService.shutdown(); executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS); executorService.awaitTermination(60, TimeUnit.SECONDS);
verify(getRecordsCache).shutdown();
String iterator = fileBasedProxy.getIterator(streamShardId, timestamp); String iterator = fileBasedProxy.getIterator(streamShardId, timestamp);
List<Record> expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); List<Record> expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords());
verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); verifyConsumedRecords(expectedRecords, processor.getProcessedRecords());
assertEquals(4, processor.getProcessedRecords().size()); assertEquals(4, processor.getProcessedRecords().size());
file.delete(); file.delete();
@ -491,14 +552,16 @@ public class ShardConsumerTest {
streamConfig, streamConfig,
checkpoint, checkpoint,
processor, processor,
config,
null, null,
parentShardPollIntervalMillis, parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
config);
GetRecordsCache getRecordsCache = spy(consumer.getGetRecordsCache());
final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123");
final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999");
@ -548,9 +611,11 @@ public class ShardConsumerTest {
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.empty(), Optional.empty(),
Optional.empty()); Optional.empty(),
config);
assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), SynchronousGetRecordsRetrievalStrategy.class); assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
SynchronousGetRecordsRetrievalStrategy.class);
} }
@Test @Test
@ -576,9 +641,11 @@ public class ShardConsumerTest {
taskBackoffTimeMillis, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.of(1), Optional.of(1),
Optional.of(2)); Optional.of(2),
config);
assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class); assertEquals(shardConsumer.getGetRecordsCache().getGetRecordsRetrievalStrategy().getClass(),
AsynchronousGetRecordsRetrievalStrategy.class);
} }
//@formatter:off (gets the formatting wrong) //@formatter:off (gets the formatting wrong)

View file

@ -59,7 +59,7 @@ public class ShutdownTaskTest {
IRecordProcessor defaultRecordProcessor = new TestStreamlet(); IRecordProcessor defaultRecordProcessor = new TestStreamlet();
@Mock @Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private GetRecordsCache getRecordsCache;
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
@ -80,7 +80,7 @@ public class ShutdownTaskTest {
*/ */
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
doNothing().when(getRecordsRetrievalStrategy).shutdown(); doNothing().when(getRecordsCache).shutdown();
} }
/** /**
@ -109,7 +109,7 @@ public class ShutdownTaskTest {
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
leaseManager, leaseManager,
TASK_BACKOFF_TIME_MILLIS, TASK_BACKOFF_TIME_MILLIS,
getRecordsRetrievalStrategy); getRecordsCache);
TaskResult result = task.call(); TaskResult result = task.call();
Assert.assertNotNull(result.getException()); Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException); Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
@ -135,11 +135,11 @@ public class ShutdownTaskTest {
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
leaseManager, leaseManager,
TASK_BACKOFF_TIME_MILLIS, TASK_BACKOFF_TIME_MILLIS,
getRecordsRetrievalStrategy); getRecordsCache);
TaskResult result = task.call(); TaskResult result = task.call();
Assert.assertNotNull(result.getException()); Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
verify(getRecordsRetrievalStrategy).shutdown(); verify(getRecordsCache).shutdown();
} }
/** /**
@ -147,7 +147,7 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testGetTaskType() { public final void testGetTaskType() {
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsRetrievalStrategy); ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsCache);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
} }

View file

@ -60,6 +60,7 @@ import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.hamcrest.TypeSafeMatcher; import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Matchers; import org.mockito.Matchers;
@ -130,6 +131,8 @@ public class WorkerTest {
private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d";
private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d";
private RecordsFetcherFactory recordsFetcherFactory;
@Mock @Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator; private KinesisClientLibLeaseCoordinator leaseCoordinator;
@Mock @Mock
@ -157,6 +160,13 @@ public class WorkerTest {
@Mock @Mock
private TaskResult taskResult; private TaskResult taskResult;
@Before
public void setup() {
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500));
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(config.getIdleMillisBetweenCalls()).thenReturn(500L);
}
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() { new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
@ -283,10 +293,22 @@ public class WorkerTest {
when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint) when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialState).thenReturn(firstCheckpoint)
.thenReturn(secondCheckpoint); .thenReturn(secondCheckpoint);
Worker worker = new Worker(stageName, streamletFactory, config, streamConfig, INITIAL_POSITION_LATEST, Worker worker = new Worker(stageName,
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, streamletFactory,
leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, config,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); streamConfig,
INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
checkpoint,
leaseCoordinator,
execService,
nullMetricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization);
Worker workerSpy = spy(worker); Worker workerSpy = spy(worker);
@ -759,10 +781,22 @@ public class WorkerTest {
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, config, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, config,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -909,10 +943,22 @@ public class WorkerTest {
when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture); when(coordinator.startGracefulShutdown(any(Callable.class))).thenReturn(gracefulShutdownFuture);
Worker worker = new InjectableWorker("testRequestShutdown", recordProcessorFactory, config, streamConfig, Worker worker = new InjectableWorker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, config,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization) { streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization) {
@Override @Override
void postConstruct() { void postConstruct() {
this.gracefulShutdownCoordinator = coordinator; this.gracefulShutdownCoordinator = coordinator;
@ -973,10 +1019,22 @@ public class WorkerTest {
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, clientConfig,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1045,10 +1103,22 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, clientConfig,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1148,10 +1218,22 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, clientConfig,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1255,10 +1337,22 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, clientConfig,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1329,10 +1423,22 @@ public class WorkerTest {
IRecordProcessor processor = mock(IRecordProcessor.class); IRecordProcessor processor = mock(IRecordProcessor.class);
when(recordProcessorFactory.createProcessor()).thenReturn(processor); when(recordProcessorFactory.createProcessor()).thenReturn(processor);
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, clientConfig, streamConfig, Worker worker = new Worker("testRequestShutdown",
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, recordProcessorFactory,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, clientConfig,
taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion,
leaseCoordinator,
leaseCoordinator,
executorService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
false,
shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1374,10 +1480,22 @@ public class WorkerTest {
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) {
super(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, super(applicationName,
parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, recordProcessorFactory,
checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, config,
failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization); streamConfig,
initialPositionInStream,
parentShardPollIntervalMillis,
shardSyncIdleTimeMillis,
cleanupLeasesUponShardCompletion,
checkpoint,
leaseCoordinator,
execService,
metricsFactory,
taskBackoffTimeMillis,
failoverTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization);
postConstruct(); postConstruct();
} }
@ -1681,8 +1799,10 @@ public class WorkerTest {
idleTimeInMilliseconds, idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList, callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
KinesisClientLibConfiguration clientConfig = KinesisClientLibConfiguration clientConfig = spy(new KinesisClientLibConfiguration("app", null, null, null));
new KinesisClientLibConfiguration("app", null, null, null);
when(clientConfig.getIdleMillisBetweenCalls()).thenReturn(0L);
Worker worker = Worker worker =
new Worker(stageName, new Worker(stageName,
recordProcessorFactory, recordProcessorFactory,