Introduction of Listshards API to replace DescribeStream (#293)

Use ListShards to retrieve shard information for Amazon Kinesis streams.
This commit is contained in:
Sahil Palvia 2018-02-06 07:35:44 -08:00 committed by Justin Pfifer
parent 59d40251c7
commit 3ae916c5fc
15 changed files with 818 additions and 394 deletions

275
CHANGELOG.md Normal file
View file

@ -0,0 +1,275 @@
# Changelog
## Release 1.8.10
* Allow providing a custom IKinesisProxy implementation.
* [PR #274](https://github.com/awslabs/amazon-kinesis-client/pull/274)
* Checkpointing on a different thread should no longer emit a warning about NullMetricsScope.
* [PR #284](https://github.com/awslabs/amazon-kinesis-client/pull/284)
* [Issue #48](https://github.com/awslabs/amazon-kinesis-client/issues/48)
* Upgraded the AWS Java SDK to version 1.11.271
* [PR #287](https://github.com/awslabs/amazon-kinesis-client/pull/287)
## Release 1.8.9
* Allow disabling check for the case where a child shard has an open parent shard.
There is a race condition where it's possible for the a parent shard to appear open, while having child shards. This check can now be disabled by setting [`ignoreUnexpectedChildShards`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1037) to true.
* [PR #240](https://github.com/awslabs/amazon-kinesis-client/pull/240)
* [Issue #210](https://github.com/awslabs/amazon-kinesis-client/issues/210)
* Upgraded the AWS SDK for Java to 1.11.261
* [PR #281](https://github.com/awslabs/amazon-kinesis-client/pull/281)
## Release 1.8.8
* Fixed issues with leases losses due to `ExpiredIteratorException` in `PrefetchGetRecordsCache` and `AsynchronousFetchingStrategy`.
PrefetchGetRecordsCache will request for a new iterator and start fetching data again.
* [PR#263](https://github.com/awslabs/amazon-kinesis-client/pull/263)
* Added warning message for long running tasks.
Logging long running tasks can be enabled by setting the following configuration property:
| Name | Default | Description |
| ---- | ------- | ----------- |
| [`logWarningForTaskAfterMillis`](https://github.com/awslabs/amazon-kinesis-client/blob/3de901ea9327370ed732af86c4d4999c8d99541c/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1367) | Not set | Milliseconds after which the logger will log a warning message for the long running task |
* [PR#259](https://github.com/awslabs/amazon-kinesis-client/pull/259)
* Handling spurious lease renewal failures gracefully.
Added better handling of DynamoDB failures when updating leases. These failures would occur when a request to DynamoDB appeared to fail, but was actually successful.
* [PR#247](https://github.com/awslabs/amazon-kinesis-client/pull/247)
* ShutdownTask gets retried if the previous attempt on the ShutdownTask fails.
* [PR#267](https://github.com/awslabs/amazon-kinesis-client/pull/267)
* Fix for using maxRecords from `KinesisClientLibConfiguration` in `GetRecordsCache` for fetching records.
* [PR#264](https://github.com/awslabs/amazon-kinesis-client/pull/264)
## Release 1.8.7
* Don't add a delay for synchronous requests to Kinesis
Removes a delay that had been added for synchronous `GetRecords` calls to Kinesis.
* [PR #256](https://github.com/awslabs/amazon-kinesis-client/pull/256)
## Release 1.8.6
* Add prefetching of records from Kinesis
Prefetching will retrieve and queue additional records from Kinesis while the application is processing existing records.
Prefetching can be enabled by setting [`dataFetchingStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1317) to `PREFETCH_CACHED`. Once enabled an additional fetching thread will be started to retrieve records from Kinesis. Retrieved records will be held in a queue until the application is ready to process them.
Pre-fetching supports the following configuration values:
| Name | Default | Description |
| ---- | ------- | ----------- |
| [`dataFetchingStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1317) | `DEFAULT` | Which data fetching strategy to use |
| [`maxPendingProcessRecordsInput`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1296) | 3 | The maximum number of process records input that can be queued |
| [`maxCacheByteSize`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1307) | 8 MiB | The maximum number of bytes that can be queued |
| [`maxRecordsCount`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1326) | 30,000 | The maximum number of records that can be queued |
| [`idleMillisBetweenCalls`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1353) | 1,500 ms | The amount of time to wait between calls to Kinesis |
* [PR #246](https://github.com/awslabs/amazon-kinesis-client/pull/246)
## Release 1.8.5 (September 26, 2017)
* Only advance the shard iterator for the accepted response.
This fixes a race condition in the `KinesisDataFetcher` when it's being used to make asynchronous requests. The shard iterator is now only advanced when the retriever calls `DataFetcherResult#accept()`.
* [PR #230](https://github.com/awslabs/amazon-kinesis-client/pull/230)
* [Issue #231](https://github.com/awslabs/amazon-kinesis-client/issues/231)
## Release 1.8.4 (September 22, 2017)
* Create a new completion service for each request.
This ensures that canceled tasks are discarded. This will prevent a cancellation exception causing issues processing records.
* [PR #227](https://github.com/awslabs/amazon-kinesis-client/pull/227)
* [Issue #226](https://github.com/awslabs/amazon-kinesis-client/issues/226)
## Release 1.8.3 (September 22, 2017)
* Call shutdown on the retriever when the record processor is being shutdown
This fixes a bug that could leak threads if using the [`AsynchronousGetRecordsRetrievalStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/9a82b6bd05b3c9c5f8581af007141fa6d5f0fc4e/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java#L42) is being used.
The asynchronous retriever is only used when [`KinesisClientLibConfiguration#retryGetRecordsInSeconds`](https://github.com/awslabs/amazon-kinesis-client/blob/01d2688bc6e68fd3fe5cb698cb65299d67ac930d/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L227), and [`KinesisClientLibConfiguration#maxGetRecordsThreadPool`](https://github.com/awslabs/amazon-kinesis-client/blob/01d2688bc6e68fd3fe5cb698cb65299d67ac930d/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L230) are set.
* [PR #222](https://github.com/awslabs/amazon-kinesis-client/pull/222)
## Release 1.8.2 (September 20, 2017)
* Add support for two phase checkpoints
Applications can now set a pending checkpoint, before completing the checkpoint operation. Once the application has completed its checkpoint steps, the final checkpoint will clear the pending checkpoint.
Should the checkpoint fail the attempted sequence number is provided in the [`InitializationInput#getPendingCheckpointSequenceNumber`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/InitializationInput.java#L81) otherwise the value will be null.
* [PR #188](https://github.com/awslabs/amazon-kinesis-client/pull/188)
* Support timeouts, and retry for GetRecords calls.
Applications can now set timeouts for GetRecord calls to Kinesis. As part of setting the timeout, the application must also provide a thread pool size for concurrent requests.
* [PR #214](https://github.com/awslabs/amazon-kinesis-client/pull/214)
* Notification when the lease table is throttled
When writes, or reads, to the lease table are throttled a warning will be emitted. If you're seeing this warning you should increase the IOPs for your lease table to prevent processing delays.
* [PR #212](https://github.com/awslabs/amazon-kinesis-client/pull/212)
* Support configuring the graceful shutdown timeout for MultiLang Clients
This adds support for setting the timeout that the Java process will wait for the MutliLang client to complete graceful shutdown. The timeout can be configured by adding `shutdownGraceMillis` to the properties file set to the number of milliseconds to wait.
* [PR #204](https://github.com/awslabs/amazon-kinesis-client/pull/204)
## Release 1.8.1 (August 2, 2017)
* Support timeouts for calls to the MultiLang Daemon
This adds support for setting a timeout when dispatching records to the client record processor. If the record processor doesn't respond within the timeout the parent Java process will be terminated. This is a temporary fix to handle cases where the KCL becomes blocked while waiting for a client record processor.
The timeout for the this can be set by adding `timeoutInSeconds = <timeout value>`. The default for this is no timeout.
__Setting this can cause the KCL to exit suddenly, before using this ensure that you have an automated restart for your application__
* [PR #195](https://github.com/awslabs/amazon-kinesis-client/pull/195)
* [Issue #185](https://github.com/awslabs/amazon-kinesis-client/issues/185)
## Release 1.8.0 (July 25, 2017)
* Execute graceful shutdown on its own thread
* [PR #191](https://github.com/awslabs/amazon-kinesis-client/pull/191)
* [Issue #167](https://github.com/awslabs/amazon-kinesis-client/issues/167)
* Added support for controlling the size of the lease renewer thread pool
* [PR #177](https://github.com/awslabs/amazon-kinesis-client/pull/177)
* [Issue #171](https://github.com/awslabs/amazon-kinesis-client/issues/171)
* Require Java 8 and later
__Java 8 is now required for versions 1.8.0 of the amazon-kinesis-client and later.__
* [PR #176](https://github.com/awslabs/amazon-kinesis-client/issues/176)
## Release 1.7.6 (June 21, 2017)
* Added support for graceful shutdown in MultiLang Clients
* [PR #174](https://github.com/awslabs/amazon-kinesis-client/pull/174)
* [PR #182](https://github.com/awslabs/amazon-kinesis-client/pull/182)
* Updated documentation for `v2.IRecordProcessor#shutdown`, and `KinesisClientLibConfiguration#idleTimeBetweenReadsMillis`
* [PR #170](https://github.com/awslabs/amazon-kinesis-client/pull/170)
* Updated to version 1.11.151 of the AWS Java SDK
* [PR #183](https://github.com/awslabs/amazon-kinesis-client/pull/183)
## Release 1.7.5 (April 7, 2017)
* Correctly handle throttling for DescribeStream, and save accumulated progress from individual calls.
* [PR #152](https://github.com/awslabs/amazon-kinesis-client/pull/152)
* Upgrade to version 1.11.115 of the AWS Java SDK
* [PR #155](https://github.com/awslabs/amazon-kinesis-client/pull/155)
## Release 1.7.4 (February 27, 2017)
* Fixed an issue building JavaDoc for Java 8.
* [Issue #18](https://github.com/awslabs/amazon-kinesis-client/issues/18)
* [PR #141](https://github.com/awslabs/amazon-kinesis-client/pull/141)
* Reduce Throttling Messages to WARN, unless throttling occurs 6 times consecutively.
* [Issue #4](https://github.com/awslabs/amazon-kinesis-client/issues/4)
* [PR #140](https://github.com/awslabs/amazon-kinesis-client/pull/140)
* Fixed two bugs occurring in requestShutdown.
* Fixed a bug that prevented the worker from shutting down, via requestShutdown, when no leases were held.
* [Issue #128](https://github.com/awslabs/amazon-kinesis-client/issues/128)
* Fixed a bug that could trigger a NullPointerException if leases changed during requestShutdown.
* [Issue #129](https://github.com/awslabs/amazon-kinesis-client/issues/129)
* [PR #139](https://github.com/awslabs/amazon-kinesis-client/pull/139)
* Upgraded the AWS SDK Version to 1.11.91
* [PR #138](https://github.com/awslabs/amazon-kinesis-client/pull/138)
* Use an executor returned from `ExecutorService.newFixedThreadPool` instead of constructing it by hand.
* [PR #135](https://github.com/awslabs/amazon-kinesis-client/pull/135)
* Correctly initialize DynamoDB client, when endpoint is explicitly set.
* [PR #142](https://github.com/awslabs/amazon-kinesis-client/pull/142)
## Release 1.7.3 (January 9, 2017)
* Upgrade to the newest AWS Java SDK.
* [Amazon Kinesis Client Issue #27](https://github.com/awslabs/amazon-kinesis-client-python/issues/27)
* [PR #126](https://github.com/awslabs/amazon-kinesis-client/pull/126)
* [PR #125](https://github.com/awslabs/amazon-kinesis-client/pull/125)
* Added a direct dependency on commons-logging.
* [Issue #123](https://github.com/awslabs/amazon-kinesis-client/issues/123)
* [PR #124](https://github.com/awslabs/amazon-kinesis-client/pull/124)
* Make ShardInfo public to allow for custom ShardPrioritization strategies.
* [Issue #120](https://github.com/awslabs/amazon-kinesis-client/issues/120)
* [PR #127](https://github.com/awslabs/amazon-kinesis-client/pull/127)
## Release 1.7.2 (November 7, 2016)
* MultiLangDaemon Feature Updates
The MultiLangDaemon has been upgraded to use the v2 interfaces, which allows access to enhanced checkpointing, and more information during record processor initialization. The MultiLangDaemon clients must be updated before they can take advantage of these new features.
## Release 1.7.1 (November 3, 2016)
* General
* Allow disabling shard synchronization at startup.
* Applications can disable shard synchronization at startup. Disabling shard synchronization can application startup times for very large streams.
* [PR #102](https://github.com/awslabs/amazon-kinesis-client/pull/102)
* Applications can now request a graceful shutdown, and record processors that implement the IShutdownNotificationAware will be given a chance to checkpoint before being shutdown.
* This adds a [new interface](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java), and a [new method on Worker](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java#L539).
* [PR #109](https://github.com/awslabs/amazon-kinesis-client/pull/109)
* Solves [Issue #79](https://github.com/awslabs/amazon-kinesis-client/issues/79)
* MultiLangDaemon
* Applications can now use credential provides that accept string parameters.
* [PR #99](https://github.com/awslabs/amazon-kinesis-client/pull/99)
* Applications can now use different credentials for each service.
* [PR #111](https://github.com/awslabs/amazon-kinesis-client/pull/111)
## Release 1.7.0 (August 22, 2016)
* Add support for time based iterators ([See GetShardIterator Documentation](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html))
* [PR #94](https://github.com/awslabs/amazon-kinesis-client/pull/94)
The `KinesisClientLibConfiguration` now supports providing an initial time stamp position.
* This position is only used if there is no current checkpoint for the shard.
* This setting cannot be used with DynamoDB Streams
Resolves [Issue #88](https://github.com/awslabs/amazon-kinesis-client/issues/88)
* Allow Prioritization of Parent Shards for Task Assignment
* [PR #95](https://github.com/awslabs/amazon-kinesis-client/pull/95)
The `KinesisClientLibconfiguration` now supports providing a `ShardPrioritization` strategy. This strategy controls how the `Worker` determines which `ShardConsumer` to call next. This can improve processing for streams that split often, such as DynamoDB Streams.
* Remove direct dependency on `aws-java-sdk-core`, to allow independent versioning.
* [PR #92](https://github.com/awslabs/amazon-kinesis-client/pull/92)
**You may need to add a direct dependency on aws-java-sdk-core if other dependencies include an older version.**
## Release 1.6.5 (July 25, 2016)
* Change LeaseManager to call DescribeTable before attempting to create the lease table.
* [Issue #36](https://github.com/awslabs/amazon-kinesis-client/issues/36)
* [PR #41](https://github.com/awslabs/amazon-kinesis-client/pull/41)
* [PR #67](https://github.com/awslabs/amazon-kinesis-client/pull/67)
* Allow DynamoDB lease table name to be specified
* [PR #61](https://github.com/awslabs/amazon-kinesis-client/pull/61)
* Add approximateArrivalTimestamp for JsonFriendlyRecord
* [PR #86](https://github.com/awslabs/amazon-kinesis-client/pull/86)
* Shutdown lease renewal thread pool on exit.
* [PR #84](https://github.com/awslabs/amazon-kinesis-client/pull/84)
* Wait for CloudWatch publishing thread to finish before exiting.
* [PR #82](https://github.com/awslabs/amazon-kinesis-client/pull/82)
* Added unit, and integration tests for the library.
## Release 1.6.4 (July 6, 2016)
* Upgrade to AWS SDK for Java 1.11.14
* [Issue #74](https://github.com/awslabs/amazon-kinesis-client/issues/74)
* [Issue #73](https://github.com/awslabs/amazon-kinesis-client/issues/73)
* **Maven Artifact Signing Change**
* Artifacts are now signed by the identity `Amazon Kinesis Tools <amazon-kinesis-tools@amazon.com>`
## Release 1.6.3 (May 12, 2016)
* Fix format exception caused by DEBUG log in LeaseTaker [Issue # 68](https://github.com/awslabs/amazon-kinesis-client/issues/68)
## Release 1.6.2 (March 23, 2016)
* Support for specifying max leases per worker and max leases to steal at a time.
* Support for specifying initial DynamoDB table read and write capacity.
* Support for parallel lease renewal.
* Support for graceful worker shutdown.
* Change DefaultCWMetricsPublisher log level to debug. [PR # 49](https://github.com/awslabs/amazon-kinesis-client/pull/49)
* Avoid NPE in MLD record processor shutdown if record processor was not initialized. [Issue # 29](https://github.com/awslabs/amazon-kinesis-client/issues/29)
## Release 1.6.1 (September 23, 2015)
* Expose [approximateArrivalTimestamp](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) for Records in processRecords API call.
## Release 1.6.0 (July 31, 2015)
* Restores compatibility with [dynamodb-streams-kinesis-adapter](https://github.com/awslabs/dynamodb-streams-kinesis-adapter) (which was broken in 1.4.0).
## Release 1.5.1 (July 20, 2015)
* KCL maven artifact 1.5.0 does not work with JDK 7. This release addresses this issue.
## Release 1.5.0 (July 9, 2015)
* **[Metrics Enhancements][kinesis-guide-monitoring-with-kcl]**
* Support metrics level and dimension configurations to control CloudWatch metrics emitted by the KCL.
* Add new metrics that track time spent in record processor methods.
* Disable WorkerIdentifier dimension by default.
* **Exception Reporting** &mdash; Do not silently ignore exceptions in ShardConsumer.
* **AWS SDK Component Dependencies** &mdash; Depend only on AWS SDK components that are used.
## Release 1.4.0 (June 2, 2015)
* Integration with the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**
* Automatically de-aggregate records put into the Kinesis stream using the KPL.
* Support checkpointing at the individual user record level when multiple user records are aggregated into one Kinesis record using the KPL.
See [Consumer De-aggregation with the KCL][kinesis-guide-consumer-deaggregation] for details.
## Release 1.3.0 (May 22, 2015)
* A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch.
## Release 1.2.1 (January 26, 2015)
* **MultiLangDaemon** &mdash; Changes to the MultiLangDaemon to make it easier to provide a custom worker.
## Release 1.2 (October 21, 2014)
* **Multi-Language Support** &mdash; Amazon KCL now supports implementing record processors in any language by communicating with the daemon over [STDIN and STDOUT][multi-lang-protocol]. Python developers can directly use the [Amazon Kinesis Client Library for Python][kclpy] to write their data processing applications.
## Release 1.1 (June 30, 2014)
* **Checkpointing at a specific sequence number** &mdash; The IRecordProcessorCheckpointer interface now supports checkpointing at a sequence number specified by the record processor.
* **Set region** &mdash; KinesisClientLibConfiguration now supports setting the region name to indicate the location of the Amazon Kinesis service. The Amazon DynamoDB table and Amazon CloudWatch metrics associated with your application will also use this region setting.
[kinesis]: http://aws.amazon.com/kinesis
[kinesis-forum]: http://developer.amazonwebservices.com/connect/forum.jspa?forumID=169
[kinesis-client-library-issues]: https://github.com/awslabs/amazon-kinesis-client/issues
[docs-signup]: http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-setup.html
[kinesis-guide]: http://docs.aws.amazon.com/kinesis/latest/dev/introduction.html
[kinesis-guide-begin]: http://docs.aws.amazon.com/kinesis/latest/dev/before-you-begin.html
[kinesis-guide-create]: http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html
[kinesis-guide-applications]: http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-app.html
[kinesis-guide-monitoring-with-kcl]: http://docs.aws.amazon.com//kinesis/latest/dev/monitoring-with-kcl.html
[kinesis-guide-kpl]: http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html
[kinesis-guide-consumer-deaggregation]: http://docs.aws.amazon.com//kinesis/latest/dev/kinesis-kpl-consumer-deaggregation.html
[kclpy]: https://github.com/awslabs/amazon-kinesis-client-python
[multi-lang-protocol]: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2 Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.8.10 Bundle-Version: 1.9.0
Bundle-Vendor: Amazon Technologies, Inc Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.8 Bundle-RequiredExecutionEnvironment: JavaSE-1.8
Require-Bundle: org.apache.commons.codec;bundle-version="1.6", Require-Bundle: org.apache.commons.codec;bundle-version="1.6",

263
README.md
View file

@ -29,266 +29,7 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
## Release Notes ## Release Notes
For release notes check **[CHANGELOG.md][changelog-md]**.
### Release 1.8.10
* Allow providing a custom IKinesisProxy implementation.
* [PR #274](https://github.com/awslabs/amazon-kinesis-client/pull/274)
* Checkpointing on a different thread should no longer emit a warning about NullMetricsScope.
* [PR #284](https://github.com/awslabs/amazon-kinesis-client/pull/284)
* [Issue #48](https://github.com/awslabs/amazon-kinesis-client/issues/48)
* Upgraded the AWS Java SDK to version 1.11.271
* [PR #287](https://github.com/awslabs/amazon-kinesis-client/pull/287)
### Release 1.8.9
* Allow disabling check for the case where a child shard has an open parent shard.
There is a race condition where it's possible for the a parent shard to appear open, while having child shards. This check can now be disabled by setting [`ignoreUnexpectedChildShards`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1037) to true.
* [PR #240](https://github.com/awslabs/amazon-kinesis-client/pull/240)
* [Issue #210](https://github.com/awslabs/amazon-kinesis-client/issues/210)
* Upgraded the AWS SDK for Java to 1.11.261
* [PR #281](https://github.com/awslabs/amazon-kinesis-client/pull/281)
### Release 1.8.8
* Fixed issues with leases losses due to `ExpiredIteratorException` in `PrefetchGetRecordsCache` and `AsynchronousFetchingStrategy`.
PrefetchGetRecordsCache will request for a new iterator and start fetching data again.
* [PR#263](https://github.com/awslabs/amazon-kinesis-client/pull/263)
* Added warning message for long running tasks.
Logging long running tasks can be enabled by setting the following configuration property:
| Name | Default | Description |
| ---- | ------- | ----------- |
| [`logWarningForTaskAfterMillis`](https://github.com/awslabs/amazon-kinesis-client/blob/3de901ea9327370ed732af86c4d4999c8d99541c/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1367) | Not set | Milliseconds after which the logger will log a warning message for the long running task |
* [PR#259](https://github.com/awslabs/amazon-kinesis-client/pull/259)
* Handling spurious lease renewal failures gracefully.
Added better handling of DynamoDB failures when updating leases. These failures would occur when a request to DynamoDB appeared to fail, but was actually successful.
* [PR#247](https://github.com/awslabs/amazon-kinesis-client/pull/247)
* ShutdownTask gets retried if the previous attempt on the ShutdownTask fails.
* [PR#267](https://github.com/awslabs/amazon-kinesis-client/pull/267)
* Fix for using maxRecords from `KinesisClientLibConfiguration` in `GetRecordsCache` for fetching records.
* [PR#264](https://github.com/awslabs/amazon-kinesis-client/pull/264)
### Release 1.8.7
* Don't add a delay for synchronous requests to Kinesis
Removes a delay that had been added for synchronous `GetRecords` calls to Kinesis.
* [PR #256](https://github.com/awslabs/amazon-kinesis-client/pull/256)
### Release 1.8.6
* Add prefetching of records from Kinesis
Prefetching will retrieve and queue additional records from Kinesis while the application is processing existing records.
Prefetching can be enabled by setting [`dataFetchingStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1317) to `PREFETCH_CACHED`. Once enabled an additional fetching thread will be started to retrieve records from Kinesis. Retrieved records will be held in a queue until the application is ready to process them.
Pre-fetching supports the following configuration values:
| Name | Default | Description |
| ---- | ------- | ----------- |
| [`dataFetchingStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1317) | `DEFAULT` | Which data fetching strategy to use |
| [`maxPendingProcessRecordsInput`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1296) | 3 | The maximum number of process records input that can be queued |
| [`maxCacheByteSize`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1307) | 8 MiB | The maximum number of bytes that can be queued |
| [`maxRecordsCount`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1326) | 30,000 | The maximum number of records that can be queued |
| [`idleMillisBetweenCalls`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1353) | 1,500 ms | The amount of time to wait between calls to Kinesis |
* [PR #246](https://github.com/awslabs/amazon-kinesis-client/pull/246)
### Release 1.8.5 (September 26, 2017)
* Only advance the shard iterator for the accepted response.
This fixes a race condition in the `KinesisDataFetcher` when it's being used to make asynchronous requests. The shard iterator is now only advanced when the retriever calls `DataFetcherResult#accept()`.
* [PR #230](https://github.com/awslabs/amazon-kinesis-client/pull/230)
* [Issue #231](https://github.com/awslabs/amazon-kinesis-client/issues/231)
### Release 1.8.4 (September 22, 2017)
* Create a new completion service for each request.
This ensures that canceled tasks are discarded. This will prevent a cancellation exception causing issues processing records.
* [PR #227](https://github.com/awslabs/amazon-kinesis-client/pull/227)
* [Issue #226](https://github.com/awslabs/amazon-kinesis-client/issues/226)
### Release 1.8.3 (September 22, 2017)
* Call shutdown on the retriever when the record processor is being shutdown
This fixes a bug that could leak threads if using the [`AsynchronousGetRecordsRetrievalStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/9a82b6bd05b3c9c5f8581af007141fa6d5f0fc4e/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java#L42) is being used.
The asynchronous retriever is only used when [`KinesisClientLibConfiguration#retryGetRecordsInSeconds`](https://github.com/awslabs/amazon-kinesis-client/blob/01d2688bc6e68fd3fe5cb698cb65299d67ac930d/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L227), and [`KinesisClientLibConfiguration#maxGetRecordsThreadPool`](https://github.com/awslabs/amazon-kinesis-client/blob/01d2688bc6e68fd3fe5cb698cb65299d67ac930d/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L230) are set.
* [PR #222](https://github.com/awslabs/amazon-kinesis-client/pull/222)
### Release 1.8.2 (September 20, 2017)
* Add support for two phase checkpoints
Applications can now set a pending checkpoint, before completing the checkpoint operation. Once the application has completed its checkpoint steps, the final checkpoint will clear the pending checkpoint.
Should the checkpoint fail the attempted sequence number is provided in the [`InitializationInput#getPendingCheckpointSequenceNumber`](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/types/InitializationInput.java#L81) otherwise the value will be null.
* [PR #188](https://github.com/awslabs/amazon-kinesis-client/pull/188)
* Support timeouts, and retry for GetRecords calls.
Applications can now set timeouts for GetRecord calls to Kinesis. As part of setting the timeout, the application must also provide a thread pool size for concurrent requests.
* [PR #214](https://github.com/awslabs/amazon-kinesis-client/pull/214)
* Notification when the lease table is throttled
When writes, or reads, to the lease table are throttled a warning will be emitted. If you're seeing this warning you should increase the IOPs for your lease table to prevent processing delays.
* [PR #212](https://github.com/awslabs/amazon-kinesis-client/pull/212)
* Support configuring the graceful shutdown timeout for MultiLang Clients
This adds support for setting the timeout that the Java process will wait for the MutliLang client to complete graceful shutdown. The timeout can be configured by adding `shutdownGraceMillis` to the properties file set to the number of milliseconds to wait.
* [PR #204](https://github.com/awslabs/amazon-kinesis-client/pull/204)
### Release 1.8.1 (August 2, 2017)
* Support timeouts for calls to the MultiLang Daemon
This adds support for setting a timeout when dispatching records to the client record processor. If the record processor doesn't respond within the timeout the parent Java process will be terminated. This is a temporary fix to handle cases where the KCL becomes blocked while waiting for a client record processor.
The timeout for the this can be set by adding `timeoutInSeconds = <timeout value>`. The default for this is no timeout.
__Setting this can cause the KCL to exit suddenly, before using this ensure that you have an automated restart for your application__
* [PR #195](https://github.com/awslabs/amazon-kinesis-client/pull/195)
* [Issue #185](https://github.com/awslabs/amazon-kinesis-client/issues/185)
### Release 1.8.0 (July 25, 2017)
* Execute graceful shutdown on its own thread
* [PR #191](https://github.com/awslabs/amazon-kinesis-client/pull/191)
* [Issue #167](https://github.com/awslabs/amazon-kinesis-client/issues/167)
* Added support for controlling the size of the lease renewer thread pool
* [PR #177](https://github.com/awslabs/amazon-kinesis-client/pull/177)
* [Issue #171](https://github.com/awslabs/amazon-kinesis-client/issues/171)
* Require Java 8 and later
__Java 8 is now required for versions 1.8.0 of the amazon-kinesis-client and later.__
* [PR #176](https://github.com/awslabs/amazon-kinesis-client/issues/176)
### Release 1.7.6 (June 21, 2017)
* Added support for graceful shutdown in MultiLang Clients
* [PR #174](https://github.com/awslabs/amazon-kinesis-client/pull/174)
* [PR #182](https://github.com/awslabs/amazon-kinesis-client/pull/182)
* Updated documentation for `v2.IRecordProcessor#shutdown`, and `KinesisClientLibConfiguration#idleTimeBetweenReadsMillis`
* [PR #170](https://github.com/awslabs/amazon-kinesis-client/pull/170)
* Updated to version 1.11.151 of the AWS Java SDK
* [PR #183](https://github.com/awslabs/amazon-kinesis-client/pull/183)
### Release 1.7.5 (April 7, 2017)
* Correctly handle throttling for DescribeStream, and save accumulated progress from individual calls.
* [PR #152](https://github.com/awslabs/amazon-kinesis-client/pull/152)
* Upgrade to version 1.11.115 of the AWS Java SDK
* [PR #155](https://github.com/awslabs/amazon-kinesis-client/pull/155)
### Release 1.7.4 (February 27, 2017)
* Fixed an issue building JavaDoc for Java 8.
* [Issue #18](https://github.com/awslabs/amazon-kinesis-client/issues/18)
* [PR #141](https://github.com/awslabs/amazon-kinesis-client/pull/141)
* Reduce Throttling Messages to WARN, unless throttling occurs 6 times consecutively.
* [Issue #4](https://github.com/awslabs/amazon-kinesis-client/issues/4)
* [PR #140](https://github.com/awslabs/amazon-kinesis-client/pull/140)
* Fixed two bugs occurring in requestShutdown.
* Fixed a bug that prevented the worker from shutting down, via requestShutdown, when no leases were held.
* [Issue #128](https://github.com/awslabs/amazon-kinesis-client/issues/128)
* Fixed a bug that could trigger a NullPointerException if leases changed during requestShutdown.
* [Issue #129](https://github.com/awslabs/amazon-kinesis-client/issues/129)
* [PR #139](https://github.com/awslabs/amazon-kinesis-client/pull/139)
* Upgraded the AWS SDK Version to 1.11.91
* [PR #138](https://github.com/awslabs/amazon-kinesis-client/pull/138)
* Use an executor returned from `ExecutorService.newFixedThreadPool` instead of constructing it by hand.
* [PR #135](https://github.com/awslabs/amazon-kinesis-client/pull/135)
* Correctly initialize DynamoDB client, when endpoint is explicitly set.
* [PR #142](https://github.com/awslabs/amazon-kinesis-client/pull/142)
### Release 1.7.3 (January 9, 2017)
* Upgrade to the newest AWS Java SDK.
* [Amazon Kinesis Client Issue #27](https://github.com/awslabs/amazon-kinesis-client-python/issues/27)
* [PR #126](https://github.com/awslabs/amazon-kinesis-client/pull/126)
* [PR #125](https://github.com/awslabs/amazon-kinesis-client/pull/125)
* Added a direct dependency on commons-logging.
* [Issue #123](https://github.com/awslabs/amazon-kinesis-client/issues/123)
* [PR #124](https://github.com/awslabs/amazon-kinesis-client/pull/124)
* Make ShardInfo public to allow for custom ShardPrioritization strategies.
* [Issue #120](https://github.com/awslabs/amazon-kinesis-client/issues/120)
* [PR #127](https://github.com/awslabs/amazon-kinesis-client/pull/127)
### Release 1.7.2 (November 7, 2016)
* MultiLangDaemon Feature Updates
The MultiLangDaemon has been upgraded to use the v2 interfaces, which allows access to enhanced checkpointing, and more information during record processor initialization. The MultiLangDaemon clients must be updated before they can take advantage of these new features.
### Release 1.7.1 (November 3, 2016)
* General
* Allow disabling shard synchronization at startup.
* Applications can disable shard synchronization at startup. Disabling shard synchronization can application startup times for very large streams.
* [PR #102](https://github.com/awslabs/amazon-kinesis-client/pull/102)
* Applications can now request a graceful shutdown, and record processors that implement the IShutdownNotificationAware will be given a chance to checkpoint before being shutdown.
* This adds a [new interface](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/v2/IShutdownNotificationAware.java), and a [new method on Worker](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java#L539).
* [PR #109](https://github.com/awslabs/amazon-kinesis-client/pull/109)
* Solves [Issue #79](https://github.com/awslabs/amazon-kinesis-client/issues/79)
* MultiLangDaemon
* Applications can now use credential provides that accept string parameters.
* [PR #99](https://github.com/awslabs/amazon-kinesis-client/pull/99)
* Applications can now use different credentials for each service.
* [PR #111](https://github.com/awslabs/amazon-kinesis-client/pull/111)
### Release 1.7.0 (August 22, 2016)
* Add support for time based iterators ([See GetShardIterator Documentation](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html))
* [PR #94](https://github.com/awslabs/amazon-kinesis-client/pull/94)
The `KinesisClientLibConfiguration` now supports providing an initial time stamp position.
* This position is only used if there is no current checkpoint for the shard.
* This setting cannot be used with DynamoDB Streams
Resolves [Issue #88](https://github.com/awslabs/amazon-kinesis-client/issues/88)
* Allow Prioritization of Parent Shards for Task Assignment
* [PR #95](https://github.com/awslabs/amazon-kinesis-client/pull/95)
The `KinesisClientLibconfiguration` now supports providing a `ShardPrioritization` strategy. This strategy controls how the `Worker` determines which `ShardConsumer` to call next. This can improve processing for streams that split often, such as DynamoDB Streams.
* Remove direct dependency on `aws-java-sdk-core`, to allow independent versioning.
* [PR #92](https://github.com/awslabs/amazon-kinesis-client/pull/92)
**You may need to add a direct dependency on aws-java-sdk-core if other dependencies include an older version.**
### Release 1.6.5 (July 25, 2016)
* Change LeaseManager to call DescribeTable before attempting to create the lease table.
* [Issue #36](https://github.com/awslabs/amazon-kinesis-client/issues/36)
* [PR #41](https://github.com/awslabs/amazon-kinesis-client/pull/41)
* [PR #67](https://github.com/awslabs/amazon-kinesis-client/pull/67)
* Allow DynamoDB lease table name to be specified
* [PR #61](https://github.com/awslabs/amazon-kinesis-client/pull/61)
* Add approximateArrivalTimestamp for JsonFriendlyRecord
* [PR #86](https://github.com/awslabs/amazon-kinesis-client/pull/86)
* Shutdown lease renewal thread pool on exit.
* [PR #84](https://github.com/awslabs/amazon-kinesis-client/pull/84)
* Wait for CloudWatch publishing thread to finish before exiting.
* [PR #82](https://github.com/awslabs/amazon-kinesis-client/pull/82)
* Added unit, and integration tests for the library.
### Release 1.6.4 (July 6, 2016)
* Upgrade to AWS SDK for Java 1.11.14
* [Issue #74](https://github.com/awslabs/amazon-kinesis-client/issues/74)
* [Issue #73](https://github.com/awslabs/amazon-kinesis-client/issues/73)
* **Maven Artifact Signing Change**
* Artifacts are now signed by the identity `Amazon Kinesis Tools <amazon-kinesis-tools@amazon.com>`
### Release 1.6.3 (May 12, 2016)
* Fix format exception caused by DEBUG log in LeaseTaker [Issue # 68](https://github.com/awslabs/amazon-kinesis-client/issues/68)
### Release 1.6.2 (March 23, 2016)
* Support for specifying max leases per worker and max leases to steal at a time.
* Support for specifying initial DynamoDB table read and write capacity.
* Support for parallel lease renewal.
* Support for graceful worker shutdown.
* Change DefaultCWMetricsPublisher log level to debug. [PR # 49](https://github.com/awslabs/amazon-kinesis-client/pull/49)
* Avoid NPE in MLD record processor shutdown if record processor was not initialized. [Issue # 29](https://github.com/awslabs/amazon-kinesis-client/issues/29)
### Release 1.6.1 (September 23, 2015)
* Expose [approximateArrivalTimestamp](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) for Records in processRecords API call.
### Release 1.6.0 (July 31, 2015)
* Restores compatibility with [dynamodb-streams-kinesis-adapter](https://github.com/awslabs/dynamodb-streams-kinesis-adapter) (which was broken in 1.4.0).
### Release 1.5.1 (July 20, 2015)
* KCL maven artifact 1.5.0 does not work with JDK 7. This release addresses this issue.
### Release 1.5.0 (July 9, 2015)
* **[Metrics Enhancements][kinesis-guide-monitoring-with-kcl]**
* Support metrics level and dimension configurations to control CloudWatch metrics emitted by the KCL.
* Add new metrics that track time spent in record processor methods.
* Disable WorkerIdentifier dimension by default.
* **Exception Reporting** &mdash; Do not silently ignore exceptions in ShardConsumer.
* **AWS SDK Component Dependencies** &mdash; Depend only on AWS SDK components that are used.
### Release 1.4.0 (June 2, 2015)
* Integration with the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**
* Automatically de-aggregate records put into the Kinesis stream using the KPL.
* Support checkpointing at the individual user record level when multiple user records are aggregated into one Kinesis record using the KPL.
See [Consumer De-aggregation with the KCL][kinesis-guide-consumer-deaggregation] for details.
### Release 1.3.0 (May 22, 2015)
* A new metric called "MillisBehindLatest", which tracks how far consumers are from real time, is now uploaded to CloudWatch.
### Release 1.2.1 (January 26, 2015)
* **MultiLangDaemon** &mdash; Changes to the MultiLangDaemon to make it easier to provide a custom worker.
### Release 1.2 (October 21, 2014)
* **Multi-Language Support** &mdash; Amazon KCL now supports implementing record processors in any language by communicating with the daemon over [STDIN and STDOUT][multi-lang-protocol]. Python developers can directly use the [Amazon Kinesis Client Library for Python][kclpy] to write their data processing applications.
### Release 1.1 (June 30, 2014)
* **Checkpointing at a specific sequence number** &mdash; The IRecordProcessorCheckpointer interface now supports checkpointing at a sequence number specified by the record processor.
* **Set region** &mdash; KinesisClientLibConfiguration now supports setting the region name to indicate the location of the Amazon Kinesis service. The Amazon DynamoDB table and Amazon CloudWatch metrics associated with your application will also use this region setting.
[kinesis]: http://aws.amazon.com/kinesis [kinesis]: http://aws.amazon.com/kinesis
[kinesis-forum]: http://developer.amazonwebservices.com/connect/forum.jspa?forumID=169 [kinesis-forum]: http://developer.amazonwebservices.com/connect/forum.jspa?forumID=169
@ -303,4 +44,4 @@ To make it easier for developers to write record processors in other languages,
[kinesis-guide-consumer-deaggregation]: http://docs.aws.amazon.com//kinesis/latest/dev/kinesis-kpl-consumer-deaggregation.html [kinesis-guide-consumer-deaggregation]: http://docs.aws.amazon.com//kinesis/latest/dev/kinesis-kpl-consumer-deaggregation.html
[kclpy]: https://github.com/awslabs/amazon-kinesis-client-python [kclpy]: https://github.com/awslabs/amazon-kinesis-client-python
[multi-lang-protocol]: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java [multi-lang-protocol]: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java
[changelog-md]: https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name> <name>Amazon Kinesis Client Library for Java</name>
<version>1.8.10</version> <version>1.9.0-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>
@ -25,7 +25,7 @@
</licenses> </licenses>
<properties> <properties>
<aws-java-sdk.version>1.11.271</aws-java-sdk.version> <aws-java-sdk.version>1.11.272</aws-java-sdk.version>
<sqlite4java.version>1.0.392</sqlite4java.version> <sqlite4java.version>1.0.392</sqlite4java.version>
<sqlite4java.native>libsqlite4java</sqlite4java.native> <sqlite4java.native>libsqlite4java</sqlite4java.native>
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath> <sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>

View file

@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration {
/** /**
* User agent set when Amazon Kinesis Client Library makes AWS requests. * User agent set when Amazon Kinesis Client Library makes AWS requests.
*/ */
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.10"; public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.9.0";
/** /**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
@ -182,6 +182,16 @@ public class KinesisClientLibConfiguration {
*/ */
public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20;
/**
* The sleep time between two listShards calls from the proxy when throttled.
*/
public static final long DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS = 1500;
/**
* The number of times the Proxy will retry listShards call when throttled.
*/
public static final int DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS = 50;
private String applicationName; private String applicationName;
private String tableName; private String tableName;
private String streamName; private String streamName;
@ -238,6 +248,12 @@ public class KinesisClientLibConfiguration {
@Getter @Getter
private Optional<Long> logWarningForTaskAfterMillis = Optional.empty(); private Optional<Long> logWarningForTaskAfterMillis = Optional.empty();
@Getter
private long listShardsBackoffTimeInMillis = DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS;
@Getter
private int maxListShardsRetryAttempts = DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS;
/** /**
* Constructor. * Constructor.
@ -1387,4 +1403,26 @@ public class KinesisClientLibConfiguration {
this.logWarningForTaskAfterMillis = Optional.of(logWarningForTaskAfterMillis); this.logWarningForTaskAfterMillis = Optional.of(logWarningForTaskAfterMillis);
return this; return this;
} }
/**
* @param listShardsBackoffTimeInMillis Max sleep between two listShards call when throttled
* in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
* @return
*/
public KinesisClientLibConfiguration withListShardsBackoffTimeInMillis(long listShardsBackoffTimeInMillis) {
checkIsValuePositive("listShardsBackoffTimeInMillis", listShardsBackoffTimeInMillis);
this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
return this;
}
/**
* @param maxListShardsRetryAttempts Max number of retries for listShards when throttled
* in {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
* @return
*/
public KinesisClientLibConfiguration withMaxListShardsRetryAttempts(int maxListShardsRetryAttempts) {
checkIsValuePositive("maxListShardsRetryAttempts", maxListShardsRetryAttempts);
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
return this;
}
} }

View file

@ -48,7 +48,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
@ -249,8 +249,7 @@ public class Worker implements Runnable {
this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), this(config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
config, config,
new StreamConfig( new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) new KinesisProxy(config, kinesisClient),
.getProxy(config.getStreamName()),
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(), config.shouldValidateSequenceNumberBeforeCheckpointing(),
@ -1278,8 +1277,7 @@ public class Worker implements Runnable {
shardPrioritization = new ParentsFirstShardPrioritization(1); shardPrioritization = new ParentsFirstShardPrioritization(1);
} }
if (kinesisProxy == null) { if (kinesisProxy == null) {
kinesisProxy = new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) kinesisProxy = new KinesisProxy(config, kinesisClient);
.getProxy(config.getStreamName());
} }
return new Worker(config.getApplicationName(), return new Worker(config.getApplicationName(),

View file

@ -48,10 +48,17 @@ public interface IKinesisProxy {
/** /**
* Fetch information about stream. Useful for fetching the list of shards in a stream. * Fetch information about stream. Useful for fetching the list of shards in a stream.
* *
* @deprecated Going forward this method is
* being deprecated. This method uses DescribeStream call, which is throttled at 10 calls per account by default.
* If possible try to use ListShards call available in the client, or use the getShardList or getAllShards to get
* shard info. To make DescribeStream calls, use the AmazonKinesis client directly instead of using KinesisProxy.
* This method will be removed in the next major/minor release.
*
* @param startShardId exclusive start shardId - used when paginating the list of shards. * @param startShardId exclusive start shardId - used when paginating the list of shards.
* @return DescribeStreamOutput object containing a description of the stream. * @return DescribeStreamOutput object containing a description of the stream.
* @throws ResourceNotFoundException The Kinesis stream was not found * @throws ResourceNotFoundException The Kinesis stream was not found
*/ */
@Deprecated
DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException; DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException;
/** /**

View file

@ -17,7 +17,11 @@ package com.amazonaws.services.kinesis.clientlibrary.proxies;
/** /**
* Interface for a KinesisProxyFactory. * Interface for a KinesisProxyFactory.
* *
* @deprecated Deprecating since KinesisProxy is just created once, there is no use of a factory. There is no
* replacement for this class. This class will be removed in the next major/minor release.
*
*/ */
@Deprecated
public interface IKinesisProxyFactory { public interface IKinesisProxyFactory {
/** /**

View file

@ -23,13 +23,14 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import lombok.Data; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
@ -39,13 +40,18 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.InvalidArgumentException; import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.LimitExceededException; import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest; import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult; import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceInUseException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.StreamStatus;
import lombok.Data;
/** /**
* Kinesis proxy - used to make calls to Amazon Kinesis (e.g. fetch data records and list of shards). * Kinesis proxy - used to make calls to Amazon Kinesis (e.g. fetch data records and list of shards).
*/ */
@ -70,51 +76,24 @@ public class KinesisProxy implements IKinesisProxyExtended {
private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50; private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50;
private final long describeStreamBackoffTimeInMillis; private final long describeStreamBackoffTimeInMillis;
private final int maxDescribeStreamRetryAttempts; private final int maxDescribeStreamRetryAttempts;
private final long listShardsBackoffTimeInMillis;
private final int maxListShardsRetryAttempts;
private boolean isKinesisClient = true;
/** /**
* Public constructor. * @deprecated We expect the client to be passed to the proxy, and the proxy will not require to create it.
* *
* @param streamName Data records will be fetched from this stream * @param credentialProvider
* @param credentialProvider Provides credentials for signing Kinesis requests * @param endpoint
* @param endpoint Kinesis endpoint * @param serviceName
* @param regionId
* @return
*/ */
@Deprecated
public KinesisProxy(final String streamName, AWSCredentialsProvider credentialProvider, String endpoint) {
this(streamName, credentialProvider, endpoint, defaultServiceName, defaultRegionId,
DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES);
}
/**
* Public constructor.
*
* @param streamName Data records will be fetched from this stream
* @param credentialProvider Provides credentials for signing Kinesis requests
* @param endpoint Kinesis endpoint
* @param serviceName service name
* @param regionId region id
* @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds
* @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls
*/
public KinesisProxy(final String streamName,
AWSCredentialsProvider credentialProvider,
String endpoint,
String serviceName,
String regionId,
long describeStreamBackoffTimeInMillis,
int maxDescribeStreamRetryAttempts) {
this(streamName, credentialProvider, buildClientSettingEndpoint(credentialProvider,
endpoint,
serviceName,
regionId), describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts);
LOG.debug("KinesisProxy has created a kinesisClient");
}
private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider, private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider,
String endpoint, String endpoint,
String serviceName, String serviceName,
String regionId) { String regionId) {
AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider); AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider);
client.setEndpoint(endpoint); client.setEndpoint(endpoint);
client.setSignerRegionOverride(regionId); client.setSignerRegionOverride(regionId);
@ -124,24 +103,123 @@ public class KinesisProxy implements IKinesisProxyExtended {
/** /**
* Public constructor. * Public constructor.
* *
* @deprecated Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use
* {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or
* {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object. Will be removed in the
* next major/minor release.
*
* @param streamName Data records will be fetched from this stream
* @param credentialProvider Provides credentials for signing Kinesis requests
* @param endpoint Kinesis endpoint
*/
@Deprecated
public KinesisProxy(final String streamName, AWSCredentialsProvider credentialProvider, String endpoint) {
this(streamName, credentialProvider, endpoint, defaultServiceName, defaultRegionId,
DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
}
/**
* Public constructor.
*
* @deprecated Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use
* {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or
* {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object. Will be removed in the
* next major/minor release.
*
* @param streamName Data records will be fetched from this stream
* @param credentialProvider Provides credentials for signing Kinesis requests
* @param endpoint Kinesis endpoint
* @param serviceName service name
* @param regionId region id
* @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds
* @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls
*/
@Deprecated
public KinesisProxy(final String streamName,
AWSCredentialsProvider credentialProvider,
String endpoint,
String serviceName,
String regionId,
long describeStreamBackoffTimeInMillis,
int maxDescribeStreamRetryAttempts,
long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts) {
this(streamName,
credentialProvider,
buildClientSettingEndpoint(credentialProvider, endpoint, serviceName, regionId),
describeStreamBackoffTimeInMillis,
maxDescribeStreamRetryAttempts,
listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts);
LOG.debug("KinesisProxy has created a kinesisClient");
}
/**
* Public constructor.
*
* @deprecated Deprecating constructor, this constructor doesn't use AWS best practices, moving forward please use
* {@link #KinesisProxy(KinesisClientLibConfiguration, AmazonKinesis)} or
* {@link #KinesisProxy(String, AmazonKinesis, long, int, long, int)} to create the object. Will be removed in the
* next major/minor release.
*
* @param streamName Data records will be fetched from this stream * @param streamName Data records will be fetched from this stream
* @param credentialProvider Provides credentials for signing Kinesis requests * @param credentialProvider Provides credentials for signing Kinesis requests
* @param kinesisClient Kinesis client (used to fetch data from Kinesis) * @param kinesisClient Kinesis client (used to fetch data from Kinesis)
* @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds * @param describeStreamBackoffTimeInMillis Backoff time for DescribeStream calls in milliseconds
* @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls * @param maxDescribeStreamRetryAttempts Number of retry attempts for DescribeStream calls
*/ */
@Deprecated
public KinesisProxy(final String streamName, public KinesisProxy(final String streamName,
AWSCredentialsProvider credentialProvider, AWSCredentialsProvider credentialProvider,
AmazonKinesis kinesisClient, AmazonKinesis kinesisClient,
long describeStreamBackoffTimeInMillis, long describeStreamBackoffTimeInMillis,
int maxDescribeStreamRetryAttempts) { int maxDescribeStreamRetryAttempts,
this.streamName = streamName; long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts) {
this(streamName, kinesisClient, describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts,
listShardsBackoffTimeInMillis, maxListShardsRetryAttempts);
this.credentialsProvider = credentialProvider; this.credentialsProvider = credentialProvider;
LOG.debug("KinesisProxy( " + streamName + ")");
}
/**
* Public constructor.
* @param config
*/
public KinesisProxy(final KinesisClientLibConfiguration config, final AmazonKinesis client) {
this(config.getStreamName(),
client,
DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS,
DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
config.getListShardsBackoffTimeInMillis(),
config.getMaxListShardsRetryAttempts());
this.credentialsProvider = config.getKinesisCredentialsProvider();
}
public KinesisProxy(final String streamName,
final AmazonKinesis client,
final long describeStreamBackoffTimeInMillis,
final int maxDescribeStreamRetryAttempts,
final long listShardsBackoffTimeInMillis,
final int maxListShardsRetryAttempts) {
this.streamName = streamName;
this.client = client;
this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis; this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis;
this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts; this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts;
this.client = kinesisClient; this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
LOG.debug("KinesisProxy( " + streamName + ")"); try {
if (Class.forName("com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient")
.isAssignableFrom(client.getClass())) {
isKinesisClient = false;
LOG.debug("Client is DynamoDb client, will use DescribeStream.");
}
} catch (ClassNotFoundException e) {
LOG.debug("Client is Kinesis Client, using ListShards instead of DescribeStream.");
}
} }
/** /**
@ -164,6 +242,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
@Deprecated
public DescribeStreamResult getStreamInfo(String startShardId) public DescribeStreamResult getStreamInfo(String startShardId)
throws ResourceNotFoundException, LimitExceededException { throws ResourceNotFoundException, LimitExceededException {
final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
@ -207,6 +286,47 @@ public class KinesisProxy implements IKinesisProxyExtended {
return null; return null;
} }
} }
private ListShardsResult listShards(final String nextToken) {
final ListShardsRequest request = new ListShardsRequest();
request.setRequestCredentials(credentialsProvider.getCredentials());
if (StringUtils.isEmpty(nextToken)) {
request.setStreamName(streamName);
} else {
request.setNextToken(nextToken);
}
ListShardsResult result = null;
LimitExceededException lastException = null;
int remainingRetries = this.maxListShardsRetryAttempts;
while (result == null) {
try {
result = client.listShards(request);
} catch (LimitExceededException e) {
LOG.info("Got LimitExceededException when listing shards " + streamName + ". Backing off for "
+ this.listShardsBackoffTimeInMillis + " millis.");
try {
Thread.sleep(this.listShardsBackoffTimeInMillis);
} catch (InterruptedException ie) {
LOG.debug("Stream " + streamName + " : Sleep was interrupted ", ie);
}
lastException = e;
} catch (ResourceInUseException e) {
LOG.info("Stream is not in Active/Updating status, returning null (wait until stream is in Active or"
+ " Updating)");
return null;
}
remainingRetries--;
if (remainingRetries <= 0 && result == null) {
if (lastException != null) {
throw lastException;
}
throw new IllegalStateException("Received null from ListShards call.");
}
}
return result;
}
/** /**
* {@inheritDoc} * {@inheritDoc}
@ -233,27 +353,47 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/ */
@Override @Override
public synchronized List<Shard> getShardList() { public synchronized List<Shard> getShardList() {
DescribeStreamResult response;
if (shardIterationState == null) { if (shardIterationState == null) {
shardIterationState = new ShardIterationState(); shardIterationState = new ShardIterationState();
} }
if (isKinesisClient) {
ListShardsResult result;
String nextToken = null;
do {
result = listShards(nextToken);
if (result == null) {
/*
* If listShards ever returns null, we should bail and return null. This indicates the stream is not
* in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream.
*/
return null;
} else {
shardIterationState.update(result.getShards());
nextToken = result.getNextToken();
}
} while (StringUtils.isNotEmpty(result.getNextToken()));
} else {
DescribeStreamResult response;
do { do {
response = getStreamInfo(shardIterationState.getLastShardId()); response = getStreamInfo(shardIterationState.getLastShardId());
if (response == null) { if (response == null) {
/* /*
* If getStreamInfo ever returns null, we should bail and return null. This indicates the stream is not * If getStreamInfo ever returns null, we should bail and return null. This indicates the stream is not
* in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream. * in ACTIVE or UPDATING state and we may not have accurate/consistent information about the stream.
*/ */
return null; return null;
} else { } else {
shardIterationState.update(response.getStreamDescription().getShards()); shardIterationState.update(response.getStreamDescription().getShards());
} }
} while (response.getStreamDescription().isHasMoreShards()); } while (response.getStreamDescription().isHasMoreShards());
}
this.listOfShardsSinceLastGet.set(shardIterationState.getShards()); this.listOfShardsSinceLastGet.set(shardIterationState.getShards());
shardIterationState = new ShardIterationState(); shardIterationState = new ShardIterationState();
return listOfShardsSinceLastGet.get(); return listOfShardsSinceLastGet.get();
} }

View file

@ -18,10 +18,15 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
/** /**
* Factory used for instantiating KinesisProxy objects (to fetch data from Kinesis). * Factory used for instantiating KinesisProxy objects (to fetch data from Kinesis).
*
* @deprecated Will be removed since proxy is created only once, we don't need a factory. There is no replacement for
* this class. Will be removed in the next major/minor release.
*/ */
@Deprecated
public class KinesisProxyFactory implements IKinesisProxyFactory { public class KinesisProxyFactory implements IKinesisProxyFactory {
private final AWSCredentialsProvider credentialProvider; private final AWSCredentialsProvider credentialProvider;
@ -32,6 +37,8 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
private final AmazonKinesis kinesisClient; private final AmazonKinesis kinesisClient;
private final long describeStreamBackoffTimeInMillis; private final long describeStreamBackoffTimeInMillis;
private final int maxDescribeStreamRetryAttempts; private final int maxDescribeStreamRetryAttempts;
private final long listShardsBackoffTimeInMillis;
private final int maxListShardsRetryAttempts;
/** /**
* Constructor for creating a KinesisProxy factory, using the specified credentials provider and endpoint. * Constructor for creating a KinesisProxy factory, using the specified credentials provider and endpoint.
@ -41,12 +48,14 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
*/ */
public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, String endpoint) { public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, String endpoint) {
this(credentialProvider, new ClientConfiguration(), endpoint, defaultServiceName, defaultRegionId, this(credentialProvider, new ClientConfiguration(), endpoint, defaultServiceName, defaultRegionId,
DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
} }
/** /**
* Constructor for KinesisProxy factory using the client configuration to use when interacting with Kinesis. * Constructor for KinesisProxy factory using the client configuration to use when interacting with Kinesis.
* *
* @param credentialProvider credentials provider used to sign requests * @param credentialProvider credentials provider used to sign requests
* @param clientConfig Client Configuration used when instantiating an AmazonKinesisClient * @param clientConfig Client Configuration used when instantiating an AmazonKinesisClient
* @param endpoint Amazon Kinesis endpoint to use * @param endpoint Amazon Kinesis endpoint to use
@ -55,7 +64,9 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
ClientConfiguration clientConfig, ClientConfiguration clientConfig,
String endpoint) { String endpoint) {
this(credentialProvider, clientConfig, endpoint, defaultServiceName, defaultRegionId, this(credentialProvider, clientConfig, endpoint, defaultServiceName, defaultRegionId,
DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
} }
/** /**
@ -65,7 +76,9 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
* @param client AmazonKinesisClient used to fetch data from Kinesis * @param client AmazonKinesisClient used to fetch data from Kinesis
*/ */
public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, AmazonKinesis client) { public KinesisProxyFactory(AWSCredentialsProvider credentialProvider, AmazonKinesis client) {
this(credentialProvider, client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES); this(credentialProvider, client, DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS, DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
KinesisClientLibConfiguration.DEFAULT_LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
KinesisClientLibConfiguration.DEFAULT_MAX_LIST_SHARDS_RETRY_ATTEMPTS);
} }
/** /**
@ -85,13 +98,18 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
String serviceName, String serviceName,
String regionId, String regionId,
long describeStreamBackoffTimeInMillis, long describeStreamBackoffTimeInMillis,
int maxDescribeStreamRetryAttempts) { int maxDescribeStreamRetryAttempts,
long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts) {
this(credentialProvider, buildClientSettingEndpoint(credentialProvider, this(credentialProvider, buildClientSettingEndpoint(credentialProvider,
clientConfig, clientConfig,
endpoint, endpoint,
serviceName, serviceName,
regionId), regionId),
describeStreamBackoffTimeInMillis, maxDescribeStreamRetryAttempts); describeStreamBackoffTimeInMillis,
maxDescribeStreamRetryAttempts,
listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts);
} }
@ -106,14 +124,18 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
KinesisProxyFactory(AWSCredentialsProvider credentialProvider, KinesisProxyFactory(AWSCredentialsProvider credentialProvider,
AmazonKinesis client, AmazonKinesis client,
long describeStreamBackoffTimeInMillis, long describeStreamBackoffTimeInMillis,
int maxDescribeStreamRetryAttempts) { int maxDescribeStreamRetryAttempts,
long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts) {
super(); super();
this.kinesisClient = client; this.kinesisClient = client;
this.credentialProvider = credentialProvider; this.credentialProvider = credentialProvider;
this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis; this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis;
this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts; this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts;
this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@ -123,15 +145,16 @@ public class KinesisProxyFactory implements IKinesisProxyFactory {
credentialProvider, credentialProvider,
kinesisClient, kinesisClient,
describeStreamBackoffTimeInMillis, describeStreamBackoffTimeInMillis,
maxDescribeStreamRetryAttempts); maxDescribeStreamRetryAttempts,
listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts);
} }
private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider, private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider,
ClientConfiguration clientConfig, ClientConfiguration clientConfig,
String endpoint, String endpoint,
String serviceName, String serviceName,
String regionId) { String regionId) {
AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider, clientConfig); AmazonKinesisClient client = new AmazonKinesisClient(credentialProvider, clientConfig);
client.setEndpoint(endpoint); client.setEndpoint(endpoint);
client.setSignerRegionOverride(regionId); client.setSignerRegionOverride(regionId);

View file

@ -0,0 +1,26 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.dynamodbv2.streamsadapter;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
/**
* This class is only used for testing purposes, to make sure that the correct calls are made while using DynamoDB
* streams.
*/
public class AmazonDynamoDBStreamsAdapterClient extends AmazonKinesisClient {
}

View file

@ -0,0 +1,23 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.dynamodbv2.streamsadapter;
/**
* This class is only used for testing purposes, to make sure that the correct calls are made while using DynamoDB
* streams.
*/
public class AmazonDynamoDBStreamsAdapterClientChild extends AmazonDynamoDBStreamsAdapterClient {
}

View file

@ -90,6 +90,7 @@ public class ShardSyncTaskIntegrationTest {
new KinesisClientLeaseManager("ShardSyncTaskIntegrationTest", new KinesisClientLeaseManager("ShardSyncTaskIntegrationTest",
new AmazonDynamoDBClient(credentialsProvider), new AmazonDynamoDBClient(credentialsProvider),
useConsistentReads); useConsistentReads);
kinesisProxy = kinesisProxy =
new KinesisProxy(STREAM_NAME, new KinesisProxy(STREAM_NAME,
new DefaultAWSCredentialsProviderChain(), new DefaultAWSCredentialsProviderChain(),
@ -106,7 +107,6 @@ public class ShardSyncTaskIntegrationTest {
/** /**
* Test method for call(). * Test method for call().
* *
* @throws CapacityExceededException
* @throws DependencyException * @throws DependencyException
* @throws InvalidStateException * @throws InvalidStateException
* @throws ProvisionedThroughputException * @throws ProvisionedThroughputException

View file

@ -33,7 +33,6 @@ public class KinesisLocalFileProxyFactory implements IKinesisProxyFactory {
private static final String DEFAULT_TEST_PROXY_FILE = "defaultKinesisProxyLocalFile"; private static final String DEFAULT_TEST_PROXY_FILE = "defaultKinesisProxyLocalFile";
private IKinesisProxy testKinesisProxy; private IKinesisProxy testKinesisProxy;
/** /**
* @param fileName File to be used for stream data. * @param fileName File to be used for stream data.
@ -60,5 +59,4 @@ public class KinesisLocalFileProxyFactory implements IKinesisProxyFactory {
public IKinesisProxy getProxy(String streamARN) { public IKinesisProxy getProxy(String streamARN) {
return testKinesisProxy; return testKinesisProxy;
} }
} }

View file

@ -19,9 +19,12 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasProperty; import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -29,13 +32,22 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClientChild;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ResourceInUseException;
import lombok.AllArgsConstructor;
import org.apache.commons.lang.StringUtils;
import org.hamcrest.Description; import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Before; import org.junit.Before;
@ -47,7 +59,6 @@ import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.AmazonServiceException; import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
@ -61,11 +72,18 @@ import com.amazonaws.services.kinesis.model.StreamStatus;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class KinesisProxyTest { public class KinesisProxyTest {
private static final String TEST_STRING = "TestString"; private static final String TEST_STRING = "TestString";
private static final long BACKOFF_TIME = 10L; private static final long DESCRIBE_STREAM_BACKOFF_TIME = 10L;
private static final int RETRY_TIMES = 3; private static final long LIST_SHARDS_BACKOFF_TIME = 10L;
private static final int DESCRIBE_STREAM_RETRY_TIMES = 3;
private static final int LIST_SHARDS_RETRY_TIMES = 3;
private static final String NEXT_TOKEN = "NextToken";
@Mock @Mock
private AmazonKinesisClient mockClient; private AmazonKinesis mockClient;
@Mock
private AmazonDynamoDBStreamsAdapterClient mockDDBStreamClient;
@Mock
private AmazonDynamoDBStreamsAdapterClientChild mockDDBChildClient;
@Mock @Mock
private AWSCredentialsProvider mockCredentialsProvider; private AWSCredentialsProvider mockCredentialsProvider;
@Mock @Mock
@ -76,8 +94,12 @@ public class KinesisProxyTest {
private StreamDescription streamDescription; private StreamDescription streamDescription;
@Mock @Mock
private Shard shard; private Shard shard;
@Mock
private KinesisClientLibConfiguration config;
private KinesisProxy proxy; private KinesisProxy proxy;
private KinesisProxy ddbProxy;
private KinesisProxy ddbChildProxy;
// Test shards for verifying. // Test shards for verifying.
private Set<String> shardIdSet; private Set<String> shardIdSet;
@ -85,19 +107,24 @@ public class KinesisProxyTest {
@Before @Before
public void setUpTest() { public void setUpTest() {
// Set up kinesis proxy // Set up kinesis ddbProxy
proxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockClient, BACKOFF_TIME, RETRY_TIMES); when(config.getStreamName()).thenReturn(TEST_STRING);
when(mockCredentialsProvider.getCredentials()).thenReturn(null); when(config.getListShardsBackoffTimeInMillis()).thenReturn(LIST_SHARDS_BACKOFF_TIME);
when(config.getMaxListShardsRetryAttempts()).thenReturn(LIST_SHARDS_RETRY_TIMES);
when(config.getKinesisCredentialsProvider()).thenReturn(mockCredentialsProvider);
proxy = new KinesisProxy(config, mockClient);
ddbProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBStreamClient,
DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME,
LIST_SHARDS_RETRY_TIMES);
ddbChildProxy = new KinesisProxy(TEST_STRING, mockCredentialsProvider, mockDDBChildClient,
DESCRIBE_STREAM_BACKOFF_TIME, DESCRIBE_STREAM_RETRY_TIMES, LIST_SHARDS_BACKOFF_TIME,
LIST_SHARDS_RETRY_TIMES);
// Set up test shards // Set up test shards
shardIdSet = new HashSet<>(); List<String> shardIds = Arrays.asList("shard-1", "shard-2", "shard-3", "shard-4");
shards = new ArrayList<>(); shardIdSet = new HashSet<>(shardIds);
String[] shardIds = new String[] { "shard-1", "shard-2", "shard-3", "shard-4" }; shards = shardIds.stream().map(shardId -> new Shard().withShardId(shardId)).collect(Collectors.toList());
for (String shardId : shardIds) {
Shard shard = new Shard();
shard.setShardId(shardId);
shards.add(shard);
shardIdSet.add(shardId);
}
} }
@Test @Test
@ -107,11 +134,11 @@ public class KinesisProxyTest {
// Second call describeStream returning response with rest shards. // Second call describeStream returning response with rest shards.
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true); DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false); DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
doReturn(responseWithMoreData).when(mockClient).describeStream(argThat(new IsRequestWithStartShardId(null))); doReturn(responseWithMoreData).when(mockDDBStreamClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
doReturn(responseFinal).when(mockClient) doReturn(responseFinal).when(mockDDBStreamClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId()))); .describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId())));
Set<String> resultShardIdSets = proxy.getAllShardIds(); Set<String> resultShardIdSets = ddbProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets)); assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets));
} }
@ -121,53 +148,56 @@ public class KinesisProxyTest {
// First call describeStream throwing LimitExceededException; // First call describeStream throwing LimitExceededException;
// Second call describeStream returning shards list. // Second call describeStream returning shards list.
DescribeStreamResult response = createGetStreamInfoResponse(shards, false); DescribeStreamResult response = createGetStreamInfoResponse(shards, false);
doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockClient) doThrow(new LimitExceededException("Test Exception")).doReturn(response).when(mockDDBStreamClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(null))); .describeStream(argThat(new OldIsRequestWithStartShardId(null)));
Set<String> resultShardIdSet = proxy.getAllShardIds(); Set<String> resultShardIdSet = ddbProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSet)); assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSet));
} }
@Test @Test
public void testValidShardIteratorType() { public void testValidShardIteratorType() {
when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult);
String expectedShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(); String expectedShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString();
proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType))))); .and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType)))));
} }
@Test @Test
public void testInvalidShardIteratorIsntChanged() { public void testInvalidShardIteratorIsntChanged() {
when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult); when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(shardIteratorResult);
String expectedShardIteratorType = ShardIteratorType.AT_TIMESTAMP.toString(); String expectedShardIteratorType = ShardIteratorType.AT_TIMESTAMP.toString();
proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType))))); .and(hasProperty("shardIteratorType", equalTo(expectedShardIteratorType)))));
} }
@Test(expected = AmazonServiceException.class) @Test(expected = AmazonServiceException.class)
public void testNullShardIteratorType() { public void testNullShardIteratorType() throws Exception {
when(mockClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null")); when(mockDDBStreamClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(new AmazonServiceException("expected null"));
String expectedShardIteratorType = null; String expectedShardIteratorType = null;
proxy.getIterator("Shard-001", expectedShardIteratorType, "1234"); ddbProxy.getIterator("Shard-001", expectedShardIteratorType, "1234");
verify(mockClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class)) verify(mockDDBStreamClient).getShardIterator(argThat(both(isA(GetShardIteratorRequest.class))
.and(hasProperty("shardIteratorType", nullValue(String.class))))); .and(hasProperty("shardIteratorType", nullValue(String.class)))));
} }
@Test(expected = AmazonServiceException.class) @Test(expected = AmazonServiceException.class)
public void testGetStreamInfoFails() throws Exception { public void testGetStreamInfoFails() {
when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new AmazonServiceException("Test")); when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new AmazonServiceException("Test"));
proxy.getShardList(); try {
verify(mockClient).describeStream(any(DescribeStreamRequest.class)); ddbProxy.getShardList();
} finally {
verify(mockDDBStreamClient).describeStream(any(DescribeStreamRequest.class));
}
} }
@Test @Test
public void testGetStreamInfoThrottledOnce() throws Exception { public void testGetStreamInfoThrottledOnce() throws Exception {
when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test")) when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test"))
.thenReturn(describeStreamResult); .thenReturn(describeStreamResult);
when(describeStreamResult.getStreamDescription()).thenReturn(streamDescription); when(describeStreamResult.getStreamDescription()).thenReturn(streamDescription);
when(streamDescription.getHasMoreShards()).thenReturn(false); when(streamDescription.getHasMoreShards()).thenReturn(false);
@ -175,11 +205,11 @@ public class KinesisProxyTest {
List<Shard> expectedShards = Collections.singletonList(shard); List<Shard> expectedShards = Collections.singletonList(shard);
when(streamDescription.getShards()).thenReturn(expectedShards); when(streamDescription.getShards()).thenReturn(expectedShards);
List<Shard> actualShards = proxy.getShardList(); List<Shard> actualShards = ddbProxy.getShardList();
assertThat(actualShards, equalTo(expectedShards)); assertThat(actualShards, equalTo(expectedShards));
verify(mockClient, times(2)).describeStream(any(DescribeStreamRequest.class)); verify(mockDDBStreamClient, times(2)).describeStream(any(DescribeStreamRequest.class));
verify(describeStreamResult, times(3)).getStreamDescription(); verify(describeStreamResult, times(3)).getStreamDescription();
verify(streamDescription).getStreamStatus(); verify(streamDescription).getStreamStatus();
verify(streamDescription).isHasMoreShards(); verify(streamDescription).isHasMoreShards();
@ -187,9 +217,9 @@ public class KinesisProxyTest {
@Test(expected = LimitExceededException.class) @Test(expected = LimitExceededException.class)
public void testGetStreamInfoThrottledAll() throws Exception { public void testGetStreamInfoThrottledAll() throws Exception {
when(mockClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test")); when(mockDDBStreamClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(new LimitExceededException("Test"));
proxy.getShardList(); ddbProxy.getShardList();
} }
@Test @Test
@ -213,32 +243,99 @@ public class KinesisProxyTest {
when(streamDescription.getShards()).thenReturn(shardList1).thenReturn(shardList2).thenReturn(shardList3); when(streamDescription.getShards()).thenReturn(shardList1).thenReturn(shardList2).thenReturn(shardList3);
when(streamDescription.isHasMoreShards()).thenReturn(true, true, false); when(streamDescription.isHasMoreShards()).thenReturn(true, true, false);
when(mockClient.describeStream(argThat(describeWithoutShardId()))).thenReturn(describeStreamResult); when(mockDDBStreamClient.describeStream(argThat(describeWithoutShardId()))).thenReturn(describeStreamResult);
when(mockClient.describeStream(argThat(describeWithShardId(shardId1)))) when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId1))))
.thenThrow(new LimitExceededException("1"), new LimitExceededException("2"), .thenThrow(new LimitExceededException("1"), new LimitExceededException("2"),
new LimitExceededException("3")) new LimitExceededException("3"))
.thenReturn(describeStreamResult); .thenReturn(describeStreamResult);
when(mockClient.describeStream(argThat(describeWithShardId(shardId2)))).thenReturn(describeStreamResult); when(mockDDBStreamClient.describeStream(argThat(describeWithShardId(shardId2)))).thenReturn(describeStreamResult);
boolean limitExceeded = false; boolean limitExceeded = false;
try { try {
proxy.getShardList(); ddbProxy.getShardList();
} catch (LimitExceededException le) { } catch (LimitExceededException le) {
limitExceeded = true; limitExceeded = true;
} }
assertThat(limitExceeded, equalTo(true)); assertThat(limitExceeded, equalTo(true));
List<Shard> actualShards = proxy.getShardList(); List<Shard> actualShards = ddbProxy.getShardList();
List<Shard> expectedShards = Arrays.asList(shard1, shard2, shard3); List<Shard> expectedShards = Arrays.asList(shard1, shard2, shard3);
assertThat(actualShards, equalTo(expectedShards)); assertThat(actualShards, equalTo(expectedShards));
verify(mockClient).describeStream(argThat(describeWithoutShardId())); verify(mockDDBStreamClient).describeStream(argThat(describeWithoutShardId()));
verify(mockClient, times(4)).describeStream(argThat(describeWithShardId(shardId1))); verify(mockDDBStreamClient, times(4)).describeStream(argThat(describeWithShardId(shardId1)));
verify(mockClient).describeStream(argThat(describeWithShardId(shardId2))); verify(mockDDBStreamClient).describeStream(argThat(describeWithShardId(shardId2)));
} }
@Test
public void testListShardsWithMoreDataAvailable() {
ListShardsResult responseWithMoreData = new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
ListShardsResult responseFinal = new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenReturn(responseWithMoreData);
when(mockClient.listShards(argThat(listShardsNextToken(NEXT_TOKEN)))).thenReturn(responseFinal);
Set<String> resultShardIdSets = proxy.getAllShardIds();
assertEquals(shardIdSet, resultShardIdSets);
}
@Test
public void testListShardsWithLimiteExceededException() {
ListShardsResult result = new ListShardsResult().withShards(shards);
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class).thenReturn(result);
Set <String> resultShardIdSet = proxy.getAllShardIds();
assertEquals(shardIdSet, resultShardIdSet);
}
@Test(expected = AmazonServiceException.class)
public void testListShardsFails() {
when(mockClient.listShards(any(ListShardsRequest.class))).thenThrow(AmazonServiceException.class);
try {
proxy.getShardList();
} finally {
verify(mockClient).listShards(any(ListShardsRequest.class));
}
}
@Test
public void testListShardsThrottledOnce() {
List<Shard> expected = Collections.singletonList(shard);
ListShardsResult result = new ListShardsResult().withShards(expected);
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class)
.thenReturn(result);
List<Shard> actualShards = proxy.getShardList();
assertEquals(expected, actualShards);
verify(mockClient, times(2)).listShards(argThat(initialListShardsRequestMatcher()));
}
@Test(expected = LimitExceededException.class)
public void testListShardsThrottledAll() {
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(LimitExceededException.class);
proxy.getShardList();
}
@Test
public void testStreamNotInCorrectStatus() {
when(mockClient.listShards(argThat(initialListShardsRequestMatcher()))).thenThrow(ResourceInUseException.class);
assertNull(proxy.getShardList());
}
@Test
public void testGetShardListWithDDBChildClient() {
DescribeStreamResult responseWithMoreData = createGetStreamInfoResponse(shards.subList(0, 2), true);
DescribeStreamResult responseFinal = createGetStreamInfoResponse(shards.subList(2, shards.size()), false);
doReturn(responseWithMoreData).when(mockDDBChildClient).describeStream(argThat(new IsRequestWithStartShardId(null)));
doReturn(responseFinal).when(mockDDBChildClient)
.describeStream(argThat(new OldIsRequestWithStartShardId(shards.get(1).getShardId())));
Set<String> resultShardIdSets = ddbChildProxy.getAllShardIds();
assertThat("Result set should equal to Test set", shardIdSet, equalTo(resultShardIdSets));
}
private DescribeStreamResult createGetStreamInfoResponse(List<Shard> shards1, boolean isHasMoreShards) { private DescribeStreamResult createGetStreamInfoResponse(List<Shard> shards1, boolean isHasMoreShards) {
// Create stream description // Create stream description
@ -261,8 +358,8 @@ public class KinesisProxyTest {
return new IsRequestWithStartShardId(shardId); return new IsRequestWithStartShardId(shardId);
} }
// Matcher for testing describe stream request with specific start shard ID.
private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher<DescribeStreamRequest> { private static class IsRequestWithStartShardId extends TypeSafeDiagnosingMatcher<DescribeStreamRequest> {
private final String shardId; private final String shardId;
public IsRequestWithStartShardId(String shardId) { public IsRequestWithStartShardId(String shardId) {
@ -291,6 +388,7 @@ public class KinesisProxyTest {
description.appendText("A DescribeStreamRequest with a starting shard if of ").appendValue(shardId); description.appendText("A DescribeStreamRequest with a starting shard if of ").appendValue(shardId);
} }
} }
// Matcher for testing describe stream request with specific start shard ID.
private static class OldIsRequestWithStartShardId extends ArgumentMatcher<DescribeStreamRequest> { private static class OldIsRequestWithStartShardId extends ArgumentMatcher<DescribeStreamRequest> {
private final String shardId; private final String shardId;
@ -309,5 +407,58 @@ public class KinesisProxyTest {
return startShardId.equals(shardId); return startShardId.equals(shardId);
} }
} }
private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
return new ListShardsRequestMatcher(null, null);
}
private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
return new ListShardsRequestMatcher(null, nextToken);
}
@AllArgsConstructor
private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
private final String shardId;
private final String nextToken;
@Override
protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) {
if (shardId == null) {
if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
description.appendText("Expected ExclusiveStartShardId to be null, but was ")
.appendValue(listShardsRequest.getExclusiveStartShardId());
return false;
}
} else {
if (!shardId.equals(listShardsRequest.getExclusiveStartShardId())) {
description.appendText("Expected shardId: ").appendValue(shardId)
.appendText(" doesn't match actual shardId: ")
.appendValue(listShardsRequest.getExclusiveStartShardId());
return false;
}
}
if (StringUtils.isNotEmpty(listShardsRequest.getNextToken())) {
if (StringUtils.isNotEmpty(listShardsRequest.getStreamName()) || StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
return false;
}
if (!listShardsRequest.getNextToken().equals(nextToken)) {
description.appendText("Found nextToken: ").appendValue(listShardsRequest.getNextToken())
.appendText(" when it was supposed to be null.");
return false;
}
} else {
return nextToken == null;
}
return true;
}
@Override
public void describeTo(final Description description) {
description.appendText("A ListShardsRequest with a shardId: ").appendValue(shardId)
.appendText(" and empty nextToken");
}
}
} }