merged chagnes

This commit is contained in:
Wei 2017-09-22 14:13:22 -07:00
commit f841ea2cd9
11 changed files with 182 additions and 127 deletions

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2 Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.8.2 Bundle-Version: 1.8.3
Bundle-Vendor: Amazon Technologies, Inc Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6", Require-Bundle: org.apache.commons.codec;bundle-version="1.6",

View file

@ -29,6 +29,12 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
## Release Notes ## Release Notes
### Release 1.8.3 (September 22, 2017)
* Call shutdown on the retriever when the record processor is being shutdown
This fixes a bug that could leak threads if using the [`AsynchronousGetRecordsRetrievalStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/9a82b6bd05b3c9c5f8581af007141fa6d5f0fc4e/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java#L42) is being used.
The asynchronous retriever is only used when [`KinesisClientLibConfiguration#retryGetRecordsInSeconds`](https://github.com/awslabs/amazon-kinesis-client/blob/01d2688bc6e68fd3fe5cb698cb65299d67ac930d/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L227), and [`KinesisClientLibConfiguration#maxGetRecordsThreadPool`](https://github.com/awslabs/amazon-kinesis-client/blob/01d2688bc6e68fd3fe5cb698cb65299d67ac930d/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L230) are set.
* [PR #222](https://github.com/awslabs/amazon-kinesis-client/pull/222)
### Release 1.8.2 (September 20, 2017) ### Release 1.8.2 (September 20, 2017)
* Add support for two phase checkpoints * Add support for two phase checkpoints
Applications can now set a pending checkpoint, before completing the checkpoint operation. Once the application has completed its checkpoint steps, the final checkpoint will clear the pending checkpoint. Applications can now set a pending checkpoint, before completing the checkpoint operation. Once the application has completed its checkpoint steps, the final checkpoint will clear the pending checkpoint.
@ -36,10 +42,10 @@ To make it easier for developers to write record processors in other languages,
* [PR #188](https://github.com/awslabs/amazon-kinesis-client/pull/188) * [PR #188](https://github.com/awslabs/amazon-kinesis-client/pull/188)
* Support timeouts, and retry for GetRecords calls. * Support timeouts, and retry for GetRecords calls.
Applications can now set timeouts for GetRecord calls to Kinesis. As part of setting the timeout, the application must also provide a thread pool size for concurrent requests. Applications can now set timeouts for GetRecord calls to Kinesis. As part of setting the timeout, the application must also provide a thread pool size for concurrent requests.
* [PR #214](https://github.com/awslabs/amazon-kinesis-client/pulls/214) * [PR #214](https://github.com/awslabs/amazon-kinesis-client/pull/214)
* Notification when the lease table is throttled * Notification when the lease table is throttled
When writes, or reads, to the lease table are throttled a warning will be emitted. If you're seeing this warning you should increase the IOPs for your lease table to prevent processing delays. When writes, or reads, to the lease table are throttled a warning will be emitted. If you're seeing this warning you should increase the IOPs for your lease table to prevent processing delays.
* [PR #212](https://github.com/awslabs/amazon-kinesis-client/pulls/212) * [PR #212](https://github.com/awslabs/amazon-kinesis-client/pull/212)
* Support configuring the graceful shutdown timeout for MultiLang Clients * Support configuring the graceful shutdown timeout for MultiLang Clients
This adds support for setting the timeout that the Java process will wait for the MutliLang client to complete graceful shutdown. The timeout can be configured by adding `shutdownGraceMillis` to the properties file set to the number of milliseconds to wait. This adds support for setting the timeout that the Java process will wait for the MutliLang client to complete graceful shutdown. The timeout can be configured by adding `shutdownGraceMillis` to the properties file set to the number of milliseconds to wait.
* [PR #204](https://github.com/awslabs/amazon-kinesis-client/pull/204) * [PR #204](https://github.com/awslabs/amazon-kinesis-client/pull/204)

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name> <name>Amazon Kinesis Client Library for Java</name>
<version>1.8.2</version> <version>1.8.3</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>

View file

@ -308,10 +308,9 @@ class ConsumerStates {
@Override @Override
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getDataFetcher(), consumer.getTaskBackoffTimeMillis(), consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(), consumer.getGetRecordsRetrievalStrategy());
consumer.getGetRecordsCache());
} }
@Override @Override
@ -515,7 +514,8 @@ class ConsumerStates {
consumer.getStreamConfig().getStreamProxy(), consumer.getStreamConfig().getStreamProxy(),
consumer.getStreamConfig().getInitialPositionInStream(), consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(), consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis()); consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsRetrievalStrategy());
} }
@Override @Override

View file

@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration {
/** /**
* User agent set when Amazon Kinesis Client Library makes AWS requests. * User agent set when Amazon Kinesis Client Library makes AWS requests.
*/ */
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.2"; public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.3";
/** /**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls

View file

@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Optional; import java.util.Optional;
@ -33,6 +34,7 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.Shard;
@ -53,7 +55,6 @@ class ProcessTask implements ITask {
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
private final GetRecordsCache getRecordsCache;
private final RecordProcessorCheckpointer recordProcessorCheckpointer; private final RecordProcessorCheckpointer recordProcessorCheckpointer;
private final KinesisDataFetcher dataFetcher; private final KinesisDataFetcher dataFetcher;
private final TaskType taskType = TaskType.PROCESS; private final TaskType taskType = TaskType.PROCESS;
@ -62,16 +63,7 @@ class ProcessTask implements ITask {
private final Shard shard; private final Shard shard;
private final ThrottlingReporter throttlingReporter; private final ThrottlingReporter throttlingReporter;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
ShardInfo shardInfo) {
Optional<GetRecordsRetrievalStrategy> getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry ->
maxGetRecordsThreadPool.map(max ->
new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId())));
return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
}
/** /**
* @param shardInfo * @param shardInfo
@ -86,17 +78,17 @@ class ProcessTask implements ITask {
* Kinesis data fetcher (used to fetch records from Kinesis) * Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis * @param backoffTimeMillis
* backoff time when catching exceptions * backoff time when catching exceptions
* @param getRecordsCache * @param getRecordsRetrievalStrategy
* Record processor factory to create recordFetcher object * The retrieval strategy for fetching records from kinesis
*/ */
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
KinesisDataFetcher dataFetcher, long backoffTimeMillis, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
GetRecordsCache getRecordsCache) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, skipShardSyncAtWorkerInitializationIfLeasesExist,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()),
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()), getRecordsCache); getRecordsRetrievalStrategy);
} }
/** /**
@ -106,8 +98,6 @@ class ProcessTask implements ITask {
* Stream configuration * Stream configuration
* @param recordProcessor * @param recordProcessor
* Record processor used to process the data records for the shard * Record processor used to process the data records for the shard
* @param getRecordsCache
* RecordFetcher factory used to create recordFetcher object
* @param recordProcessorCheckpointer * @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress * Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher * @param dataFetcher
@ -118,9 +108,9 @@ class ProcessTask implements ITask {
* determines how throttling events should be reported in the log. * determines how throttling events should be reported in the log.
*/ */
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) { ThrottlingReporter throttlingReporter, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
super(); super();
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
@ -130,7 +120,7 @@ class ProcessTask implements ITask {
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.throttlingReporter = throttlingReporter; this.throttlingReporter = throttlingReporter;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
this.getRecordsCache = getRecordsCache; this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
// If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
@ -168,10 +158,10 @@ class ProcessTask implements ITask {
return new TaskResult(null, true); return new TaskResult(null, true);
} }
final ProcessRecordsInput processRecordsInput = getRecordsResult(); final GetRecordsResult getRecordsResult = getRecordsResult();
throttlingReporter.success(); throttlingReporter.success();
List<Record> records = processRecordsInput.getRecords(); List<Record> records = getRecordsResult.getRecords();
if (!records.isEmpty()) { if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY); scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
} else { } else {
@ -185,7 +175,7 @@ class ProcessTask implements ITask {
recordProcessorCheckpointer.getLargestPermittedCheckpointValue())); recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
if (shouldCallProcessRecords(records)) { if (shouldCallProcessRecords(records)) {
callProcessRecords(processRecordsInput); callProcessRecords(getRecordsResult, records);
} }
} catch (ProvisionedThroughputExceededException pte) { } catch (ProvisionedThroughputExceededException pte) {
throttlingReporter.throttled(); throttlingReporter.throttled();
@ -215,15 +205,18 @@ class ProcessTask implements ITask {
/** /**
* Dispatches a batch of records to the record processor, and handles any fallout from that. * Dispatches a batch of records to the record processor, and handles any fallout from that.
* *
* @param processRecordsInput * @param getRecordsResult
* the ProcessRecordsInput result of the last call to Kinesis * the result of the last call to Kinesis
* @param records
* the records to be dispatched. It's possible the records have been adjusted by KPL deaggregation.
*/ */
private void callProcessRecords(ProcessRecordsInput processRecordsInput) { private void callProcessRecords(GetRecordsResult getRecordsResult, List<Record> records) {
List<Record> records = processRecordsInput.getRecords();
LOG.debug("Calling application processRecords() with " + records.size() + " records from " LOG.debug("Calling application processRecords() with " + records.size() + " records from "
+ shardInfo.getShardId()); + shardInfo.getShardId());
processRecordsInput.withCheckpointer(recordProcessorCheckpointer); final ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records)
.withCheckpointer(recordProcessorCheckpointer)
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
final long recordProcessorStartTimeMillis = System.currentTimeMillis(); final long recordProcessorStartTimeMillis = System.currentTimeMillis();
try { try {
@ -240,7 +233,7 @@ class ProcessTask implements ITask {
/** /**
* Whether we should call process records or not * Whether we should call process records or not
* *
* @param records * @param records
* the records returned from the call to Kinesis, and/or deaggregation * the records returned from the call to Kinesis, and/or deaggregation
* @return true if the set of records should be dispatched to the record process, false if they should not. * @return true if the set of records should be dispatched to the record process, false if they should not.
@ -251,7 +244,7 @@ class ProcessTask implements ITask {
/** /**
* Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation * Determines whether to deaggregate the given records, and if they are KPL records dispatches them to deaggregation
* *
* @param records * @param records
* the records to deaggregate is deaggregation is required. * the records to deaggregate is deaggregation is required.
* @return returns either the deaggregated records, or the original records * @return returns either the deaggregated records, or the original records
@ -274,7 +267,7 @@ class ProcessTask implements ITask {
/** /**
* Emits metrics, and sleeps if there are no records available * Emits metrics, and sleeps if there are no records available
* *
* @param startTimeMillis * @param startTimeMillis
* the time when the task started * the time when the task started
*/ */
@ -311,8 +304,8 @@ class ProcessTask implements ITask {
* @return the largest extended sequence number among the retained records * @return the largest extended sequence number among the retained records
*/ */
private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List<Record> records, private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List<Record> records,
final ExtendedSequenceNumber lastCheckpointValue, final ExtendedSequenceNumber lastCheckpointValue,
final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) { final ExtendedSequenceNumber lastLargestPermittedCheckpointValue) {
ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue; ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue;
ListIterator<Record> recordIterator = records.listIterator(); ListIterator<Record> recordIterator = records.listIterator();
while (recordIterator.hasNext()) { while (recordIterator.hasNext()) {
@ -346,7 +339,7 @@ class ProcessTask implements ITask {
* *
* @return list of data records from Kinesis * @return list of data records from Kinesis
*/ */
private ProcessRecordsInput getRecordsResult() { private GetRecordsResult getRecordsResult() {
try { try {
return getRecordsResultAndRecordMillisBehindLatest(); return getRecordsResultAndRecordMillisBehindLatest();
} catch (ExpiredIteratorException e) { } catch (ExpiredIteratorException e) {
@ -382,17 +375,22 @@ class ProcessTask implements ITask {
* *
* @return list of data records from Kinesis * @return list of data records from Kinesis
*/ */
private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() { private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() {
final ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); final GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(streamConfig.getMaxRecords());
if (processRecordsInput.getMillisBehindLatest() != null) { if (getRecordsResult == null) {
// Stream no longer exists
return new GetRecordsResult().withRecords(Collections.<Record>emptyList());
}
if (getRecordsResult.getMillisBehindLatest() != null) {
MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC,
processRecordsInput.getMillisBehindLatest(), getRecordsResult.getMillisBehindLatest(),
StandardUnit.Milliseconds, StandardUnit.Milliseconds,
MetricsLevel.SUMMARY); MetricsLevel.SUMMARY);
} }
return processRecordsInput; return getRecordsResult;
} }
} }

View file

@ -56,16 +56,13 @@ class ShardConsumer {
private final boolean cleanupLeasesOfCompletedShards; private final boolean cleanupLeasesOfCompletedShards;
private final long taskBackoffTimeMillis; private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@Getter
private final Optional<Integer> retryGetRecordsInSeconds;
@Getter
private final Optional<Integer> maxGetRecordsThreadPool;
private ITask currentTask; private ITask currentTask;
private long currentTaskSubmitTime; private long currentTaskSubmitTime;
private Future<TaskResult> future; private Future<TaskResult> future;
@Getter @Getter
private final GetRecordsCache getRecordsCache; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> retryGetRecordsInSeconds,
@ -168,11 +165,7 @@ class ShardConsumer {
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.taskBackoffTimeMillis = backoffTimeMillis; this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo);
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
makeStrategy(this.dataFetcher, this.retryGetRecordsInSeconds,
this.maxGetRecordsThreadPool, this.shardInfo));
} }
/** /**

View file

@ -1,16 +1,16 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
* A copy of the License is located at * A copy of the License is located at
* *
* http://aws.amazon.com/asl/ * http://aws.amazon.com/asl/
* *
* or in the "license" file accompanying this file. This file is distributed * or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
@ -46,20 +46,22 @@ class ShutdownTask implements ITask {
private final boolean cleanupLeasesOfCompletedShards; private final boolean cleanupLeasesOfCompletedShards;
private final TaskType taskType = TaskType.SHUTDOWN; private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis; private final long backoffTimeMillis;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
/** /**
* Constructor. * Constructor.
*/ */
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShutdownTask(ShardInfo shardInfo, ShutdownTask(ShardInfo shardInfo,
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, RecordProcessorCheckpointer recordProcessorCheckpointer,
ShutdownReason reason, ShutdownReason reason,
IKinesisProxy kinesisProxy, IKinesisProxy kinesisProxy,
InitialPositionInStreamExtended initialPositionInStream, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
ILeaseManager<KinesisClientLease> leaseManager, ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis) { long backoffTimeMillis,
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -69,6 +71,7 @@ class ShutdownTask implements ITask {
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.leaseManager = leaseManager; this.leaseManager = leaseManager;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
} }
/* /*
@ -79,7 +82,7 @@ class ShutdownTask implements ITask {
*/ */
@Override @Override
public TaskResult call() { public TaskResult call() {
Exception exception = null; Exception exception;
boolean applicationException = false; boolean applicationException = false;
try { try {
@ -107,6 +110,8 @@ class ShutdownTask implements ITask {
+ shardInfo.getShardId()); + shardInfo.getShardId());
} }
} }
LOG.debug("Shutting down retrieval strategy.");
getRecordsRetrievalStrategy.shutdown();
LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId()); LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
} catch (Exception e) { } catch (Exception e) {
applicationException = true; applicationException = true;

View file

@ -17,17 +17,14 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.Optional;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -80,9 +77,7 @@ public class ConsumerStatesTest {
@Mock @Mock
private InitialPositionInStreamExtended initialPositionInStream; private InitialPositionInStreamExtended initialPositionInStream;
@Mock @Mock
private RecordsFetcherFactory recordsFetcherFactory; private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@Mock
private GetRecordsCache recordsFetcher;
private long parentShardPollIntervalMillis = 0xCAFE; private long parentShardPollIntervalMillis = 0xCAFE;
private boolean cleanupLeasesOfCompletedShards = true; private boolean cleanupLeasesOfCompletedShards = true;
@ -105,8 +100,7 @@ public class ConsumerStatesTest {
when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards); when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards);
when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis); when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis);
when(consumer.getShutdownReason()).thenReturn(reason); when(consumer.getShutdownReason()).thenReturn(reason);
when(consumer.getConfig()).thenReturn(config); when(consumer.getGetRecordsRetrievalStrategy()).thenReturn(getRecordsRetrievalStrategy);
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
} }
private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class; private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class;
@ -163,9 +157,6 @@ public class ConsumerStatesTest {
@Test @Test
public void processingStateTestSynchronous() { public void processingStateTestSynchronous() {
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.empty());
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.empty());
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer); ITask task = state.createTask(consumer);
@ -176,7 +167,6 @@ public class ConsumerStatesTest {
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(SynchronousGetRecordsRetrievalStrategy.class) ));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
@ -194,9 +184,6 @@ public class ConsumerStatesTest {
@Test @Test
public void processingStateTestAsynchronous() { public void processingStateTestAsynchronous() {
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1));
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2));
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer); ITask task = state.createTask(consumer);
@ -207,7 +194,6 @@ public class ConsumerStatesTest {
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(AsynchronousGetRecordsRetrievalStrategy.class) ));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
@ -225,9 +211,6 @@ public class ConsumerStatesTest {
@Test @Test
public void processingStateRecordsFetcher() { public void processingStateRecordsFetcher() {
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1));
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2));
when(recordsFetcherFactory.createRecordsFetcher((any()))).thenReturn(recordsFetcher);
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState(); ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer); ITask task = state.createTask(consumer);
@ -239,7 +222,6 @@ public class ConsumerStatesTest {
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher))); assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig))); assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis))); assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsCache.class, "recordsFetcher", equalTo(recordsFetcher)));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState())); assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));

View file

@ -1,16 +1,16 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
* A copy of the License is located at * A copy of the License is located at
* *
* http://aws.amazon.com/asl/ * http://aws.amazon.com/asl/
* *
* or in the "license" file accompanying this file. This file is distributed * or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
@ -20,6 +20,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
@ -40,6 +41,7 @@ import java.util.Date;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -522,6 +524,62 @@ public class ShardConsumerTest {
Thread.sleep(50L); Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
} }
@Test
public void testCreateSynchronousGetRecordsRetrieval() {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer shardConsumer =
new ShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.empty(),
Optional.empty());
assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), SynchronousGetRecordsRetrievalStrategy.class);
}
@Test
public void testCreateAsynchronousGetRecordsRetrieval() {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer shardConsumer =
new ShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.of(1),
Optional.of(2));
assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class);
}
//@formatter:off (gets the formatting wrong) //@formatter:off (gets the formatting wrong)
private void verifyConsumedRecords(List<Record> expectedRecords, private void verifyConsumedRecords(List<Record> expectedRecords,

View file

@ -1,20 +1,22 @@
/* /*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
* A copy of the License is located at * A copy of the License is located at
* *
* http://aws.amazon.com/asl/ * http://aws.amazon.com/asl/
* *
* or in the "license" file accompanying this file. This file is distributed * or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing * express or implied. See the License for the specific language governing
* permissions and limitations under the License. * permissions and limitations under the License.
*/ */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker; package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.HashSet; import java.util.HashSet;
@ -34,10 +36,14 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
/** /**
* *
*/ */
@RunWith(MockitoJUnitRunner.class)
public class ShutdownTaskTest { public class ShutdownTaskTest {
private static final long TASK_BACKOFF_TIME_MILLIS = 1L; private static final long TASK_BACKOFF_TIME_MILLIS = 1L;
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
@ -51,6 +57,9 @@ public class ShutdownTaskTest {
defaultParentShardIds, defaultParentShardIds,
ExtendedSequenceNumber.LATEST); ExtendedSequenceNumber.LATEST);
IRecordProcessor defaultRecordProcessor = new TestStreamlet(); IRecordProcessor defaultRecordProcessor = new TestStreamlet();
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
@ -71,6 +80,7 @@ public class ShutdownTaskTest {
*/ */
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
doNothing().when(getRecordsRetrievalStrategy).shutdown();
} }
/** /**
@ -98,7 +108,8 @@ public class ShutdownTaskTest {
INITIAL_POSITION_TRIM_HORIZON, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
leaseManager, leaseManager,
TASK_BACKOFF_TIME_MILLIS); TASK_BACKOFF_TIME_MILLIS,
getRecordsRetrievalStrategy);
TaskResult result = task.call(); TaskResult result = task.call();
Assert.assertNotNull(result.getException()); Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException); Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
@ -123,10 +134,12 @@ public class ShutdownTaskTest {
INITIAL_POSITION_TRIM_HORIZON, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
leaseManager, leaseManager,
TASK_BACKOFF_TIME_MILLIS); TASK_BACKOFF_TIME_MILLIS,
getRecordsRetrievalStrategy);
TaskResult result = task.call(); TaskResult result = task.call();
Assert.assertNotNull(result.getException()); Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException); Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
verify(getRecordsRetrievalStrategy).shutdown();
} }
/** /**
@ -134,7 +147,7 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testGetTaskType() { public final void testGetTaskType() {
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0); ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsRetrievalStrategy);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
} }