Compare commits


19 commits

Author SHA1 Message Date
Abhi Gupta
37ae2f86be
Overriding the DataFetcher to return a custom GetRecordsResponseAdapter so that customers can have custom logic to send data to KinesisClientRecord (#1479) 2025-05-29 23:27:03 -07:00
lucienlu-aws
1ce6123a78
Prepare for release 3.0.3 (#1473) 2025-05-08 14:17:58 -07:00
chenylee-aws
ca0b0c15e1
Remove unused synchronized keyword (#1472) 2025-05-07 10:47:15 -07:00
ehasah-aws
63911b53a0
Make LAM run at configurable interval (#1464)
* Make LAM run at configurable interval, tie LeaseDiscoverer with LAM run and tie load balancing with leaseDuration

* remove unwanted SampleApp class

* remove unwanted RecordProcessor and added new parameter class

* Updated comment removed one variable for consistency

* Updated test logic to use Supplier to provide time

* updated logic to count based variance balancing

* Changed variance based balancing to 12

* Changed variance based balancing to 3

* Change logic to balance tied to lease duration

* Change logic to balance tied to LAM run

* Code for backward compatibility

* Code for backward compatibility check

* Code for backward compatibility check

* Best practice to deprecate old constructor

* Best practice to deprecate old constructor

* Best practice to deprecate old constructor

* removed backward compatibility code/constructors

* Formatting and remove unused variable

* added formatting to avoid build failure
2025-04-17 10:16:12 -07:00
Minu Hong
d7dd21beca
KCL 3.x documentation update (#1465)
* Create kcl_3x_deep-dive.md

* Update kcl_3x_deep-dive.md

* Update kcl-configurations.md

 - Add two KCL configurations - WorkerMetricsTableConfig, CoordinatorStateTableConfig - to let users know how to set the custom table name.
 - Update how to set custom names for KCL metadata tables created in DynamoDB
2025-04-16 10:08:52 -07:00
Abhi Gupta
897d993782
Extending ShardConsumer class constructor to have ConsumerTaskFactory as a param (#1463) 2025-04-14 12:57:58 -07:00
chenylee-aws
6990fc513f
Update Java doc for internal Api annotation (#1466) 2025-04-14 09:32:16 -07:00
skyero-aws
133374706c
Dependabot auto merge addition in github workflows (#1459)
Dependabot auto-merge feature. Auto-merge triggers for Dependabot dependency pull requests that are patches and have a CVSS level greater than zero.
2025-04-01 11:02:17 -07:00
Minu Hong
edd7d9b1e5
Update README.md (#1455)
* Update README.md

Update KCL main readme page to add end-of-support notice for KCL 1.x. Also applied some formatting changes in the KCL versions section.

* Update README.md

Update the readme

* Update README.md
2025-03-20 17:37:50 -07:00
lucienlu-aws
458ed653bc
Re-add SNAPSHOT to version (#1453) 2025-03-14 11:12:09 -07:00
vincentvilo-aws
9daed2dda9
Update version number in README for 2.x (#1450) 2025-03-13 10:25:41 -07:00
lucienlu-aws
5263b4227c
Prepare for release 3.0.2 (#1447) 2025-03-12 10:22:41 -07:00
ehasah-aws
7b5ebaeb93
Calculate scan segment by table size (#1443)
* Fixes DDB usage spike issue

* Removed unnecessary exception handling

* made max total segment size 30

* Cached total scan segment

* Reuse listLeasesParallely api to dynamically calculate total segment

* Added unit tests and made getParallelScanTotalSegments synchronous

* Simplified getParallelScanTotalSegments method

* Fall back to previously calculated totalScan

* fixed formatting
2025-03-12 10:10:19 -07:00
dependabot[bot]
e856e7c95e
Bump io.netty:netty-handler in /amazon-kinesis-client (#1439)
Bumps [io.netty:netty-handler](https://github.com/netty/netty) from 4.1.108.Final to 4.1.118.Final.
- [Commits](https://github.com/netty/netty/compare/netty-4.1.108.Final...netty-4.1.118.Final)

---
updated-dependencies:
- dependency-name: io.netty:netty-handler
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-03-03 15:17:53 -08:00
Abhi Gupta
c9563ab585
Having a way to override the consumerTaskFactory as part of LeaseManagementConfig (#1441) 2025-02-23 22:17:43 -08:00
Abhi Gupta
8deebe4bda
Adding functionality to override the vanilla KCL tasks (#1440) 2025-02-20 20:26:21 -08:00
ivanlkc
68a7a9bf53
Suppress LeaseAssignmentManager excessive WARN logs when there is no issue (#1437)
This commit fixes https://github.com/awslabs/amazon-kinesis-client/issues/1407. The WARN level log statements should only be executed when a real problem has been detected.
2025-02-06 17:20:19 -08:00
Minu Hong
ae9a433ebd
Update README.md (#1434)
To update the notice related to the issue with AWS Java SDK 2.27.x versions.
2025-01-27 16:42:54 -08:00
Minu Hong
854d99da52
Update README.md (#1428)
Update the version number in the dependency for KCL 2.x
2025-01-03 15:11:04 -08:00
46 changed files with 1186 additions and 394 deletions


@ -1,7 +1,7 @@
version: 2
updates:
# branch - master
- package-ecosystem: "maven"
# branch - master
- package-ecosystem: "maven"
directory: "/"
labels:
- "dependencies"
@ -10,8 +10,8 @@ updates:
schedule:
interval: "weekly"
# branch - v2.x
- package-ecosystem: "maven"
# branch - v2.x
- package-ecosystem: "maven"
directory: "/"
labels:
- "dependencies"
@ -20,8 +20,8 @@ updates:
schedule:
interval: "weekly"
# branch - v1.x
- package-ecosystem: "maven"
# branch - v1.x
- package-ecosystem: "maven"
directory: "/"
labels:
- "dependencies"
@ -29,4 +29,3 @@ updates:
target-branch: "v1.x"
schedule:
interval: "weekly"


@ -92,7 +92,7 @@ find_removed_methods() {
grep -v 'software\.amazon\.kinesis\.retrieval\.kpl\.Messages')
for class in $latest_classes
do
if (is_kinesis_client_internal_api "$class" && (is_new_minor_release || is_new_major_release)) || is_non_public_class "$class"
if is_kinesis_client_internal_api "$class" || is_non_public_class "$class"
then
continue
fi
@ -117,10 +117,6 @@ find_removed_methods() {
if [[ "$removed_methods" != "" ]]
then
REMOVED_METHODS_FLAG=$TRUE
if is_kinesis_client_internal_api "$class"
then
echo "Found removed methods in class with @KinesisClientInternalApi annotation. To resolve these issues, upgrade the current minor version or address these changes."
fi
echo "$class does not have method(s):"
echo "$removed_methods"
fi


@ -7,20 +7,25 @@
# documentation.
name: Java CI with Maven
on:
push:
branches:
- "master"
- "v2.x"
- "v1.x"
pull_request:
branches:
- "master"
- "v2.x"
- "v1.x"
permissions:
contents: write
pull-requests: write
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 8
@ -30,11 +35,8 @@ jobs:
distribution: 'corretto'
- name: Build with Maven
run: mvn -B package --file pom.xml -DskipITs
backwards-compatible-check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 8
@ -44,3 +46,20 @@ jobs:
distribution: 'corretto'
- name: Check backwards compatibility of changes
run: .github/scripts/backwards_compatibility_check.sh
auto-merge:
needs: [build]
runs-on: ubuntu-latest
if: github.event.pull_request.user.login == 'dependabot[bot]'
steps:
- name: Dependabot metadata
id: metadata
uses: dependabot/fetch-metadata@v2
with:
alert-lookup: true
github-token: "${{ secrets.GITHUB_TOKEN }}"
- name: Enable auto-merge for Dependabot PRs
if: steps.metadata.outputs.update-type == 'version-update:semver-patch' && steps.metadata.outputs.cvss > 0
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GH_TOKEN: ${{secrets.GITHUB_TOKEN}}


@ -5,6 +5,20 @@ For **1.x** release notes, please see [v1.x/CHANGELOG.md](https://github.com/aws
For **2.x** release notes, please see [v2.x/CHANGELOG.md](https://github.com/awslabs/amazon-kinesis-client/blob/v2.x/CHANGELOG.md)
---
### Release 3.0.3 (May 7, 2025)
* [#1464](https://github.com/awslabs/amazon-kinesis-client/pull/1464) Add config for LeaseAssignmentManager frequency and improve assignment time of newly created leases
* [#1463](https://github.com/awslabs/amazon-kinesis-client/pull/1463) Extend ShardConsumer constructor to have ConsumerTaskFactory as a parameter to support [DynamoDB Streams Kinesis Adapter](https://github.com/awslabs/dynamodb-streams-kinesis-adapter) compatibility
* [#1472](https://github.com/awslabs/amazon-kinesis-client/pull/1472) Remove unused synchronized keyword
### Release 3.0.2 (March 12, 2025)
* [#1443](https://github.com/awslabs/amazon-kinesis-client/pull/1443) Reduce DynamoDB IOPS for smaller stream and worker count applications
* The below two PRs are intended to support [DynamoDB Streams Kinesis Adapter](https://github.com/awslabs/dynamodb-streams-kinesis-adapter) compatibility
* [#1441](https://github.com/awslabs/amazon-kinesis-client/pull/1441) Make consumerTaskFactory overridable by customers
* [#1440](https://github.com/awslabs/amazon-kinesis-client/pull/1440) Make ShutdownTask, ProcessTask, InitializeTask, BlockOnParentTask, and ShutdownNotificationTask overridable by customers
* [#1437](https://github.com/awslabs/amazon-kinesis-client/pull/1437) Suppress LeaseAssignmentManager excessive WARN logs when there is no issue
* [#1439](https://github.com/awslabs/amazon-kinesis-client/pull/1439) Upgrade io.netty:netty-handler from 4.1.108.Final to 4.1.118.Final
* [#1400](https://github.com/awslabs/amazon-kinesis-client/pull/1400) Upgrade com.fasterxml.jackson.core:jackson-databind from 2.10.1 to 2.12.7.1
### Release 3.0.1 (November 14, 2024)
* [#1401](https://github.com/awslabs/amazon-kinesis-client/pull/1401) Fixed the lease graceful handoff behavior in the multi-stream processing mode
* [#1398](https://github.com/awslabs/amazon-kinesis-client/pull/1398) Addressed several KCL 3.0 related issues raised via GitHub


@ -1,6 +1,15 @@
# Amazon Kinesis Client Library for Java
[![Build Status](https://travis-ci.org/awslabs/amazon-kinesis-client.svg?branch=master)](https://travis-ci.org/awslabs/amazon-kinesis-client)
> [!IMPORTANT]
> ### Amazon Kinesis Client Library (KCL) 1.x will reach end-of-support on January 30, 2026
> Amazon Kinesis Client Library (KCL) 1.x will reach end-of-support on January 30, 2026. Accordingly, these versions will enter maintenance mode on April 17, 2025. During maintenance mode, AWS will provide updates only for critical bug fixes and security issues. Major versions in maintenance mode will not receive updates for new features or feature enhancements. If you're using KCL 1.x, we recommend migrating to the latest versions. When migrating from KCL 1.x to 3.x, you will need to update interfaces and security credential providers in your application. For details about the end-of-support notice and required actions, see the following links:
> * [AWS Blog: Announcing end-of-support for Amazon Kinesis Client Library 1.x and Amazon Kinesis Producer Library 0.x effective January 30, 2026](https://aws.amazon.com/blogs/big-data/announcing-end-of-support-for-amazon-kinesis-client-library-1-x-and-amazon-kinesis-producer-library-0-x-effective-january-30-2026/)
> * [Kinesis documentation: KCL version lifecycle policy](https://docs.aws.amazon.com/streams/latest/dev/kcl-version-lifecycle-policy.html)
> * [Kinesis documentation: Migrating from KCL 1.x to KCL 3.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-1-3.html)
## Introduction
The **Amazon Kinesis Client Library (KCL) for Java** enables Java developers to easily consume and process data from [Amazon Kinesis Data Streams][kinesis].
* [Kinesis Data Streams Product Page][kinesis]
@ -55,12 +64,20 @@ To make it easier for developers to write record processors in other languages,
## Using the KCL
The recommended way to use the KCL for Java is to consume it from Maven.
## KCL versions
> [!WARNING]
> ### Do not use AWS SDK for Java versions 2.27.19 to 2.27.23 with KCL 3.x
> When using KCL 3.x with AWS SDK for Java versions 2.27.19 through 2.27.23, you may encounter the following DynamoDB exception:
> ```software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The document path provided in the update expression is invalid for update (Service: DynamoDb, Status Code: 400, Request ID: xxx)```.
> This error occurs due to [a known issue](https://github.com/aws/aws-sdk-java-v2/issues/5584) in the AWS SDK for Java that affects the DynamoDB metadata table managed by KCL 3.x. The issue was introduced in version 2.27.19 and impacts all versions up to 2.27.23. The issue has been resolved in the AWS SDK for Java version 2.27.24. For optimal performance and stability, we recommend upgrading to version 2.28.0 or later.
### Version 3.x
``` xml
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>3.0.1</version>
<version>3.0.3</version>
</dependency>
```
@ -70,10 +87,9 @@ The recommended way to use the KCL for Java is to consume it from Maven.
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>2.6.0</version>
<version>2.7.0</version>
</dependency>
```
### Version 1.x
[Version 1.x tracking branch](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x)
``` xml
@ -83,11 +99,7 @@ The recommended way to use the KCL for Java is to consume it from Maven.
<version>1.14.1</version>
</dependency>
```
> **IMPORTANT**
> We recommend using the latest KCL version for improved performance and support.
## Release Notes
### Release Notes
| KCL Version | Changelog |
| --- | --- |
@ -95,8 +107,7 @@ The recommended way to use the KCL for Java is to consume it from Maven.
| 2.x | [v2.x/CHANGELOG.md](https://github.com/awslabs/amazon-kinesis-client/blob/v2.x/CHANGELOG.md) |
| 1.x | [v1.x/CHANGELOG.md](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/CHANGELOG.md) |
## Notices
### Version recommendation
We recommend all users to migrate to the latest respective versions to avoid known issues and benefit from all improvements.
## Giving Feedback


@ -21,7 +21,7 @@
<parent>
<artifactId>amazon-kinesis-client-pom</artifactId>
<groupId>software.amazon.kinesis</groupId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>


@ -23,7 +23,7 @@
<parent>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client-pom</artifactId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.3</version>
</parent>
<artifactId>amazon-kinesis-client</artifactId>
@ -171,7 +171,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.108.Final</version>
<version>4.1.118.Final</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>


@ -19,7 +19,9 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
* Any class/method/variable marked with this annotation is subject to breaking changes between minor releases.
* Marker interface for 'internal' APIs that should not be used outside the core module.
* Breaking changes can and will be introduced to elements marked as KinesisClientInternalApi.
* Users of the KCL should not depend on any packages, types, fields, constructors, or methods with this annotation.
*/
@Retention(RetentionPolicy.CLASS)
public @interface KinesisClientInternalApi {}


@ -87,6 +87,7 @@ import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerialize
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.ConsumerTaskFactory;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
@ -188,6 +189,7 @@ public class Scheduler implements Runnable {
private final SchemaRegistryDecoder schemaRegistryDecoder;
private final DeletedStreamListProvider deletedStreamListProvider;
private final ConsumerTaskFactory taskFactory;
@Getter(AccessLevel.NONE)
private final MigrationStateMachine migrationStateMachine;
@ -371,6 +373,7 @@ public class Scheduler implements Runnable {
this.schemaRegistryDecoder = this.retrievalConfig.glueSchemaRegistryDeserializer() == null
? null
: new SchemaRegistryDecoder(this.retrievalConfig.glueSchemaRegistryDeserializer());
this.taskFactory = leaseManagementConfig().consumerTaskFactory();
}
/**
@ -418,7 +421,8 @@ public class Scheduler implements Runnable {
lamThreadPool,
System::nanoTime,
leaseManagementConfig.maxLeasesForWorker(),
leaseManagementConfig.gracefulLeaseHandoffConfig()))
leaseManagementConfig.gracefulLeaseHandoffConfig(),
leaseManagementConfig.leaseAssignmentIntervalMillis()))
.adaptiveLeaderDeciderCreator(() -> new MigrationAdaptiveLeaderDecider(metricsFactory))
.deterministicLeaderDeciderCreator(() -> new DeterministicShuffleShardSyncLeaderDecider(
leaseRefresher, Executors.newSingleThreadScheduledExecutor(), 1, metricsFactory))
@ -1164,7 +1168,8 @@ public class Scheduler implements Runnable {
lifecycleConfig.logWarningForTaskAfterMillis(),
argument,
lifecycleConfig.taskExecutionListener(),
lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
lifecycleConfig.readTimeoutsToIgnoreBeforeWarning(),
leaseManagementConfig.consumerTaskFactory());
}
/**


@ -80,17 +80,6 @@ public final class LeaseAssignmentManager {
*/
private static final int DEFAULT_FAILURE_COUNT_TO_SWITCH_LEADER = 3;
/**
* Default multiplier for LAM frequency with respect to leaseDurationMillis (lease failover millis).
* If leaseDurationMillis is 10000 millis, default LAM frequency is 20000 millis.
*/
private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2;
/**
* Default parallelism factor for scaling lease table.
*/
private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10;
private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease";
/**
@ -122,6 +111,7 @@ public final class LeaseAssignmentManager {
private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
private boolean tookOverLeadershipInThisRun = false;
private final Map<String, Lease> prevRunLeasesState = new HashMap<>();
private final long leaseAssignmentIntervalMillis;
private Future<?> managerFuture;
@ -134,10 +124,7 @@ public final class LeaseAssignmentManager {
// so reset the flag to refresh the state before processing during a restart of LAM.
tookOverLeadershipInThisRun = false;
managerFuture = executorService.scheduleWithFixedDelay(
this::performAssignment,
0L,
leaseDurationMillis * DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER,
TimeUnit.MILLISECONDS);
this::performAssignment, 0L, (int) (leaseAssignmentIntervalMillis), TimeUnit.MILLISECONDS);
log.info("Started LeaseAssignmentManager");
return;
}
@ -537,8 +524,8 @@ public final class LeaseAssignmentManager {
.filter(workerMetrics -> !workerMetrics.isValidWorkerMetric())
.map(WorkerMetricStats::getWorkerId)
.collect(Collectors.toList());
log.warn("List of workerIds with invalid entries : {}", listOfWorkerIdOfInvalidWorkerMetricsEntry);
if (!listOfWorkerIdOfInvalidWorkerMetricsEntry.isEmpty()) {
log.warn("List of workerIds with invalid entries : {}", listOfWorkerIdOfInvalidWorkerMetricsEntry);
metricsScope.addData(
"NumWorkersWithInvalidEntry",
listOfWorkerIdOfInvalidWorkerMetricsEntry.size(),
@ -567,8 +554,8 @@ public final class LeaseAssignmentManager {
final Map.Entry<List<Lease>, List<String>> leaseListResponse = leaseListFuture.join();
this.leaseList = leaseListResponse.getKey();
log.warn("Leases that failed deserialization : {}", leaseListResponse.getValue());
if (!leaseListResponse.getValue().isEmpty()) {
log.warn("Leases that failed deserialization : {}", leaseListResponse.getValue());
MetricsUtil.addCount(
metricsScope,
"LeaseDeserializationFailureCount",
@ -689,8 +676,8 @@ public final class LeaseAssignmentManager {
}
private CompletableFuture<Map.Entry<List<Lease>, List<String>>> loadLeaseListAsync() {
return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> leaseRefresher.listLeasesParallely(
LEASE_ASSIGNMENT_CALL_THREAD_POOL, DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR)));
return CompletableFuture.supplyAsync(() ->
loadWithRetry(() -> leaseRefresher.listLeasesParallely(LEASE_ASSIGNMENT_CALL_THREAD_POOL, 0)));
}
private <T> T loadWithRetry(final Callable<T> loadFunction) {


@ -651,8 +651,8 @@ public class HierarchicalShardSyncer {
return parentShardIds;
}
public synchronized Lease createLeaseForChildShard(
final ChildShard childShard, final StreamIdentifier streamIdentifier) throws InvalidStateException {
public Lease createLeaseForChildShard(final ChildShard childShard, final StreamIdentifier streamIdentifier)
throws InvalidStateException {
final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, streamIdentifier);
return multiStreamArgs.isMultiStreamMode()


@ -45,6 +45,8 @@ import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.lifecycle.ConsumerTaskFactory;
import software.amazon.kinesis.lifecycle.KinesisConsumerTaskFactory;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.worker.metric.WorkerMetric;
@ -112,6 +114,16 @@ public class LeaseManagementConfig {
*/
private long failoverTimeMillis = 10000L;
/**
* Lease assignment interval in milliseconds - e.g. wait for this long between Lease assignment run.
*
* <p>Default value: 2 * {@link LeaseManagementConfig#failoverTimeMillis}</p>
*/
private Long leaseAssignmentIntervalMillis;
public long leaseAssignmentIntervalMillis() {
return leaseAssignmentIntervalMillis != null ? leaseAssignmentIntervalMillis : 2 * failoverTimeMillis;
}
/**
* Whether workers should take very expired leases at priority. A very expired lease is when a worker does not
* renew its lease in 3 * {@link LeaseManagementConfig#failoverTimeMillis}. Very expired leases will be taken at
@ -215,6 +227,8 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
private ConsumerTaskFactory consumerTaskFactory = new KinesisConsumerTaskFactory();
private WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig =
new WorkerUtilizationAwareAssignmentConfig();
@ -485,7 +499,8 @@ public class LeaseManagementConfig {
isMultiStreamingMode,
leaseCleanupConfig(),
workerUtilizationAwareAssignmentConfig(),
gracefulLeaseHandoffConfig);
gracefulLeaseHandoffConfig,
leaseAssignmentIntervalMillis());
}
return leaseManagementFactory;
}
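The getter added above resolves an unset (null) leaseAssignmentIntervalMillis to twice failoverTimeMillis. A minimal standalone sketch of just that fallback arithmetic, assuming the default failoverTimeMillis of 10000 shown earlier in this file's diff (the demo class is illustrative, not part of the KCL API):

```java
public class LeaseAssignmentIntervalDemo {
    // Mirrors the getter added to LeaseManagementConfig above:
    // an unset (null) interval falls back to 2 * failoverTimeMillis.
    static long leaseAssignmentIntervalMillis(Long configured, long failoverTimeMillis) {
        return configured != null ? configured : 2 * failoverTimeMillis;
    }

    public static void main(String[] args) {
        System.out.println(leaseAssignmentIntervalMillis(null, 10_000L));    // default
        System.out.println(leaseAssignmentIntervalMillis(30_000L, 10_000L)); // explicit override
    }
}
```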


@ -154,8 +154,9 @@ public interface LeaseRefresher {
* List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey
* that failed deserialize separately.
*
* @param threadPool threadpool to use for parallel scan
* @param parallelismFactor no. of parallel scans
* @param threadPool thread pool to use for parallel scan
* @param parallelismFactor no. of parallel scans.
* If parallelismFactor is 0 then parallelismFactor will be calculated based on table size
* @return Pair of List of leases from the storage and List of items failed to deserialize
* @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist


@ -128,6 +128,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
* Initial dynamodb lease table write iops if creating the lease table
* @param metricsFactory
* Used to publish metrics about lease operations
* @param leaseAssignmentIntervalMillis
* Interval at which Lease assignment manager runs
*/
public DynamoDBLeaseCoordinator(
final LeaseRefresher leaseRefresher,
@ -143,7 +145,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final MetricsFactory metricsFactory,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap) {
final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap,
final long leaseAssignmentIntervalMillis) {
this.leaseRefresher = leaseRefresher;
this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY);
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
@ -152,8 +155,8 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
.withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
// Should run once every leaseDurationMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis;
// Should run twice every leaseAssignmentIntervalMillis to identify new leases before expiry.
this.leaseDiscovererIntervalMillis = (leaseAssignmentIntervalMillis / 2) - epsilonMillis;
this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis);
this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create(
gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this);
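
The comment change above encodes a cadence shift: the lease discoverer used to run once per lease duration and now runs twice per lease-assignment interval. A sketch of the old and new formulas side by side; the epsilon value is illustrative only, since its real default is not shown in this diff:

```java
public class LeaseDiscovererIntervalDemo {
    // Old cadence: once per lease duration.
    static long oldIntervalMillis(long leaseDurationMillis, long epsilonMillis) {
        return leaseDurationMillis - epsilonMillis;
    }

    // New cadence: twice per lease-assignment interval, matching the diff above.
    static long newIntervalMillis(long leaseAssignmentIntervalMillis, long epsilonMillis) {
        return (leaseAssignmentIntervalMillis / 2) - epsilonMillis;
    }

    public static void main(String[] args) {
        // With failoverTimeMillis = 10000, the assignment interval defaults to
        // 20000 elsewhere in this comparison, so the default cadence is unchanged.
        long epsilon = 25L; // illustrative, not the library's default
        System.out.println(oldIntervalMillis(10_000L, epsilon));
        System.out.println(newIntervalMillis(20_000L, epsilon));
    }
}
```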


@ -108,6 +108,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
private long leaseAssignmentIntervalMillis;
/**
* Constructor.
@ -144,6 +145,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param leaseCleanupConfig
* @param workerUtilizationAwareAssignmentConfig
* @param gracefulLeaseHandoffConfig
* @param leaseAssignmentIntervalMillis
*/
public DynamoDBLeaseManagementFactory(
final @NotNull KinesisAsyncClient kinesisClient,
@ -179,7 +181,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
boolean isMultiStreamMode,
final LeaseCleanupConfig leaseCleanupConfig,
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) {
final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
final long leaseAssignmentIntervalMillis) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
@ -214,6 +217,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.tags = tags;
this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
this.leaseAssignmentIntervalMillis = leaseAssignmentIntervalMillis;
}
@Override
@ -239,7 +243,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
metricsFactory,
workerUtilizationAwareAssignmentConfig,
gracefulLeaseHandoffConfig,
shardInfoShardConsumerMap);
shardInfoShardConsumerMap,
leaseAssignmentIntervalMillis);
}
/**


@ -15,6 +15,7 @@
package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
@ -122,6 +123,20 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
private static final String LEASE_OWNER_INDEX_QUERY_CONDITIONAL_EXPRESSION =
String.format("%s = %s", LEASE_OWNER_KEY, DDB_LEASE_OWNER);
/**
* Default parallelism factor for scaling lease table.
*/
private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10;
private static final long NUMBER_OF_BYTES_PER_GB = 1024 * 1024 * 1024;
private static final double GB_PER_SEGMENT = 0.2;
private static final int MIN_SCAN_SEGMENTS = 1;
private static final int MAX_SCAN_SEGMENTS = 30;
private Integer cachedTotalSegments;
private Instant expirationTimeForTotalSegmentsCache;
private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2);
private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) {
final DdbTableConfig tableConfig = new DdbTableConfig();
tableConfig.billingMode(billingMode);
@ -553,9 +568,17 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
final List<String> leaseItemFailedDeserialize = new ArrayList<>();
final List<Lease> response = new ArrayList<>();
final List<Future<List<Map<String, AttributeValue>>>> futures = new ArrayList<>();
for (int i = 0; i < parallelScanTotalSegment; ++i) {
final int totalSegments;
if (parallelScanTotalSegment > 0) {
totalSegments = parallelScanTotalSegment;
} else {
totalSegments = getParallelScanTotalSegments();
}
for (int i = 0; i < totalSegments; ++i) {
final int segmentNumber = i;
futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, parallelScanTotalSegment)));
futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, totalSegments)));
}
try {
for (final Future<List<Map<String, AttributeValue>>> future : futures) {
@ -586,6 +609,41 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
return new AbstractMap.SimpleEntry<>(response, leaseItemFailedDeserialize);
}
/**
* Calculates the optimal number of parallel scan segments for a DynamoDB table based on its size.
* The calculation follows these rules:
* - Each segment handles 0.2GB (214,748,364 bytes) of data
* - For empty tables or tables smaller than 0.2GB, uses 1 segment
* - Number of segments scales linearly with table size
*
* @return The number of segments to use for parallel scan, minimum 1
*/
private synchronized int getParallelScanTotalSegments() throws DependencyException {
if (isTotalSegmentsCacheValid()) {
return cachedTotalSegments;
}
int parallelScanTotalSegments =
cachedTotalSegments == null ? DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR : cachedTotalSegments;
final DescribeTableResponse describeTableResponse = describeLeaseTable();
if (describeTableResponse == null) {
log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments);
} else {
final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB;
parallelScanTotalSegments = Math.min(
Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS);
log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments);
}
cachedTotalSegments = parallelScanTotalSegments;
expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS);
return parallelScanTotalSegments;
}
private boolean isTotalSegmentsCacheValid() {
return cachedTotalSegments != null && Instant.now().isBefore(expirationTimeForTotalSegmentsCache);
}
private List<Map<String, AttributeValue>> scanSegment(final int segment, final int parallelScanTotalSegment)
throws DependencyException {
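
The Javadoc above spells out the sizing rule: one scan segment per 0.2 GB of table data, clamped between 1 and 30. That arithmetic can be checked in isolation; this sketch reimplements only the calculation from getParallelScanTotalSegments (the demo class is not part of the library):

```java
public class ScanSegmentsDemo {
    static final long BYTES_PER_GB = 1024L * 1024L * 1024L;
    static final double GB_PER_SEGMENT = 0.2;
    static final int MIN_SCAN_SEGMENTS = 1;
    static final int MAX_SCAN_SEGMENTS = 30;

    // Segments scale linearly with table size, floored at 1 and capped at 30,
    // as in the diff above.
    static int segmentsFor(long tableSizeBytes) {
        double tableSizeGB = (double) tableSizeBytes / BYTES_PER_GB;
        return Math.min(
                Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS),
                MAX_SCAN_SEGMENTS);
    }

    public static void main(String[] args) {
        System.out.println(segmentsFor(0L));                   // empty table -> minimum
        System.out.println(segmentsFor(BYTES_PER_GB));         // 1 GB
        System.out.println(segmentsFor(100L * BYTES_PER_GB));  // 100 GB -> capped
    }
}
```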


@ -34,13 +34,19 @@ interface ConsumerState {
* the consumer to use build the task, or execute state.
* @param input
* the process input received, this may be null if it's a control message
* @param taskFactory
* a factory for creating tasks
* @return a valid task for this state or null if there is no task required.
*/
ConsumerTask createTask(ShardConsumerArgument consumerArgument, ShardConsumer consumer, ProcessRecordsInput input);
ConsumerTask createTask(
ShardConsumerArgument consumerArgument,
ShardConsumer consumer,
ProcessRecordsInput input,
ConsumerTaskFactory taskFactory);
/**
* Provides the next state of the consumer upon success of the task returned by
* {@link ConsumerState#createTask(ShardConsumerArgument, ShardConsumer, ProcessRecordsInput)}.
* {@link ConsumerState#createTask(ShardConsumerArgument, ShardConsumer, ProcessRecordsInput, ConsumerTaskFactory)}.
*
* @return the next state that the consumer should transition to, this may be the same object as the current
* state.


@@ -17,7 +17,6 @@ package software.amazon.kinesis.lifecycle;
import lombok.Getter;
import lombok.experimental.Accessors;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
/**
* Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks,
@@ -121,11 +120,11 @@ class ConsumerStates {
@Override
public ConsumerTask createTask(
ShardConsumerArgument consumerArgument, ShardConsumer consumer, ProcessRecordsInput input) {
return new BlockOnParentShardTask(
consumerArgument.shardInfo(),
consumerArgument.leaseCoordinator().leaseRefresher(),
consumerArgument.parentShardPollIntervalMillis());
ShardConsumerArgument consumerArgument,
ShardConsumer consumer,
ProcessRecordsInput input,
ConsumerTaskFactory taskFactory) {
return taskFactory.createBlockOnParentTask(consumerArgument);
}
@Override
@@ -187,16 +186,11 @@ class ConsumerStates {
@Override
public ConsumerTask createTask(
ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) {
return new InitializeTask(
argument.shardInfo(),
argument.shardRecordProcessor(),
argument.checkpoint(),
argument.recordProcessorCheckpointer(),
argument.initialPositionInStream(),
argument.recordsPublisher(),
argument.taskBackoffTimeMillis(),
argument.metricsFactory());
ShardConsumerArgument argument,
ShardConsumer consumer,
ProcessRecordsInput input,
ConsumerTaskFactory taskFactory) {
return taskFactory.createInitializeTask(argument);
}
@Override
@@ -250,24 +244,11 @@ class ConsumerStates {
@Override
public ConsumerTask createTask(
ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) {
ThrottlingReporter throttlingReporter =
new ThrottlingReporter(5, argument.shardInfo().shardId());
return new ProcessTask(
argument.shardInfo(),
argument.shardRecordProcessor(),
argument.recordProcessorCheckpointer(),
argument.taskBackoffTimeMillis(),
argument.skipShardSyncAtWorkerInitializationIfLeasesExist(),
argument.shardDetector(),
throttlingReporter,
input,
argument.shouldCallProcessRecordsEvenForEmptyRecordList(),
argument.idleTimeInMilliseconds(),
argument.aggregatorUtil(),
argument.metricsFactory(),
argument.schemaRegistryDecoder(),
argument.leaseCoordinator().leaseStatsRecorder());
ShardConsumerArgument argument,
ShardConsumer consumer,
ProcessRecordsInput input,
ConsumerTaskFactory taskFactory) {
return taskFactory.createProcessTask(argument, input);
}
@Override
@@ -331,14 +312,12 @@ class ConsumerStates {
@Override
public ConsumerTask createTask(
ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) {
ShardConsumerArgument argument,
ShardConsumer consumer,
ProcessRecordsInput input,
ConsumerTaskFactory taskFactory) {
// TODO: notify shutdownrequested
return new ShutdownNotificationTask(
argument.shardRecordProcessor(),
argument.recordProcessorCheckpointer(),
consumer.shutdownNotification(),
argument.shardInfo(),
consumer.shardConsumerArgument().leaseCoordinator());
return taskFactory.createShutdownNotificationTask(argument, consumer);
}
@Override
@@ -405,7 +384,10 @@ class ConsumerStates {
@Override
public ConsumerTask createTask(
ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) {
ShardConsumerArgument argument,
ShardConsumer consumer,
ProcessRecordsInput input,
ConsumerTaskFactory taskFactory) {
return null;
}
@@ -483,25 +465,12 @@ class ConsumerStates {
@Override
public ConsumerTask createTask(
ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) {
ShardConsumerArgument argument,
ShardConsumer consumer,
ProcessRecordsInput input,
ConsumerTaskFactory taskFactory) {
// TODO: set shutdown reason
return new ShutdownTask(
argument.shardInfo(),
argument.shardDetector(),
argument.shardRecordProcessor(),
argument.recordProcessorCheckpointer(),
consumer.shutdownReason(),
argument.initialPositionInStream(),
argument.cleanupLeasesOfCompletedShards(),
argument.ignoreUnexpectedChildShards(),
argument.leaseCoordinator(),
argument.taskBackoffTimeMillis(),
argument.recordsPublisher(),
argument.hierarchicalShardSyncer(),
argument.metricsFactory(),
input == null ? null : input.childShards(),
argument.streamIdentifier(),
argument.leaseCleanupManager());
return taskFactory.createShutdownTask(argument, consumer, input);
}
@Override
@@ -569,7 +538,10 @@ class ConsumerStates {
@Override
public ConsumerTask createTask(
ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) {
ShardConsumerArgument argument,
ShardConsumer consumer,
ProcessRecordsInput input,
ConsumerTaskFactory taskFactory) {
return null;
}


@@ -0,0 +1,47 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@KinesisClientInternalApi
public interface ConsumerTaskFactory {
/**
* Creates a shutdown task.
*/
ConsumerTask createShutdownTask(ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input);
/**
* Creates a process task.
*/
ConsumerTask createProcessTask(ShardConsumerArgument argument, ProcessRecordsInput processRecordsInput);
/**
* Creates an initialize task.
*/
ConsumerTask createInitializeTask(ShardConsumerArgument argument);
/**
* Creates a block on parent task.
*/
ConsumerTask createBlockOnParentTask(ShardConsumerArgument argument);
/**
* Creates a shutdown notification task.
*/
ConsumerTask createShutdownNotificationTask(ShardConsumerArgument argument, ShardConsumer consumer);
}
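The factory indirection above is what lets a caller substitute custom task creation, for example wrapping every task with extra bookkeeping before delegating to the default factory. A minimal sketch of that decorator pattern, using stand-in types rather than the real `ConsumerTask`/`ShardConsumerArgument` classes:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-ins for the real KCL types; ConsumerTask and ConsumerTaskFactory carry
// far more state in the actual library.
interface Task { void run(); }
interface TaskFactory { Task createProcessTask(String shardId); }

// A delegating factory: same contract as the default factory, but every task it
// hands out is wrapped with custom logic (here, counting executions).
class CountingTaskFactory implements TaskFactory {
    private final TaskFactory delegate;
    final AtomicInteger tasksRun = new AtomicInteger();

    CountingTaskFactory(TaskFactory delegate) {
        this.delegate = delegate;
    }

    @Override
    public Task createProcessTask(String shardId) {
        Task inner = delegate.createProcessTask(shardId);
        return () -> {
            tasksRun.incrementAndGet(); // custom bookkeeping before delegation
            inner.run();
        };
    }
}
```

In the real API the custom factory is passed to the new `ShardConsumer` constructor parameter shown below in this change.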


@@ -0,0 +1,98 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.lifecycle;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
@KinesisClientInternalApi
public class KinesisConsumerTaskFactory implements ConsumerTaskFactory {
@Override
public ConsumerTask createShutdownTask(
ShardConsumerArgument argument, ShardConsumer consumer, ProcessRecordsInput input) {
return new ShutdownTask(
argument.shardInfo(),
argument.shardDetector(),
argument.shardRecordProcessor(),
argument.recordProcessorCheckpointer(),
consumer.shutdownReason(),
argument.initialPositionInStream(),
argument.cleanupLeasesOfCompletedShards(),
argument.ignoreUnexpectedChildShards(),
argument.leaseCoordinator(),
argument.taskBackoffTimeMillis(),
argument.recordsPublisher(),
argument.hierarchicalShardSyncer(),
argument.metricsFactory(),
input == null ? null : input.childShards(),
argument.streamIdentifier(),
argument.leaseCleanupManager());
}
@Override
public ConsumerTask createProcessTask(ShardConsumerArgument argument, ProcessRecordsInput processRecordsInput) {
ThrottlingReporter throttlingReporter =
new ThrottlingReporter(5, argument.shardInfo().shardId());
return new ProcessTask(
argument.shardInfo(),
argument.shardRecordProcessor(),
argument.recordProcessorCheckpointer(),
argument.taskBackoffTimeMillis(),
argument.skipShardSyncAtWorkerInitializationIfLeasesExist(),
argument.shardDetector(),
throttlingReporter,
processRecordsInput,
argument.shouldCallProcessRecordsEvenForEmptyRecordList(),
argument.idleTimeInMilliseconds(),
argument.aggregatorUtil(),
argument.metricsFactory(),
argument.schemaRegistryDecoder(),
argument.leaseCoordinator().leaseStatsRecorder());
}
@Override
public ConsumerTask createInitializeTask(ShardConsumerArgument argument) {
return new InitializeTask(
argument.shardInfo(),
argument.shardRecordProcessor(),
argument.checkpoint(),
argument.recordProcessorCheckpointer(),
argument.initialPositionInStream(),
argument.recordsPublisher(),
argument.taskBackoffTimeMillis(),
argument.metricsFactory());
}
@Override
public ConsumerTask createBlockOnParentTask(ShardConsumerArgument argument) {
return new BlockOnParentShardTask(
argument.shardInfo(),
argument.leaseCoordinator().leaseRefresher(),
argument.parentShardPollIntervalMillis());
}
@Override
public ConsumerTask createShutdownNotificationTask(ShardConsumerArgument argument, ShardConsumer consumer) {
return new ShutdownNotificationTask(
argument.shardRecordProcessor(),
argument.recordProcessorCheckpointer(),
consumer.shutdownNotification(),
argument.shardInfo(),
consumer.shardConsumerArgument().leaseCoordinator());
}
}


@@ -86,29 +86,33 @@ public class ShardConsumer {
private ProcessRecordsInput shardEndProcessRecordsInput;
public ShardConsumer(
RecordsPublisher recordsPublisher,
ExecutorService executorService,
ShardInfo shardInfo,
Optional<Long> logWarningForTaskAfterMillis,
ShardConsumerArgument shardConsumerArgument,
TaskExecutionListener taskExecutionListener,
int readTimeoutsToIgnoreBeforeWarning) {
this(
recordsPublisher,
executorService,
shardInfo,
logWarningForTaskAfterMillis,
shardConsumerArgument,
ConsumerStates.INITIAL_STATE,
8,
taskExecutionListener,
readTimeoutsToIgnoreBeforeWarning);
}
private final ConsumerTaskFactory taskFactory;
//
// TODO: Make bufferSize configurable
//
public ShardConsumer(
RecordsPublisher recordsPublisher,
ExecutorService executorService,
ShardInfo shardInfo,
Optional<Long> logWarningForTaskAfterMillis,
ShardConsumerArgument shardConsumerArgument,
TaskExecutionListener taskExecutionListener,
int readTimeoutsToIgnoreBeforeWarning,
ConsumerTaskFactory consumerTaskFactory) {
this(
recordsPublisher,
executorService,
shardInfo,
logWarningForTaskAfterMillis,
shardConsumerArgument,
ConsumerStates.INITIAL_STATE,
8,
taskExecutionListener,
readTimeoutsToIgnoreBeforeWarning,
consumerTaskFactory);
}
public ShardConsumer(
RecordsPublisher recordsPublisher,
ExecutorService executorService,
@@ -118,7 +122,8 @@ public class ShardConsumer {
ConsumerState initialState,
int bufferSize,
TaskExecutionListener taskExecutionListener,
int readTimeoutsToIgnoreBeforeWarning) {
int readTimeoutsToIgnoreBeforeWarning,
ConsumerTaskFactory taskFactory) {
this.recordsPublisher = recordsPublisher;
this.executorService = executorService;
this.shardInfo = shardInfo;
@@ -134,6 +139,7 @@ public class ShardConsumer {
if (this.shardInfo.isCompleted()) {
markForShutdown(ShutdownReason.SHARD_END);
}
this.taskFactory = taskFactory;
}
synchronized void handleInput(ProcessRecordsInput input, Subscription subscription) {
@@ -345,7 +351,7 @@ public class ShardConsumer {
.taskType(currentState.taskType())
.build();
taskExecutionListener.beforeTaskExecution(taskExecutionListenerInput);
ConsumerTask task = currentState.createTask(shardConsumerArgument, ShardConsumer.this, input);
ConsumerTask task = currentState.createTask(shardConsumerArgument, ShardConsumer.this, input, taskFactory);
if (task != null) {
taskDispatchedAt = Instant.now();
currentTask = task;


@@ -14,8 +14,6 @@
*/
package software.amazon.kinesis.retrieval;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
/**
* Represents the result from the DataFetcher, and allows the receiver to accept a result
*/
@@ -25,7 +23,7 @@ public interface DataFetcherResult {
*
* @return The result of the request, this can be null if the request failed.
*/
GetRecordsResponse getResult();
GetRecordsResponseAdapter getResult();
/**
* Accepts the result, and advances the shard iterator. A result from the data fetcher must be accepted before any
@@ -33,7 +31,7 @@ public interface DataFetcherResult {
*
* @return the result of the request, this can be null if the request failed.
*/
GetRecordsResponse accept();
GetRecordsResponseAdapter accept();
/**
* Indicates whether this result is at the end of the shard or not

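The `getResult()`/`accept()` split above encodes a peek-then-commit contract: inspecting a result is side-effect free, while `accept()` commits the fetch by advancing the shard iterator. A hedged sketch of that contract with stand-in types (an `int` position in place of a real shard iterator):

```java
// Stand-in for DataFetcher/DataFetcherResult: getResult() peeks, accept()
// commits by advancing the iterator, so an unaccepted fetch can be retried.
class SketchDataFetcher {
    private int iterator = 0;

    Result fetch() {
        return new Result(iterator + 10); // pretend this call read 10 records
    }

    int position() {
        return iterator;
    }

    class Result {
        private final int nextIterator;

        Result(int nextIterator) {
            this.nextIterator = nextIterator;
        }

        int getResult() {   // inspect without committing
            return nextIterator;
        }

        int accept() {      // commit: advance the fetcher's iterator
            iterator = nextIterator;
            return nextIterator;
        }
    }
}
```

This is why `SynchronousGetRecordsRetrievalStrategy` below calls `dataFetcher.getRecords().accept()` rather than `getResult()`.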

@@ -0,0 +1,55 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import java.util.List;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@KinesisClientInternalApi
public interface GetRecordsResponseAdapter {
/**
* Returns the list of records retrieved from GetRecords.
* @return list of {@link KinesisClientRecord}
*/
List<KinesisClientRecord> records();
/**
* The number of milliseconds the response is from the tip of the stream.
* @return long
*/
Long millisBehindLatest();
/**
* Returns the list of child shards of the shard that was retrieved from GetRecords.
* @return list of {@link ChildShard}
*/
List<ChildShard> childShards();
/**
* Returns the next shard iterator to be used to retrieve next set of records.
* @return String
*/
String nextShardIterator();
/**
* Returns the request id of the GetRecords operation.
* @return String containing the request id
*/
String requestId();
}
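This interface is what the change lets customers implement: a data fetcher can return a custom adapter that applies per-record logic before records reach the consumer. A minimal sketch of a decorating adapter with stand-in types (`String` payloads in place of `KinesisClientRecord`, and a reduced two-method interface):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Stand-in for GetRecordsResponseAdapter, reduced to the two methods this
// sketch needs; payloads are plain Strings rather than KinesisClientRecords.
interface RecordsResponse {
    List<String> records();
    Long millisBehindLatest();
}

// A decorating adapter: delegates everything, but records() applies custom
// logic (here, dropping empty payloads) before handing data onward.
class FilteringRecordsResponse implements RecordsResponse {
    private final RecordsResponse delegate;

    FilteringRecordsResponse(RecordsResponse delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<String> records() {
        return delegate.records().stream()
                .filter(r -> !r.isEmpty())
                .collect(Collectors.toList());
    }

    @Override
    public Long millisBehindLatest() {
        return delegate.millisBehindLatest();
    }
}
```

The concrete `KinesisGetRecordsResponseAdapter` below plays the role of the undecorated base, mapping a raw `GetRecordsResponse` onto this contract.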


@@ -16,7 +16,6 @@ package software.amazon.kinesis.retrieval;
import java.util.Optional;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.retrieval.polling.DataFetcher;
import software.amazon.kinesis.retrieval.polling.KinesisDataFetcher;
@@ -34,7 +33,7 @@ public interface GetRecordsRetrievalStrategy {
* @throws IllegalStateException
* if the strategy has been shutdown.
*/
GetRecordsResponse getRecords(int maxRecords);
GetRecordsResponseAdapter getRecords(int maxRecords);
/**
* Releases any resources used by the strategy. Once the strategy is shutdown it is no longer safe to call


@@ -46,6 +46,27 @@ public class KinesisClientRecord {
private final boolean aggregated;
private final Schema schema;
protected KinesisClientRecord(
String sequenceNumber,
Instant approximateArrivalTimestamp,
ByteBuffer data,
String partitionKey,
EncryptionType encryptionType,
long subSequenceNumber,
String explicitHashKey,
boolean aggregated,
Schema schema) {
this.sequenceNumber = sequenceNumber;
this.approximateArrivalTimestamp = approximateArrivalTimestamp;
this.data = data;
this.partitionKey = partitionKey;
this.encryptionType = encryptionType;
this.subSequenceNumber = subSequenceNumber;
this.explicitHashKey = explicitHashKey;
this.aggregated = aggregated;
this.schema = schema;
}
public static KinesisClientRecord fromRecord(Record record) {
return KinesisClientRecord.builder()
.sequenceNumber(record.sequenceNumber())


@@ -0,0 +1,60 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.retrieval;
import java.util.List;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@RequiredArgsConstructor
@EqualsAndHashCode
@KinesisClientInternalApi
public class KinesisGetRecordsResponseAdapter implements GetRecordsResponseAdapter {
private final GetRecordsResponse getRecordsResponse;
@Override
public List<KinesisClientRecord> records() {
return getRecordsResponse.records().stream()
.map(KinesisClientRecord::fromRecord)
.collect(Collectors.toList());
}
@Override
public Long millisBehindLatest() {
return getRecordsResponse.millisBehindLatest();
}
@Override
public List<ChildShard> childShards() {
return getRecordsResponse.childShards();
}
@Override
public String nextShardIterator() {
return getRecordsResponse.nextShardIterator();
}
@Override
public String requestId() {
return getRecordsResponse.responseMetadata().requestId();
}
}


@@ -32,9 +32,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
/**
@@ -87,11 +87,11 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
}
@Override
public GetRecordsResponse getRecords(final int maxRecords) {
public GetRecordsResponseAdapter getRecords(final int maxRecords) {
if (executorService.isShutdown()) {
throw new IllegalStateException("Strategy has been shutdown");
}
GetRecordsResponse result = null;
GetRecordsResponseAdapter result = null;
CompletionService<DataFetcherResult> completionService = completionServiceSupplier.get();
Set<Future<DataFetcherResult>> futures = new HashSet<>();
Callable<DataFetcherResult> retrieverCall = createRetrieverCallable();


@@ -17,14 +17,13 @@ package software.amazon.kinesis.retrieval.polling;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
@@ -59,13 +58,11 @@ public class BlockingRecordsPublisher implements RecordsPublisher {
}
public ProcessRecordsInput getNextResult() {
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
final RequestDetails getRecordsRequestDetails = new RequestDetails(
getRecordsResult.responseMetadata().requestId(), Instant.now().toString());
GetRecordsResponseAdapter getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
final RequestDetails getRecordsRequestDetails =
new RequestDetails(getRecordsResult.requestId(), Instant.now().toString());
setLastSuccessfulRequestDetails(getRecordsRequestDetails);
List<KinesisClientRecord> records = getRecordsResult.records().stream()
.map(KinesisClientRecord::fromRecord)
.collect(Collectors.toList());
List<KinesisClientRecord> records = getRecordsResult.records();
return ProcessRecordsInput.builder()
.records(records)
.millisBehindLatest(getRecordsResult.millisBehindLatest())


@@ -15,13 +15,6 @@
package software.amazon.kinesis.retrieval.polling;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.retrieval.DataFetcherResult;
@@ -76,39 +69,6 @@ public interface DataFetcher {
void resetIterator(
String shardIterator, String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream);
/**
* Retrieves the response based on the request.
*
* @param request the current get records request used to receive a response.
* @return GetRecordsResponse response for getRecords
*/
GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request) throws Exception;
/**
* Retrieves the next get records request based on the current iterator.
*
* @param nextIterator specify the iterator to get the next record request
* @return {@link GetRecordsRequest}
*/
GetRecordsRequest getGetRecordsRequest(String nextIterator);
/**
* Gets the next iterator based on the request.
*
* @param request used to obtain the next shard iterator
* @return next iterator string
*/
String getNextIterator(GetShardIteratorRequest request)
throws ExecutionException, InterruptedException, TimeoutException;
/**
* Gets the next set of records based on the iterator.
*
* @param nextIterator specified shard iterator for getting the next set of records
* @return {@link GetRecordsResponse}
*/
GetRecordsResponse getRecords(@NonNull String nextIterator);
/**
* Get the current account and stream information.
*


@@ -46,8 +46,10 @@ import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@@ -163,17 +165,17 @@ public class KinesisDataFetcher implements DataFetcher {
final DataFetcherResult TERMINAL_RESULT = new DataFetcherResult() {
// CHECKSTYLE.ON: MemberName
@Override
public GetRecordsResponse getResult() {
return GetRecordsResponse.builder()
public GetRecordsResponseAdapter getResult() {
return new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.millisBehindLatest(null)
.records(Collections.emptyList())
.nextShardIterator(null)
.childShards(Collections.emptyList())
.build();
.build());
}
@Override
public GetRecordsResponse accept() {
public GetRecordsResponseAdapter accept() {
isShardEndReached = true;
return getResult();
}
@@ -187,15 +189,15 @@ public class KinesisDataFetcher implements DataFetcher {
@Data
class AdvancingResult implements DataFetcherResult {
final GetRecordsResponse result;
final GetRecordsResponseAdapter result;
@Override
public GetRecordsResponse getResult() {
public GetRecordsResponseAdapter getResult() {
return result;
}
@Override
public GetRecordsResponse accept() {
public GetRecordsResponseAdapter accept() {
nextIterator = result.nextShardIterator();
if (result.records() != null && !result.records().isEmpty()) {
lastKnownSequenceNumber = Iterables.getLast(result.records()).sequenceNumber();
@@ -331,8 +333,13 @@
this.initialPositionInStream = initialPositionInStream;
}
@Override
public GetRecordsResponse getGetRecordsResponse(GetRecordsRequest request)
/**
* Retrieves the response based on the request.
*
* @param request the current get records request used to receive a response.
* @return {@link GetRecordsResponseAdapter} response for getRecords
*/
private GetRecordsResponseAdapter getGetRecordsResponse(GetRecordsRequest request)
throws ExecutionException, InterruptedException, TimeoutException {
final GetRecordsResponse response =
FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(request), maxFutureWait);
@@ -342,11 +349,16 @@
+ ". childShards: " + response.childShards()
+ ". Will retry GetRecords with the same nextIterator.");
}
return response;
return new KinesisGetRecordsResponseAdapter(response);
}
@Override
public GetRecordsRequest getGetRecordsRequest(String nextIterator) {
/**
* Builds the next GetRecords request based on the current iterator.
*
* @param nextIterator the shard iterator to use for the next GetRecords request
* @return {@link GetRecordsRequest}
*/
private GetRecordsRequest getGetRecordsRequest(String nextIterator) {
GetRecordsRequest.Builder builder = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(nextIterator)
.limit(maxRecords);
@@ -354,16 +366,26 @@
return builder.build();
}
@Override
public String getNextIterator(GetShardIteratorRequest request)
/**
* Gets the next iterator based on the request.
*
* @param request used to obtain the next shard iterator
* @return next iterator string
*/
private String getNextIterator(GetShardIteratorRequest request)
throws ExecutionException, InterruptedException, TimeoutException {
final GetShardIteratorResponse result =
FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(request), maxFutureWait);
return result.shardIterator();
}
@Override
public GetRecordsResponse getRecords(@NonNull final String nextIterator) {
/**
* Gets the next set of records based on the iterator.
*
* @param nextIterator specified shard iterator for getting the next set of records
* @return {@link GetRecordsResponseAdapter}
*/
private GetRecordsResponseAdapter getRecords(@NonNull final String nextIterator) {
GetRecordsRequest request = getGetRecordsRequest(nextIterator);
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
@@ -372,7 +394,7 @@
boolean success = false;
long startTime = System.currentTimeMillis();
try {
final GetRecordsResponse response = getGetRecordsResponse(request);
final GetRecordsResponseAdapter response = getGetRecordsResponse(request);
success = true;
return response;
} catch (ExecutionException e) {


@@ -24,7 +24,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import lombok.AccessLevel;
@@ -41,7 +40,6 @@ import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
@@ -55,6 +53,7 @@ import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
@@ -534,12 +533,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
try {
sleepBeforeNextCall();
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
GetRecordsResponseAdapter getRecordsResult =
getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now();
final List<KinesisClientRecord> records = getRecordsResult.records().stream()
.map(KinesisClientRecord::fromRecord)
.collect(Collectors.toList());
final List<KinesisClientRecord> records = getRecordsResult.records();
ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder()
.records(records)
.millisBehindLatest(getRecordsResult.millisBehindLatest())


@@ -16,8 +16,8 @@ package software.amazon.kinesis.retrieval.polling;
import lombok.Data;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
/**
@@ -31,7 +31,7 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev
private final DataFetcher dataFetcher;
@Override
public GetRecordsResponse getRecords(final int maxRecords) {
public GetRecordsResponseAdapter getRecords(final int maxRecords) {
return dataFetcher.getRecords().accept();
}


@@ -17,6 +17,8 @@ import lombok.var;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito;
import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
@@ -49,6 +51,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -746,7 +749,8 @@ class LeaseAssignmentManagerTest {
Integer.MAX_VALUE,
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder()
.isGracefulLeaseHandoffEnabled(false)
.build());
.build(),
2 * 100L);
leaseAssignmentManager.start();
@@ -1134,6 +1138,62 @@
dynamoDbAsyncClient.putItem(putItemRequest);
}
@Test
void testLeaseAssignmentSchedulingWithDefaultInterval() {
long failoverTimeMillis = 1000L;
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager(
leaseRefresher,
workerMetricsDAO,
mockLeaderDecider,
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20),
TEST_LEADER_WORKER_ID,
failoverTimeMillis,
new NullMetricsFactory(),
mockExecutor,
System::nanoTime,
Integer.MAX_VALUE,
gracefulLeaseHandoffConfig,
2 * failoverTimeMillis);
leaseAssignmentManager.start();
verify(mockExecutor)
.scheduleWithFixedDelay(
any(Runnable.class), eq(0L), eq(2 * failoverTimeMillis), eq(TimeUnit.MILLISECONDS));
}
@ParameterizedTest
@CsvSource({
"1000, 500", // leaseAssignmentInterval smaller than failover
"1000, 1000", // leaseAssignmentInterval equal to failover
"1000, 2000", // leaseAssignmentInterval larger than failover
})
void testLeaseAssignmentWithDifferentIntervals(long failoverTimeMillis, long leaseAssignmentIntervalMillis) {
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
LeaseAssignmentManager leaseAssignmentManager = new LeaseAssignmentManager(
leaseRefresher,
workerMetricsDAO,
mockLeaderDecider,
getWorkerUtilizationAwareAssignmentConfig(Double.MAX_VALUE, 20),
TEST_LEADER_WORKER_ID,
failoverTimeMillis,
new NullMetricsFactory(),
mockExecutor,
System::nanoTime,
Integer.MAX_VALUE,
gracefulLeaseHandoffConfig,
leaseAssignmentIntervalMillis);
leaseAssignmentManager.start();
verify(mockExecutor)
.scheduleWithFixedDelay(
any(Runnable.class), eq(0L), eq(leaseAssignmentIntervalMillis), eq(TimeUnit.MILLISECONDS));
}
private LeaseAssignmentManager createLeaseAssignmentManager(
final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config,
final Long leaseDurationMillis,
@@ -1151,7 +1211,8 @@ class LeaseAssignmentManagerTest {
scheduledExecutorService,
nanoTimeProvider,
maxLeasesPerWorker,
gracefulLeaseHandoffConfig);
gracefulLeaseHandoffConfig,
2 * leaseDurationMillis);
leaseAssignmentManager.start();
return leaseAssignmentManager;
}
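Throughout these tests the new trailing constructor argument (`leaseAssignmentIntervalMillis`) is passed as `2 * failoverTimeMillis`, and the scheduler is verified to fire at exactly that interval. A small sketch of the defaulting rule this suggests; the helper below is an inference from the test values, not the KCL implementation:

```java
// Hedged sketch: when no lease assignment interval is configured, fall back
// to twice the failover (lease duration) time, matching the
// 2 * failoverTimeMillis arguments used across these tests. The class and
// method names are hypothetical.
final class LeaseAssignmentInterval {
    private LeaseAssignmentInterval() {}

    // Returns the interval LAM should pass to scheduleWithFixedDelay.
    static long effectiveIntervalMillis(Long configuredMillis, long failoverTimeMillis) {
        return configuredMillis != null ? configuredMillis : 2 * failoverTimeMillis;
    }
}
```

The parameterized test then only needs to assert that whatever interval is handed to the constructor is the one that reaches `scheduleWithFixedDelay`.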


@@ -132,7 +132,8 @@ public class LeaseCoordinatorExerciser {
metricsFactory,
new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(),
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(),
new ConcurrentHashMap<>());
new ConcurrentHashMap<>(),
2 * leaseDurationMillis);
coordinators.add(coord);
}


@@ -128,7 +128,8 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
metricsFactory,
new LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig(),
LeaseManagementConfig.GracefulLeaseHandoffConfig.builder().build(),
new ConcurrentHashMap<>());
new ConcurrentHashMap<>(),
2 * LEASE_DURATION_MILLIS);
dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher);
dynamoDBCheckpointer.operation(OPERATION);


@@ -11,6 +11,8 @@ import java.util.concurrent.Executors;
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -22,6 +24,8 @@ import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndexDescri
import software.amazon.awssdk.services.dynamodb.model.IndexStatus;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
@@ -393,6 +397,153 @@ class DynamoDBLeaseRefresherTest {
assertEquals("badLeaseKey", response.getValue().get(0));
}
@Test
public void listLeasesParallely_UseCachedTotalSegment()
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
final long oneGBInBytes = 1073741824L;
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
.table(TableDescription.builder()
.tableName(TEST_LEASE_TABLE)
.tableStatus(TableStatus.ACTIVE)
.tableSizeBytes(oneGBInBytes)
.build())
.build()));
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
verify(mockDdbClient, times(5)).scan(any(ScanRequest.class));
// call a second time to test that the cached value is used
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
// verify that describeTable is called only once even though listLeasesParallely is called twice
verify(mockDdbClient, times(1)).describeTable(any(DescribeTableRequest.class));
verify(mockDdbClient, times(10)).scan(any(ScanRequest.class));
}
@Test
public void listLeasesParallely_DescribeTableNotCalledWhenSegmentGreaterThanZero()
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
.table(TableDescription.builder()
.tableName(TEST_LEASE_TABLE)
.tableStatus(TableStatus.ACTIVE)
.tableSizeBytes(1000L)
.build())
.build()));
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 2);
verify(mockDdbClient, times(0)).describeTable(any(DescribeTableRequest.class));
}
@Test
public void listLeasesParallely_TotalSegmentIsDefaultWhenDescribeTableThrowsException()
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenThrow(ResourceNotFoundException.builder()
.message("Mock table does not exist scenario")
.build());
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
verify(mockDdbClient, times(10)).scan(any(ScanRequest.class));
}
@ParameterizedTest
@CsvSource({
"0, 1", // 0
"1024, 1", // 1KB
"104857600, 1", // 100MB
"214748364, 1", // 0.2GB
"322122547, 2", // 0.3GB
"1073741824, 5", // 1GB
"2147483648, 10", // 2GB
"5368709120, 25", // 5GB
})
public void listLeasesParallely_TotalSegmentForDifferentTableSize(long tableSizeBytes, int totalSegments)
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
.table(TableDescription.builder()
.tableName(TEST_LEASE_TABLE)
.tableStatus(TableStatus.ACTIVE)
.tableSizeBytes(tableSizeBytes)
.build())
.build()));
when(mockDdbClient.scan(any(ScanRequest.class)))
.thenReturn(CompletableFuture.completedFuture(
ScanResponse.builder().items(new ArrayList<>()).build()));
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
TEST_LEASE_TABLE,
mockDdbClient,
new DynamoDBLeaseSerializer(),
true,
NOOP_TABLE_CREATOR_CALLBACK,
Duration.ofSeconds(10),
new DdbTableConfig(),
true,
true,
new ArrayList<>());
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
verify(mockDdbClient, times(totalSegments)).scan(any(ScanRequest.class));
}
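The `@CsvSource` cases above map lease table size to a number of parallel DynamoDB `Scan` segments (0 bytes → 1 segment, 1 GB → 5, 2 GB → 10, 5 GB → 25, and a default of 10 when `DescribeTable` fails). One formula consistent with every listed case is roughly five segments per GB with a floor of one; the helper below is a reconstruction from the test data, not the KCL source:

```java
// Hedged sketch inferred from the parameterized cases: ~5 scan segments per
// GB of lease table, at least 1, and a default of 10 when the table size is
// unknown (e.g. DescribeTable threw). Names are hypothetical.
final class ScanSegments {
    private ScanSegments() {}

    private static final long ONE_GB = 1L << 30;

    static int totalSegments(Long tableSizeBytes) {
        if (tableSizeBytes == null) {
            return 10; // assumed fallback when DescribeTable fails
        }
        double sizeGb = tableSizeBytes / (double) ONE_GB;
        return Math.max(1, (int) Math.ceil(sizeGb * 5));
    }
}
```

This matches the `verify(mockDdbClient, times(totalSegments)).scan(...)` assertions: each segment issues one `Scan` call, so the call count equals the segment count.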
@Test
void initiateGracefulLeaseHandoff_sanity() throws Exception {
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);


@@ -127,6 +127,7 @@ public class ConsumerStatesTest {
private long idleTimeInMillis = 1000L;
private Optional<Long> logWarningForTaskAfterMillis = Optional.empty();
private SchemaRegistryDecoder schemaRegistryDecoder = null;
private final ConsumerTaskFactory taskFactory = new KinesisConsumerTaskFactory();
@Before
public void setup() {
@@ -166,7 +167,8 @@
logWarningForTaskAfterMillis,
argument,
taskExecutionListener,
0));
0,
new KinesisConsumerTaskFactory()));
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
}
@@ -177,7 +179,7 @@
ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState();
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
ConsumerTask task = state.createTask(argument, consumer, null);
ConsumerTask task = state.createTask(argument, consumer, null, taskFactory);
assertThat(task, taskWith(BlockOnParentShardTask.class, ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(
@@ -209,7 +211,7 @@
@Test
public void initializingStateTest() {
ConsumerState state = ShardConsumerState.INITIALIZING.consumerState();
ConsumerTask task = state.createTask(argument, consumer, null);
ConsumerTask task = state.createTask(argument, consumer, null, taskFactory);
assertThat(task, initTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, initTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
@@ -242,7 +244,7 @@
public void processingStateTestSynchronous() {
when(leaseCoordinator.leaseStatsRecorder()).thenReturn(leaseStatsRecorder);
ConsumerState state = ShardConsumerState.PROCESSING.consumerState();
ConsumerTask task = state.createTask(argument, consumer, null);
ConsumerTask task = state.createTask(argument, consumer, null, taskFactory);
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, procTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
@@ -274,7 +276,7 @@
public void processingStateTestAsynchronous() {
when(leaseCoordinator.leaseStatsRecorder()).thenReturn(leaseStatsRecorder);
ConsumerState state = ShardConsumerState.PROCESSING.consumerState();
ConsumerTask task = state.createTask(argument, consumer, null);
ConsumerTask task = state.createTask(argument, consumer, null, taskFactory);
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, procTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
@@ -306,7 +308,7 @@
public void processingStateRecordsFetcher() {
when(leaseCoordinator.leaseStatsRecorder()).thenReturn(leaseStatsRecorder);
ConsumerState state = ShardConsumerState.PROCESSING.consumerState();
ConsumerTask task = state.createTask(argument, consumer, null);
ConsumerTask task = state.createTask(argument, consumer, null, taskFactory);
assertThat(task, procTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(task, procTask(ShardRecordProcessor.class, "shardRecordProcessor", equalTo(shardRecordProcessor)));
@@ -339,7 +341,7 @@
ConsumerState state = ShardConsumerState.SHUTDOWN_REQUESTED.consumerState();
consumer.gracefulShutdown(shutdownNotification);
ConsumerTask task = state.createTask(argument, consumer, null);
ConsumerTask task = state.createTask(argument, consumer, null, taskFactory);
assertThat(
task,
@@ -373,7 +375,7 @@
public void shutdownRequestCompleteStateTest() {
ConsumerState state = ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE;
assertThat(state.createTask(argument, consumer, null), nullValue());
assertThat(state.createTask(argument, consumer, null, taskFactory), nullValue());
assertThat(state.successTransition(), equalTo(state));
@@ -409,7 +411,7 @@
childShards.add(leftChild);
childShards.add(rightChild);
when(processRecordsInput.childShards()).thenReturn(childShards);
ConsumerTask task = state.createTask(argument, consumer, processRecordsInput);
ConsumerTask task = state.createTask(argument, consumer, processRecordsInput, taskFactory);
assertThat(task, shutdownTask(ShardInfo.class, "shardInfo", equalTo(shardInfo)));
assertThat(
@@ -443,7 +445,7 @@
ConsumerState state = ShardConsumerState.SHUTDOWN_COMPLETE.consumerState();
assertThat(state.createTask(argument, consumer, null), nullValue());
assertThat(state.createTask(argument, consumer, null, taskFactory), nullValue());
assertThat(state.successTransition(), equalTo(state));
for (ShutdownReason reason : ShutdownReason.values()) {

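Every `createTask` call in this test now takes a `ConsumerTaskFactory` argument (commit 897d993782), turning task construction into an injectable seam. A minimal sketch of that pattern with simplified stand-in types, not the actual KCL interfaces:

```java
// Hedged sketch of the factory seam these tests exercise: the state delegates
// task construction to whatever factory the consumer passes in, so tests and
// custom retrieval stacks can substitute task creation. Task, TaskFactory,
// and ProcessingState are hypothetical simplified stand-ins.
interface Task {
    String taskType();
}

interface TaskFactory {
    Task createProcessTask(String shardId);
}

final class ProcessingState {
    // Instead of `new ProcessTask(...)`, delegate to the injected factory.
    Task createTask(String shardId, TaskFactory factory) {
        return factory.createProcessTask(shardId);
    }
}
```

In the tests above, the production `KinesisConsumerTaskFactory` plays the role of the default factory, while mocks can verify that the extra argument is threaded through.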

@@ -332,7 +332,7 @@ public class ShardConsumerTest {
verify(cache.subscription, times(3)).request(anyLong());
verify(cache.subscription).cancel();
verify(processingState, times(2)).createTask(eq(shardConsumerArgument), eq(consumer), any());
verify(processingState, times(2)).createTask(eq(shardConsumerArgument), eq(consumer), any(), any());
verify(taskExecutionListener, times(1)).beforeTaskExecution(initialTaskInput);
verify(taskExecutionListener, times(2)).beforeTaskExecution(processTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownTaskInput);
@@ -394,7 +394,7 @@
verify(cache.subscription, times(1)).request(anyLong());
verify(cache.subscription).cancel();
verify(processingState, times(1)).createTask(eq(shardConsumerArgument), eq(consumer), any());
verify(processingState, times(1)).createTask(eq(shardConsumerArgument), eq(consumer), any(), any());
verify(taskExecutionListener, times(1)).beforeTaskExecution(initialTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(processTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownTaskInput);
@@ -437,14 +437,14 @@
cache.publish();
awaitAndResetBarrier(taskCallBarrier);
verify(processingState).createTask(any(), any(), any());
verify(processingState).createTask(any(), any(), any(), any());
verify(processingTask).call();
cache.awaitRequest();
cache.publish();
awaitAndResetBarrier(taskCallBarrier);
verify(processingState, times(2)).createTask(any(), any(), any());
verify(processingState, times(2)).createTask(any(), any(), any(), any());
verify(processingTask, times(2)).call();
cache.awaitRequest();
@@ -460,7 +460,7 @@
shutdownComplete = consumer.shutdownComplete().get();
} while (!shutdownComplete);
verify(processingState, times(3)).createTask(any(), any(), any());
verify(processingState, times(3)).createTask(any(), any(), any(), any());
verify(processingTask, times(3)).call();
verify(processingState).shutdownTransition(eq(ShutdownReason.LEASE_LOST));
verify(shutdownState).shutdownTransition(eq(ShutdownReason.LEASE_LOST));
@@ -487,7 +487,7 @@
public final void testInitializationStateUponFailure() throws Exception {
final ShardConsumer consumer = createShardConsumer(recordsPublisher);
when(initialState.createTask(eq(shardConsumerArgument), eq(consumer), any()))
when(initialState.createTask(eq(shardConsumerArgument), eq(consumer), any(), any()))
.thenReturn(initializeTask);
when(initializeTask.call()).thenReturn(new TaskResult(new Exception("Bad")));
when(initializeTask.taskType()).thenReturn(TaskType.INITIALIZE);
@@ -505,7 +505,7 @@
awaitAndResetBarrier(taskBarrier);
}
verify(initialState, times(5)).createTask(eq(shardConsumerArgument), eq(consumer), any());
verify(initialState, times(5)).createTask(eq(shardConsumerArgument), eq(consumer), any(), any());
verify(initialState, never()).successTransition();
verify(initialState, never()).shutdownTransition(any());
}
@@ -665,7 +665,7 @@
public void testErrorThrowableInInitialization() throws Exception {
final ShardConsumer consumer = createShardConsumer(recordsPublisher);
when(initialState.createTask(any(), any(), any())).thenReturn(initializeTask);
when(initialState.createTask(any(), any(), any(), any())).thenReturn(initializeTask);
when(initialState.taskType()).thenReturn(TaskType.INITIALIZE);
when(initializeTask.call()).thenAnswer(i -> {
throw new Error("Error");
@@ -692,13 +692,13 @@
mockSuccessfulProcessing(taskBarrier);
when(processingState.shutdownTransition(eq(ShutdownReason.REQUESTED))).thenReturn(shutdownRequestedState);
when(shutdownRequestedState.createTask(any(), any(), any())).thenReturn(shutdownRequestedTask);
when(shutdownRequestedState.createTask(any(), any(), any(), any())).thenReturn(shutdownRequestedTask);
when(shutdownRequestedState.taskType()).thenReturn(TaskType.SHUTDOWN_NOTIFICATION);
when(shutdownRequestedTask.call()).thenReturn(new TaskResult(null));
when(shutdownRequestedState.shutdownTransition(eq(ShutdownReason.REQUESTED)))
.thenReturn(shutdownRequestedAwaitState);
when(shutdownRequestedAwaitState.createTask(any(), any(), any())).thenReturn(null);
when(shutdownRequestedAwaitState.createTask(any(), any(), any(), any())).thenReturn(null);
when(shutdownRequestedAwaitState.shutdownTransition(eq(ShutdownReason.LEASE_LOST)))
.thenReturn(shutdownState);
when(shutdownRequestedAwaitState.taskType()).thenReturn(TaskType.SHUTDOWN_COMPLETE);
@@ -733,11 +733,11 @@
shutdownComplete = consumer.shutdownComplete().get();
assertTrue(shutdownComplete);
verify(processingState, times(2)).createTask(any(), any(), any());
verify(processingState, times(2)).createTask(any(), any(), any(), any());
verify(shutdownRequestedState, never()).shutdownTransition(eq(ShutdownReason.LEASE_LOST));
verify(shutdownRequestedState).createTask(any(), any(), any());
verify(shutdownRequestedState).createTask(any(), any(), any(), any());
verify(shutdownRequestedState).shutdownTransition(eq(ShutdownReason.REQUESTED));
verify(shutdownRequestedAwaitState).createTask(any(), any(), any());
verify(shutdownRequestedAwaitState).createTask(any(), any(), any(), any());
verify(shutdownRequestedAwaitState).shutdownTransition(eq(ShutdownReason.LEASE_LOST));
verify(taskExecutionListener, times(1)).beforeTaskExecution(initialTaskInput);
verify(taskExecutionListener, times(2)).beforeTaskExecution(processTaskInput);
@@ -781,7 +781,8 @@
initialState,
1,
taskExecutionListener,
0);
0,
new KinesisConsumerTaskFactory());
mockSuccessfulInitialize(null);
mockSuccessfulProcessing(null);
@@ -836,7 +837,8 @@
initialState,
1,
taskExecutionListener,
0);
0,
new KinesisConsumerTaskFactory());
CyclicBarrier taskArriveBarrier = new CyclicBarrier(2);
CyclicBarrier taskDepartBarrier = new CyclicBarrier(2);
@@ -943,12 +945,13 @@
mockState,
1,
taskExecutionListener,
0);
0,
new KinesisConsumerTaskFactory());
when(mockState.state()).thenReturn(ShardConsumerState.WAITING_ON_PARENT_SHARDS);
when(mockState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS);
final ConsumerTask mockTask = mock(ConsumerTask.class);
when(mockState.createTask(any(), any(), any())).thenReturn(mockTask);
when(mockState.createTask(any(), any(), any(), any())).thenReturn(mockTask);
// Simulate successful BlockedOnParent task execution
// and successful Initialize task execution
when(mockTask.call()).thenReturn(new TaskResult(false));
@@ -993,7 +996,7 @@
reset(mockState);
when(mockState.taskType()).thenReturn(TaskType.PROCESS);
final ConsumerTask mockProcessTask = mock(ConsumerTask.class);
when(mockState.createTask(any(), any(), any())).thenReturn(mockProcessTask);
when(mockState.createTask(any(), any(), any(), any())).thenReturn(mockProcessTask);
when(mockProcessTask.call()).then(input -> {
// first we want to wait for subscribe to be called,
// but we cannot control the timing, so wait for 10 seconds
@@ -1045,7 +1048,8 @@
}
private void mockSuccessfulShutdown(CyclicBarrier taskArriveBarrier, CyclicBarrier taskDepartBarrier) {
when(shutdownState.createTask(eq(shardConsumerArgument), any(), any())).thenReturn(shutdownTask);
when(shutdownState.createTask(eq(shardConsumerArgument), any(), any(), any()))
.thenReturn(shutdownTask);
when(shutdownState.taskType()).thenReturn(TaskType.SHUTDOWN);
when(shutdownTask.call()).thenAnswer(i -> {
awaitBarrier(taskArriveBarrier);
@@ -1063,7 +1067,7 @@
}
private void mockSuccessfulProcessing(CyclicBarrier taskCallBarrier, CyclicBarrier taskInterlockBarrier) {
when(processingState.createTask(eq(shardConsumerArgument), any(), any()))
when(processingState.createTask(eq(shardConsumerArgument), any(), any(), any()))
.thenReturn(processingTask);
when(processingState.taskType()).thenReturn(TaskType.PROCESS);
when(processingTask.taskType()).thenReturn(TaskType.PROCESS);
@@ -1088,7 +1092,8 @@
}
private void mockSuccessfulInitialize(CyclicBarrier taskCallBarrier, CyclicBarrier taskInterlockBarrier) {
when(initialState.createTask(eq(shardConsumerArgument), any(), any())).thenReturn(initializeTask);
when(initialState.createTask(eq(shardConsumerArgument), any(), any(), any()))
.thenReturn(initializeTask);
when(initialState.taskType()).thenReturn(TaskType.INITIALIZE);
when(initializeTask.taskType()).thenReturn(TaskType.INITIALIZE);
when(initializeTask.call()).thenAnswer(i -> {
@@ -1107,7 +1112,7 @@
}
private void mockSuccessfulUnblockOnParents() {
when(blockedOnParentsState.createTask(eq(shardConsumerArgument), any(), any()))
when(blockedOnParentsState.createTask(eq(shardConsumerArgument), any(), any(), any()))
.thenReturn(blockedOnParentsTask);
when(blockedOnParentsState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS);
when(blockedOnParentsTask.call()).thenAnswer(i -> blockOnParentsTaskResult);
@@ -1153,6 +1158,7 @@
state,
1,
taskExecutionListener,
0);
0,
new KinesisConsumerTaskFactory());
}
}


@@ -38,6 +38,8 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -71,7 +73,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
private KinesisAsyncClient kinesisClient;
private CompletionService<DataFetcherResult> completionService;
private GetRecordsResponse getRecordsResponse;
private GetRecordsResponseAdapter getRecordsResponse;
private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy;
private KinesisDataFetcher dataFetcher;
@@ -97,7 +99,8 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
completionService = spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(
dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
getRecordsResponse = GetRecordsResponse.builder().build();
getRecordsResponse = new KinesisGetRecordsResponseAdapter(
GetRecordsResponse.builder().build());
when(completionServiceSupplier.get()).thenReturn(completionService);
when(result.accept()).thenReturn(getRecordsResponse);
@@ -106,7 +109,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Test
public void oneRequestMultithreadTest() {
when(result.accept()).thenReturn(null);
GetRecordsResponse getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
GetRecordsResponseAdapter getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords();
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
assertNull(getRecordsResult);
@@ -117,7 +120,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
ExecutorCompletionService<DataFetcherResult> completionService1 =
spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService1);
GetRecordsResponse getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
GetRecordsResponseAdapter getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords();
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
assertThat(getRecordsResult, equalTo(getRecordsResponse));
@@ -127,7 +130,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
spy(new ExecutorCompletionService<DataFetcherResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService2);
getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
assertThat(getRecordsResult, nullValue(GetRecordsResponse.class));
assertThat(getRecordsResult, nullValue(GetRecordsResponseAdapter.class));
}
@Test(expected = ExpiredIteratorException.class)


@@ -30,6 +30,8 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -73,11 +75,12 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock
private DataFetcherResult dataFetcherResult;
private GetRecordsResponse expectedResponses;
private GetRecordsResponseAdapter expectedResponses;
@Before
public void before() {
expectedResponses = GetRecordsResponse.builder().build();
expectedResponses = new KinesisGetRecordsResponseAdapter(
GetRecordsResponse.builder().build());
when(completionServiceSupplier.get()).thenReturn(completionService);
when(dataFetcherResult.accept()).thenReturn(expectedResponses);
@@ -93,7 +96,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(completionService.poll(anyLong(), any())).thenReturn(successfulFuture);
when(successfulFuture.get()).thenReturn(dataFetcherResult);
GetRecordsResponse result = strategy.getRecords(10);
GetRecordsResponseAdapter result = strategy.getRecords(10);
verify(executorService).isShutdown();
verify(completionService).submit(any());
@@ -116,7 +119,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
when(blockedFuture.cancel(anyBoolean())).thenReturn(true);
GetRecordsResponse actualResults = strategy.getRecords(10);
GetRecordsResponseAdapter actualResults = strategy.getRecords(10);
verify(completionService, times(2)).submit(any());
verify(completionService, times(2)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
@@ -156,7 +159,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
when(successfulFuture.cancel(anyBoolean())).thenReturn(false);
when(blockedFuture.cancel(anyBoolean())).thenReturn(true);
GetRecordsResponse actualResult = strategy.getRecords(10);
GetRecordsResponseAdapter actualResult = strategy.getRecords(10);
verify(completionService, times(3)).submit(any());
verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));


@@ -55,7 +55,9 @@ import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import static org.hamcrest.CoreMatchers.isA;
@@ -433,7 +435,7 @@ public class KinesisDataFetcherTest {
assertTrue(terminal.isShardEnd());
assertNotNull(terminal.getResult());
final GetRecordsResponse terminalResult = terminal.getResult();
final GetRecordsResponseAdapter terminalResult = terminal.getResult();
assertNotNull(terminalResult.records());
assertEquals(0, terminalResult.records().size());
assertNull(terminalResult.nextShardIterator());
@@ -540,12 +542,13 @@ public class KinesisDataFetcherTest {
private DataFetcherResult assertAdvanced(
GetRecordsResponse expectedResult, String previousValue, String nextValue) {
DataFetcherResult acceptResult = kinesisDataFetcher.getRecords();
assertEquals(expectedResult, acceptResult.getResult());
KinesisGetRecordsResponseAdapter expectedResultAdapter = new KinesisGetRecordsResponseAdapter(expectedResult);
assertEquals(expectedResultAdapter, acceptResult.getResult());
assertEquals(previousValue, kinesisDataFetcher.getNextIterator());
assertFalse(kinesisDataFetcher.isShardEndReached());
assertEquals(expectedResult, acceptResult.accept());
assertEquals(expectedResultAdapter, acceptResult.accept());
assertEquals(nextValue, kinesisDataFetcher.getNextIterator());
if (nextValue == null) {
assertTrue(kinesisDataFetcher.isShardEndReached());
@@ -557,7 +560,8 @@ public class KinesisDataFetcherTest {
private DataFetcherResult assertNoAdvance(final GetRecordsResponse expectedResult, final String previousValue) {
assertEquals(previousValue, kinesisDataFetcher.getNextIterator());
DataFetcherResult noAcceptResult = kinesisDataFetcher.getRecords();
assertEquals(expectedResult, noAcceptResult.getResult());
KinesisGetRecordsResponseAdapter expectedResultAdapter = new KinesisGetRecordsResponseAdapter(expectedResult);
assertEquals(expectedResultAdapter, noAcceptResult.getResult());
assertEquals(previousValue, kinesisDataFetcher.getNextIterator());

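The assertions above now compare a `KinesisGetRecordsResponseAdapter` built from the expected SDK response against the fetcher's result with `assertEquals`. For that to pass, the wrapper must define value equality over the wrapped response. A sketch of why that works, using hypothetical stand-in classes rather than the KCL types:

```java
import java.util.Objects;

// Hedged sketch: a wrapper whose equals/hashCode delegate to the wrapped
// response, so two independently constructed adapters over equal responses
// compare equal in assertEquals. Wrapped and ResponseAdapter are stand-ins.
final class Wrapped {
    final String nextShardIterator;

    Wrapped(String nextShardIterator) {
        this.nextShardIterator = nextShardIterator;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Wrapped
                && Objects.equals(((Wrapped) o).nextShardIterator, nextShardIterator);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(nextShardIterator);
    }
}

final class ResponseAdapter {
    private final Wrapped response;

    ResponseAdapter(Wrapped response) {
        this.response = response;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ResponseAdapter && ((ResponseAdapter) o).response.equals(response);
    }

    @Override
    public int hashCode() {
        return response.hashCode();
    }
}
```

Without delegating equality, each `new KinesisGetRecordsResponseAdapter(expectedResult)` would compare by identity and the rewritten assertions would fail.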

@@ -46,7 +46,9 @@ import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@@ -305,11 +307,12 @@ public class PrefetchRecordsPublisherIntegrationTest {
@Override
public DataFetcherResult getRecords() {
GetRecordsResponse getRecordsResult = GetRecordsResponse.builder()
GetRecordsResponseAdapter getRecordsResult =
new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(new ArrayList<>(records))
.nextShardIterator(nextShardIterator)
.millisBehindLatest(1000L)
.build();
.build());
return new AdvancingResult(getRecordsResult);
}


@@ -66,8 +66,10 @@ import software.amazon.kinesis.leases.ShardObjectHelper;
import software.amazon.kinesis.lifecycle.ShardConsumerNotifyingSubscriber;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.KinesisGetRecordsResponseAdapter;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
@@ -136,7 +138,7 @@ public class PrefetchRecordsPublisherTest {
private ExecutorService executorService;
private LinkedBlockingQueue<PrefetchRecordsPublisher.PrefetchRecordsRetrieved> spyQueue;
private PrefetchRecordsPublisher getRecordsCache;
private GetRecordsResponse getRecordsResponse;
private GetRecordsResponseAdapter getRecordsResponse;
private Record record;
@Before
@@ -147,11 +149,11 @@ public class PrefetchRecordsPublisherTest {
getRecordsCache = createPrefetchRecordsPublisher(0L);
spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue());
records = spy(new ArrayList<>());
getRecordsResponse = GetRecordsResponse.builder()
getRecordsResponse = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(records)
.nextShardIterator(NEXT_SHARD_ITERATOR)
.childShards(Collections.emptyList())
.build();
.build());
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(getRecordsResponse);
}
@@ -283,8 +285,8 @@ public class PrefetchRecordsPublisherTest {
public void testGetRecordsWithInvalidResponse() {
record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build();
GetRecordsResponse response =
GetRecordsResponse.builder().records(records).build();
GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(
GetRecordsResponse.builder().records(records).build());
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(response);
when(dataFetcher.isShardEndReached()).thenReturn(false);
@@ -319,10 +321,10 @@ public class PrefetchRecordsPublisherTest {
childShards.add(leftChild);
childShards.add(rightChild);
GetRecordsResponse response = GetRecordsResponse.builder()
GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(records)
.childShards(childShards)
.build();
.build());
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenReturn(response);
when(dataFetcher.isShardEndReached()).thenReturn(true);
@@ -417,13 +419,13 @@ public class PrefetchRecordsPublisherTest {
@Test(expected = IllegalStateException.class)
public void testRequestRecordsOnSubscriptionAfterShutdown() {
GetRecordsResponse response = GetRecordsResponse.builder()
GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(Record.builder()
.data(SdkBytes.fromByteArray(new byte[] {1, 2, 3}))
.sequenceNumber("123")
.build())
.nextShardIterator(NEXT_SHARD_ITERATOR)
.build();
.build());
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition);
@@ -482,11 +484,11 @@ public class PrefetchRecordsPublisherTest {
@Test
public void testRetryableRetrievalExceptionContinues() {
GetRecordsResponse response = GetRecordsResponse.builder()
GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.millisBehindLatest(100L)
.records(Collections.emptyList())
.nextShardIterator(NEXT_SHARD_ITERATOR)
.build();
.build());
when(getRecordsRetrievalStrategy.getRecords(anyInt()))
.thenThrow(new RetryableRetrievalException("Timeout", new TimeoutException("Timeout")))
.thenReturn(response);
@@ -526,13 +528,14 @@ public class PrefetchRecordsPublisherTest {
//
final int[] sequenceNumberInResponse = {0};
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenAnswer(i -> GetRecordsResponse.builder()
when(getRecordsRetrievalStrategy.getRecords(anyInt()))
.thenAnswer(i -> new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(Record.builder()
.data(SdkBytes.fromByteArray(new byte[] {1, 2, 3}))
.sequenceNumber(++sequenceNumberInResponse[0] + "")
.build())
.nextShardIterator(NEXT_SHARD_ITERATOR)
.build());
.build()));
getRecordsCache.start(sequenceNumber, initialPosition);
@@ -627,13 +630,13 @@ public class PrefetchRecordsPublisherTest {
//
// This test is to verify that the data consumption is not stuck in the case of a failed event delivery
// to the subscriber.
GetRecordsResponse response = GetRecordsResponse.builder()
GetRecordsResponseAdapter response = new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(Record.builder()
.data(SdkBytes.fromByteArray(new byte[] {1, 2, 3}))
.sequenceNumber("123")
.build())
.nextShardIterator(NEXT_SHARD_ITERATOR)
.build();
.build());
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response);
getRecordsCache.start(sequenceNumber, initialPosition);
@@ -710,7 +713,7 @@ public class PrefetchRecordsPublisherTest {
@Test
public void testResetClearsRemainingData() {
List<GetRecordsResponse> responses = Stream.iterate(0, i -> i + 1)
List<GetRecordsResponseAdapter> responses = Stream.iterate(0, i -> i + 1)
.limit(10)
.map(i -> {
Record record = Record.builder()
@@ -720,10 +723,10 @@ public class PrefetchRecordsPublisherTest {
.approximateArrivalTimestamp(Instant.now())
.build();
String nextIterator = "shard-iter-" + (i + 1);
return GetRecordsResponse.builder()
return new KinesisGetRecordsResponseAdapter(GetRecordsResponse.builder()
.records(record)
.nextShardIterator(nextIterator)
.build();
.build());
})
.collect(Collectors.toList());
@@ -778,7 +781,8 @@ public class PrefetchRecordsPublisherTest {
try {
// return a valid response to cause `lastSuccessfulCall` to initialize
when(getRecordsRetrievalStrategy.getRecords(anyInt()))
.thenReturn(GetRecordsResponse.builder().build());
.thenReturn(new KinesisGetRecordsResponseAdapter(
GetRecordsResponse.builder().build()));
blockUntilRecordsAvailable();
} catch (RuntimeException re) {
Assert.fail("first call should succeed");
@@ -803,7 +807,8 @@ public class PrefetchRecordsPublisherTest {
public void testProvisionedThroughputExceededExceptionReporter() {
when(getRecordsRetrievalStrategy.getRecords(anyInt()))
.thenThrow(ProvisionedThroughputExceededException.builder().build())
.thenReturn(GetRecordsResponse.builder().build());
.thenReturn(new KinesisGetRecordsResponseAdapter(
GetRecordsResponse.builder().build()));
getRecordsCache.start(sequenceNumber, initialPosition);
@@ -822,20 +827,20 @@ public class PrefetchRecordsPublisherTest {
return getRecordsCache.getPublisherSession().evictPublishedRecordAndUpdateDemand("shardId");
}
private static class RetrieverAnswer implements Answer<GetRecordsResponse> {
private static class RetrieverAnswer implements Answer<GetRecordsResponseAdapter> {
private final List<GetRecordsResponse> responses;
private Iterator<GetRecordsResponse> iterator;
private final List<GetRecordsResponseAdapter> responses;
private Iterator<GetRecordsResponseAdapter> iterator;
public RetrieverAnswer(List<GetRecordsResponse> responses) {
public RetrieverAnswer(List<GetRecordsResponseAdapter> responses) {
this.responses = responses;
this.iterator = responses.iterator();
}
public void resetIteratorTo(String nextIterator) {
Iterator<GetRecordsResponse> newIterator = responses.iterator();
Iterator<GetRecordsResponseAdapter> newIterator = responses.iterator();
while (newIterator.hasNext()) {
GetRecordsResponse current = newIterator.next();
GetRecordsResponseAdapter current = newIterator.next();
if (StringUtils.equals(nextIterator, current.nextShardIterator())) {
if (!newIterator.hasNext()) {
iterator = responses.iterator();
@@ -849,8 +854,8 @@ public class PrefetchRecordsPublisherTest {
}
@Override
public GetRecordsResponse answer(InvocationOnMock invocation) {
GetRecordsResponse response = iterator.next();
public GetRecordsResponseAdapter answer(InvocationOnMock invocation) {
GetRecordsResponseAdapter response = iterator.next();
if (!iterator.hasNext()) {
iterator = responses.iterator();
}


@@ -7,7 +7,7 @@ You can set configuration properties to customize Kinesis Client Library's funct
## List of KCL configuration properties
| Configuration property | Configuration class | Description | Default value |
|--------------------------------------------------|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------:|
| ------------------------------------------------ | --------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------: |
| applicationName | ConfigsBuilder | The name for this KCL application. Used as the default for the tableName and consumerName. | N/A |
| tableName | ConfigsBuilder | Allows overriding the table name used for the Amazon DynamoDB lease table. | N/A |
| streamName | ConfigsBuilder | The name of the stream that this application processes records from. | N/A |
@@ -29,6 +29,7 @@ You can set configuration properties to customize Kinesis Client Library's funct
| maxThroughputPerHostKBps | LeaseManagementConfig | Amount of the maximum throughput to assign to a worker during the lease assignment. This is a new configuration introduced in KCL 3.x. | Unlimited |
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig | Controls the behavior of lease handoff between workers. When set to true, KCL will attempt to gracefully transfer leases by allowing the shard's RecordProcessor sufficient time to complete processing before handing off the lease to another worker. This can help ensure data integrity and smooth transitions but may increase handoff time. When set to false, the lease will be handed off immediately without waiting for the RecordProcessor to shut down gracefully. This can lead to faster handoffs but may risk incomplete processing. Note: Checkpointing must be implemented inside the shutdownRequested() method of the RecordProcessor to benefit from the graceful lease handoff feature. This is a new configuration introduced in KCL 3.x. | TRUE |
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig | Specifies the minimum time (in milliseconds) to wait for the current shard's RecordProcessor to gracefully shut down before forcefully transferring the lease to the next owner. If your processRecords method typically runs longer than the default value, consider increasing this setting. This ensures the RecordProcessor has sufficient time to complete its processing before the lease transfer occurs. This is a new configuration introduced in KCL 3.x. | 30000 (30 seconds) |
| WorkerMetricsTableConfig | LeaseManagementConfig | This configuration determines the DynamoDB table settings such as table name, billing mode, provisioned capacity, PITR, deletion protection, and tags for the worker metrics table. | By default, KCL creates a worker metrics table named `<KCLApplicationName>-WorkerMetricStats` with on-demand billing mode. |
| maxRecords | PollingConfig | Allows setting the maximum number of records that Kinesis returns. | 10000 |
| retryGetRecordsInSeconds | PollingConfig | Configures the delay between GetRecords attempts for failures. | none |
| maxGetRecordsThreadPool | PollingConfig | The thread pool size used for GetRecords. | none |
@@ -38,6 +39,7 @@ You can set configuration properties to customize Kinesis Client Library's funct
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Disable synchronizing shard data if the lease table contains existing leases. | FALSE |
| shardPrioritization | CoordinatorConfig | Which shard prioritization to use. | NoOpShardPrioritization |
| ClientVersionConfig | CoordinatorConfig | Determines which KCL version compatibility mode the application will run in. This configuration is only for the migration from previous KCL versions. When migrating to 3.x, you need to set this configuration to CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2x. You can remove this configuration when you complete the migration. | CLIENT_VERSION_CONFIG_3X |
| CoordinatorStateTableConfig | CoordinatorConfig | This configuration determines the DynamoDB table settings such as table name, billing mode, provisioned capacity, PITR, deletion protection, and tags for the coordinator state table. | By default, KCL creates a coordinator state table named `<KCLApplicationName>-CoordinatorState` with on-demand billing mode. |
| taskBackoffTimeMillis | LifecycleConfig | The time to wait to retry failed KCL tasks. The unit is milliseconds. | 500 (0.5 seconds) |
| logWarningForTaskAfterMillis | LifecycleConfig | How long to wait before a warning is logged if a task hasn't completed. | none |
| listShardsBackoffTimeInMillis | RetrievalConfig | The number of milliseconds to wait between calls to ListShards when failures occur. The unit is milliseconds. | 1500 (1.5 seconds) |
@@ -47,10 +49,13 @@ You can set configuration properties to customize Kinesis Client Library's funct
| metricsLevel | MetricsConfig | Specifies the granularity level of CloudWatch metrics to be enabled and published. Possible values: NONE, SUMMARY, DETAILED. | MetricsLevel.DETAILED |
| metricsEnabledDimensions | MetricsConfig | Controls allowed dimensions for CloudWatch Metrics. | All dimensions |
## New configurations in KCL 3.x
The following configuration properties are newly added in KCL 3.x:
| Configuration property | Configuration class | Description | Default value |
|---|---|---|---:|
| --------------------------------- | --------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------: |
| reBalanceThresholdPercentage | LeaseManagementConfig | A percentage value that determines when the load balancing algorithm should consider reassigning shards among workers. This is a new configuration introduced in KCL 3.x. | 10 |
| dampeningPercentage | LeaseManagementConfig | A percentage value that is used to dampen the amount of load that will be moved from the overloaded worker in a single rebalance operation. This is a new configuration introduced in KCL 3.x. | 60 |
| allowThroughputOvershoot | LeaseManagementConfig | Determines whether additional lease still needs to be taken from the overloaded worker even if it causes total amount of lease throughput taken to exceed the desired throughput amount. This is a new configuration introduced in KCL 3.x. | TRUE |
@@ -58,6 +63,8 @@ The following configuration properties are newly added in KCL 3.x:
| maxThroughputPerHostKBps | LeaseManagementConfig | Amount of the maximum throughput to assign to a worker during the lease assignment. This is a new configuration introduced in KCL 3.x. | Unlimited |
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig | Controls the behavior of lease handoff between workers. When set to true, KCL will attempt to gracefully transfer leases by allowing the shard's RecordProcessor sufficient time to complete processing before handing off the lease to another worker. This can help ensure data integrity and smooth transitions but may increase handoff time. When set to false, the lease will be handed off immediately without waiting for the RecordProcessor to shut down gracefully. This can lead to faster handoffs but may risk incomplete processing. Note: Checkpointing must be implemented inside the shutdownRequested() method of the RecordProcessor to benefit from the graceful lease handoff feature. This is a new configuration introduced in KCL 3.x. | TRUE |
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig | Specifies the minimum time (in milliseconds) to wait for the current shard's RecordProcessor to gracefully shut down before forcefully transferring the lease to the next owner. If your processRecords method typically runs longer than the default value, consider increasing this setting. This ensures the RecordProcessor has sufficient time to complete its processing before the lease transfer occurs. This is a new configuration introduced in KCL 3.x. | 30000 (30 seconds) |
| WorkerMetricsTableConfig | LeaseManagementConfig | This configuration determines the DynamoDB table settings such as table name, billing mode, provisioned capacity, PITR, deletion protection, and tags for the worker metrics table. | By default, KCL creates a worker metrics table named `<KCLApplicationName>-WorkerMetricStats` with on-demand billing mode. |
| CoordinatorStateTableConfig | CoordinatorConfig | This configuration determines the DynamoDB table settings such as table name, billing mode, provisioned capacity, PITR, deletion protection, and tags for the coordinator state table. | By default, KCL creates a coordinator state table named `<KCLApplicationName>-CoordinatorState` with on-demand billing mode. |
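As an illustrative sketch, the new LeaseManagementConfig values above can be adjusted on the configuration object returned by `configsBuilder`. The fluent accessor names here are assumptions mirroring the property names in the table; verify them against the Javadoc of your KCL version before use.

```
// Hypothetical sketch: tuning KCL 3.x graceful lease handoff behavior.
// Accessor names are assumed from the property names above and may differ
// (for example, they may be nested in a sub-config) in your KCL version.
LeaseManagementConfig leaseManagementConfig = configsBuilder.leaseManagementConfig();
leaseManagementConfig
        .gracefulLeaseHandoffTimeoutMillis(60000L)  // give processRecords more time before forced handoff
        .isGracefulLeaseHandoffEnabled(true);
```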
## Discontinued configuration properties in KCL 3.x
@@ -67,5 +74,44 @@ The following configuration properties are discontinued in KCL 3.x:
| maxLeasesToStealAtOneTime | LeaseManagementConfig | The maximum number of leases an application should attempt to steal at one time. KCL 3.x will ignore this configuration and reassign leases based on the resource utilization of workers. |
| enablePriorityLeaseAssignment | LeaseManagementConfig | Controls whether workers should prioritize taking very expired leases (leases not renewed for 3x the failover time) and new shard leases, regardless of target lease counts but still respecting max lease limits. KCL 3.x will ignore this configuration and always spread expired leases across workers. |
> [!Important]
> You still must have the discontinued configuration properties during the migration from previous KCL versions to KCL 3.x. During the migration, KCL workers will first start in the KCL 2.x compatible mode and switch to the KCL 3.x functionality mode when they detect that all KCL workers of the application are ready to run KCL 3.x. These discontinued configurations are needed while KCL workers are running in the KCL 2.x compatible mode.
## How to set custom table names for DynamoDB metadata tables used by KCL
You can set custom table names for the DynamoDB metadata tables used by KCL, such as the lease table, worker metrics table, and coordinator state table, using `configsBuilder` in your main consumer application code. When you want to use custom names for the worker metrics table and coordinator state table, update the configuration of `workerMetricsTableConfig` and `coordinatorStateTableConfig` by setting the `tableName` parameter of these properties to the custom table names that you want to use.
### Lease table
If you want to use a custom name for the lease table, set the `tableName` parameter of `configsBuilder` as in the following example.
```
ConfigsBuilder configsBuilder = new ConfigsBuilder(
streamName,
applicationName,
kinesisClient,
dynamoDbAsyncClient,
cloudWatchClient,
UUID.randomUUID().toString(),
new SampleRecordProcessorFactory()
).tableName("CustomNameForLeaseTable");
```
### Worker metrics table
If you want to use a custom name for the worker metrics table, set the `tableName` parameter of `workerMetricsTableConfig` as in the following example. This snippet can be inserted below the code that creates `configsBuilder`.
```
LeaseManagementConfig leaseManagementConfig = configsBuilder.leaseManagementConfig();
leaseManagementConfig
.workerUtilizationAwareAssignmentConfig()
.workerMetricsTableConfig()
.tableName("CustomTableNameForWorkerMetricsTable");
```
### Coordinator state table
If you want to use a custom name for the coordinator state table, set the `tableName` parameter of `coordinatorStateTableConfig` as in the following example. This snippet can be inserted below the code that creates `configsBuilder`.
```
CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
coordinatorConfig
.coordinatorStateTableConfig()
.tableName("CustomTableNameForCoordinatorStateTable");
```

docs/kcl_3x_deep-dive.md Normal file

@@ -0,0 +1,152 @@
## Introduction
This document explains the architectural changes in KCL 3.x regarding leader election and workload distribution, highlighting how KCL 3.x efficiently rebalances leases across workers using throughput metrics and CPU utilization.
- [Overview: Lease reassignments in KCL 3.x](#overview-lease-reassignments-in-kcl-3x)
- [Motivation for moving from distributed to leader-based protocol](#motivation-for-moving-from-distributed-to-leader-based-protocol)
- [How KCL 3.x manages leader election](#how-kcl-3x-manages-leader-election)
- [How KCL 3.x captures throughput per shard](#how-kcl-3x-captures-throughput-per-shard)
- [How KCL 3.x captures CPU utilization of worker hosts](#how-kcl-3x-captures-cpu-utilization-of-worker-hosts)
- [How KCL 3.x performs lease assignments](#how-kcl-3x-performs-lease-assignments)
## Overview: Lease reassignments in KCL 3.x
KCL 3.x rebalances leases based on the load (CPU utilization) on each worker, with the goal of distributing load evenly across workers. Unlike KCL 2.x, where every worker scans the entire lease table to identify the lease assignment status and picks up unassigned leases or steals leases from existing workers, KCL 3.x elects a leader worker that scans the entire lease table and performs the lease assignment for all KCL workers. The following are the key differences in KCL 3.x.
- **Centralized (leader-based) lease assignment**
KCL 3.x introduces a leader-based approach for assigning and rebalancing leases. Instead of each worker scanning the lease table and autonomously deciding how to pick up or steal leases (KCL 2.x approach), a single worker is elected as **leader**. This leader scans the DynamoDB tables to gather global state, decides on an optimal assignment, and updates the leases accordingly.
- **Throughput-aware rebalancing**
KCL 3.x tracks the actual throughput (bytes processed) of each shard. During lease assignment, the leader considers shard-level throughput, not just the shard count. This helps distribute “hot shards” more evenly and minimizes the risk of overloading a single worker.
- **CPU utilization metrics (worker metrics)**
KCL 3.x introduces the concept of **worker metrics**, primarily CPU utilization at the launch of KCL 3.x, to guide rebalancing decisions. KCL 3.x tries to keep each worker's CPU utilization within a specified threshold range by redistributing leases away from overloaded workers. This ensures even CPU utilization across workers.
- **Reduced DynamoDB usage**
By centralizing the lease assignment logic in a single leader, KCL 3.x significantly reduces the number of scan operations on the DynamoDB lease table. Only the leader does a full table scan, at a set frequency, rather than all workers doing so simultaneously. Once the leader worker writes the lease assignment results to the lease table, the other workers read a global secondary index on the lease table for efficient lease discovery. The global secondary index mirrors the `leaseKey` attribute from the base lease table, with `leaseOwner` as the partition key.
- **Graceful Lease Handover**
KCL 3.x introduces a more graceful approach to handing off leases during rebalancing. When a shard must be reassigned, KCL 3.x can minimize duplicated processing and reduce the need to reprocess records unnecessarily by checkpointing the last processed record in the worker that relinquishes the lease and resuming the processing in the new worker picking up the lease.
## Motivation for moving from distributed to leader-based protocol
There were several challenges in the distributed protocol in KCL 2.x:
1. **Suboptimal decisions and duplicate work:** In KCL 2.x, each worker duplicates the work of listing all leases and computing target and expired leases, then performs assignment based only on its own state, without knowledge of what other workers are doing. This leads to high convergence time (in terms of reaching the same number of leases per worker) and wastes compute on every worker, since each one spends CPU cycles running the same code path.
2. **Lease churn and high MillisBehindLatest:** When the stream scales (leases are added or removed) or workers die (due to failures or deployments), lease stealing causes churn, because workers are free to steal any number of leases, not just the leases that lost their owner. For example, if worker W0 holds leases 1-10 and goes down, W1 may pick up all 10 leases left by W0, and W2 may then steal leases from W1 at random, including ones W1 already held, leading to churn and increased read delays.
3. **Lack of load-based and globally optimal assignment:** Lease-count-based assignment works well for workloads with equal throughput on all leases and predictable, smooth traffic. For workloads with different throughput on different shards, however, mere lease-count-based assignment causes uneven resource consumption across workers, forcing the application to scale for the worst-case assignment and over-provision compute.
4. **High DynamoDB RCU usage:** Because KCL 2.x lease assignment requires each worker to list leases periodically to identify new leases and workers, every worker must scan the whole lease table, which adds to RCU consumption.
5. **Races when assigning new leases (leading to wasted DynamoDB WCU):** KCL 2.x lease assignment works on a work-stealing principle: several workers race to assign the same lease to themselves, and only a single worker wins the lease while the others fail and waste write capacity, CPU cycles, and time.
KCL 3.x solves the challenges above by having a leader worker look at the global state of the application and perform lease assignment and load balancing based on the utilization of each worker.
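KCL 3.x's actual assignment algorithm is more involved, but as a toy illustration under simplified assumptions (CPU utilization as the only metric, one lease moved per overloaded worker, hypothetical class and method names), a leader-side rebalance could look like:

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch, NOT KCL's real algorithm: workers whose CPU utilization exceeds
// the fleet average by more than a threshold give one lease to the most idle worker.
class RebalanceSketch {
    static Map<String, Integer> rebalance(Map<String, Integer> leaseCounts,
                                          Map<String, Double> cpuUtil,
                                          double thresholdPct) {
        double avg = cpuUtil.values().stream().mapToDouble(Double::doubleValue).average().orElse(0);
        Map<String, Integer> result = new HashMap<>(leaseCounts);
        for (Map.Entry<String, Double> e : cpuUtil.entrySet()) {
            if (e.getValue() > avg + thresholdPct && result.get(e.getKey()) > 0) {
                // move one lease from the hot worker to the coolest worker
                String coolest = cpuUtil.entrySet().stream()
                        .min(Map.Entry.comparingByValue()).get().getKey();
                result.merge(e.getKey(), -1, Integer::sum);
                result.merge(coolest, 1, Integer::sum);
            }
        }
        return result;
    }
}
```

Because only the leader runs this computation against a global view, convergence takes one pass instead of many rounds of independent stealing.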
## How KCL 3.x manages leader election
### **Leader decider**
KCL has a specific worker that is chosen as the leader and performs the following tasks:
1. Periodically ensures the lease table matches the active shards in the stream (`PeriodicShardSync`)
2. Discovers new streams that need processing/consumption and creates leases for them (in [multi-stream processing](https://docs.aws.amazon.com/streams/latest/dev/kcl-multi-stream.html) mode)
3. Performs lease assignment (in KCL 3.x)
KCL 2.x used a leader decider class called `DeterministicShuffleShardSyncLeaderDecider` ([GitHub link](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeterministicShuffleShardSyncLeaderDecider.java)). In KCL 3.x, the leader decider has been updated to `DynamoDBLockBasedLeaderDecider` ([GitHub link](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leader/DynamoDBLockBasedLeaderDecider.java)).
### **Why did KCL 3.x step away from DeterministicShuffleShardSyncLeaderDecider?**
`DeterministicShuffleShardSyncLeaderDecider` is a component within KCL 2.x that handles leader election among workers. The component operates by scanning all `leaseOwners` (workers) within the KCL application, sorting them by worker ID, shuffling them using a constant seed, and designating the top worker as leader. This process informs each worker whether it should perform leader duties or operate as a standard worker.
The implementation relies on DynamoDB table scans to gather a complete list of `leaseOwners`. Every five minutes, each worker initiates a `listLeases` call to DynamoDB, scanning the entire lease table to compile a list of unique lease owners. While this approach provides a straightforward solution to leader election, it introduces the following challenges:
- **1) High DynamoDB RCU Usage**: Full table scans performed every five minutes lead to high RCU consumption in the DynamoDB lease table. This consumption scales proportionally with the application size: more workers and leases result in higher RCU usage, creating potential performance bottlenecks and increased operational costs.
- **2) High Convergence Time:** During worker failures or deployments, the five-minute interval forces the cluster to wait at least five minutes before recognizing and establishing a new leader. This delay significantly impacts system availability during critical operational periods.
- **3) Slow Failure Handling:** A newly elected leader may remain unaware of its leadership status for up to five minutes after selection. This delay creates a gap in leadership coverage and could impact the application's coordination capabilities during transition periods.
- **4) Low Consistency:** The five-minute heartbeat interval in leadership detection can create race conditions where multiple workers claim leadership simultaneously or no worker assumes leadership. This leads to redundant operations and wasted resources when multiple leaders exist, or delay in critical tasks like shard synchronization and stream discovery when leadership gaps exist.
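For intuition, the KCL 2.x selection step described above (sort the unique worker IDs, shuffle them with a constant seed, take the head of the list) can be sketched as follows. The class name and seed constant here are illustrative, not the values KCL uses; the point is only that every worker computes the same leader from the same input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Toy sketch of the KCL 2.x deterministic-shuffle selection step.
class DeterministicShuffleSketch {
    static final int SHUFFLE_SEED = 5; // assumption: any shared constant works for the sketch

    static String electLeader(List<String> workerIds) {
        List<String> sorted = new ArrayList<>(workerIds);
        Collections.sort(sorted);                              // deterministic order first
        Collections.shuffle(sorted, new Random(SHUFFLE_SEED)); // same seed on every worker
        return sorted.get(0);                                  // every worker picks the same head
    }
}
```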
### **Leader election in KCL 3.x**
KCL 3.x implements leader election using the `DynamoDBLockClient` library ([GitHub link](https://github.com/awslabs/amazon-dynamodb-lock-client/)). Instead of performing full table scans of lease entries and finding the first worker in the sorted list of leases, this implementation uses a lock-based mechanism stored in the DynamoDB coordinator state table. Each worker in KCL 3.x follows this process:
1. Reads a single lock entry from the DynamoDB lock table
2. Validates that the current lock owner is active by checking whether the lock item receives regular heartbeats
3. Claims leadership if the previous lock owner's heartbeat has expired, following a first-worker-wins model
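The expiry check in steps 2 and 3 can be sketched as below. This is a simplified illustration of the lock-expiry decision; the real logic lives inside the `DynamoDBLockClient` library, and the `LockItem` and `tryClaimLeadership` names here are hypothetical:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

public class LeaderLockSketch {
    /** Minimal stand-in for a lock entry read from the coordinator state table. */
    public record LockItem(String ownerId, Instant lastHeartbeat) {}

    /**
     * Returns true if this worker may claim leadership: either no lock item
     * exists yet (first-worker-wins), or the current owner's heartbeat has
     * not been refreshed within the lease duration.
     */
    public static boolean tryClaimLeadership(
            Optional<LockItem> currentLock, Duration leaseDuration, Instant now) {
        return currentLock
                .map(lock -> lock.lastHeartbeat().plus(leaseDuration).isBefore(now))
                .orElse(true); // no owner yet: first worker to write wins
    }
}
```

In the real client, the claim itself is a conditional write to DynamoDB, so even if two workers pass this check simultaneously, only one write succeeds.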
### **Leader failure handling in KCL 3.x**
The `DynamoDBLockClient` library provides the core failure handling capabilities. It monitors and responds to various failure scenarios including network partitions, worker shutdowns, and overloaded workers. When a worker fails to maintain its heartbeat on the lock item, KCL 3.x automatically enables another worker to claim leadership.
KCL 3.x extends the base failure handling with additional safeguards for critical operations such as lease assignments. If a leader fails lease assignment three consecutive times ([GitHub code ref](https://github.com/awslabs/amazon-kinesis-client/blob/68a7a9bf53e03bd9177bbf2fd234aca103d7f5dc/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java#L81)), KCL 3.x detects this and releases leadership so that another worker can take over and perform lease assignments. This dual-layer failure handling mechanism ensures both infrastructure-level and application-level failures are handled effectively.
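The application-level safeguard amounts to a consecutive-failure counter that triggers a leadership release. The sketch below illustrates that idea under assumed names (this is not the actual KCL API):

```java
public class LeadershipFailureGuard {
    private final int maxConsecutiveFailures;
    private int consecutiveFailures = 0;

    public LeadershipFailureGuard(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /**
     * Record the outcome of one lease-assignment run.
     * Returns true when leadership should be released.
     */
    public boolean recordAssignmentResult(boolean succeeded) {
        // A success resets the streak; only consecutive failures count.
        consecutiveFailures = succeeded ? 0 : consecutiveFailures + 1;
        return consecutiveFailures >= maxConsecutiveFailures;
    }
}
```

With a threshold of three, a single success between failures resets the count, so only a sustained inability to assign leases causes the leader to step down.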
## How KCL 3.x captures throughput per shard
### **Shard throughput calculation**
Every time KCL 3.x retrieves a batch of records from a Kinesis data stream and delivers it to the record processor, it computes the total bytes fetched from a specific shard and records it in the `LeaseStatsRecorder` (GitHub [class](https://github.com/awslabs/amazon-kinesis-client/blob/ae9a433ebd1b0bef74e643b205f8d4759a126e65/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseStatsRecorder.java#L45), [code ref](https://github.com/awslabs/amazon-kinesis-client/blob/ae9a433ebd1b0bef74e643b205f8d4759a126e65/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java#L180)). These stats are accumulated over the `leaseRenewerFrequency` interval (`failoverTimeMillis/3`). During lease renewal (which runs at `leaseRenewerFrequency`), the shard throughput is computed and updated in the throughput attribute of the KCL lease. The final shard throughput is calculated as follows:
- Calculate the shard throughput over the last lease renewal interval (bytes delivered to the record processor in the last interval / `leaseRenewerFrequency`)
- To avoid swings from short-term traffic surges, take an exponential moving average (EMA) with an alpha of 0.5 over the previous and current values ([GitHub code ref](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/utils/ExponentialMovingAverage.java#L45)). This smoothed value is used for lease assignment in KCL 3.x.
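The smoothing step can be sketched as a tiny EMA accumulator, mirroring the idea behind KCL's `ExponentialMovingAverage` (the class name and field names below are illustrative, not copied from KCL):

```java
public class ThroughputEma {
    private final double alpha;
    private Double value; // null until the first sample is recorded

    public ThroughputEma(double alpha) {
        this.alpha = alpha;
    }

    /** Fold in a new throughput sample and return the smoothed value. */
    public double add(double sample) {
        // First sample seeds the average; afterwards blend new and old.
        value = (value == null) ? sample : alpha * sample + (1 - alpha) * value;
        return value;
    }
}
```

With alpha = 0.5, each new interval's throughput and the accumulated history are weighted equally, so a one-off spike moves the smoothed value only halfway toward the spike.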
## How KCL 3.x captures CPU utilization of worker hosts
### **Amazon Elastic Compute Cloud (EC2)**
KCL collects CPU utilization on Amazon EC2 instances by reading system statistics from the `/proc/stat` file. This file provides kernel/system statistics, from which KCL calculates the CPU utilization in percentage ([GitHub code ref](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metric/impl/linux/LinuxCpuWorkerMetric.java)) for the worker host.
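The usual way to turn two `/proc/stat` samples into a utilization percentage is to compare the idle and total jiffy deltas. The sketch below illustrates that calculation on pre-parsed counter arrays; it is an assumed reconstruction for illustration, not KCL's `LinuxCpuWorkerMetric` code:

```java
public class ProcStatCpu {
    /**
     * Percent CPU utilization between two samples of the aggregate "cpu"
     * line in /proc/stat. Each sample holds the jiffy counters in order:
     * user, nice, system, idle, iowait, irq, softirq, steal.
     */
    public static double utilizationPercent(long[] prev, long[] curr) {
        long prevTotal = 0, currTotal = 0;
        for (long v : prev) prevTotal += v;
        for (long v : curr) currTotal += v;
        // idle (index 3) plus iowait (index 4) count as non-busy time
        long prevIdle = prev[3] + prev[4];
        long currIdle = curr[3] + curr[4];
        long totalDelta = currTotal - prevTotal;
        long idleDelta = currIdle - prevIdle;
        return totalDelta == 0 ? 0.0 : 100.0 * (totalDelta - idleDelta) / totalDelta;
    }
}
```

Because the counters are cumulative since boot, the first reading only establishes a baseline; a meaningful percentage requires two samples some interval apart.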
### Amazon Elastic Container Service (ECS)
KCL 3.x retrieves container-level metrics through the ECS task metadata endpoint ([ref](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html)), a local endpoint provided by the ECS container agent that containers in a task can query for task metadata and Docker stats. Its URI is injected into the container via the `${ECS_CONTAINER_METADATA_URI_V4}` environment variable.
This endpoint provides CPU, memory, and network statistics at per-task granularity. Even when multiple KCL applications run in separate containers on the same EC2 host, these statistics are not aggregated across the whole host ([GitHub code ref](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metric/impl/container/EcsCpuWorkerMetric.java)).
The CPU utilization percentage is calculated with the following formula ([ref](https://docs.docker.com/engine/api/v1.45/#tag/Container/operation/ContainerStats)):
```
cpuUtilization = (cpuDelta / systemDelta) * online_cpus * 100.0
```
where:
```
cpuDelta = total_usage - prev_total_usage
systemDelta = system_cpu_usage - prev_system_cpu_usage
```
This formula yields the value in CPU cores. For example, if a task is limited to 3 CPUs and KCL is the only container, this value can be up to 3. To convert it to a percent utilization, we must determine the maximum number of CPUs the KCL container can use.
Getting that value requires a couple of steps. The hard limit on a task is the [CPU task size](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definition_parameters.html#task_size): the maximum CPU time that all containers in the task can use combined. The second limit is the [container CPU size](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definition_parameters.html#container_definitions), which is relative to the other containers in the task. For example, if the task CPU size is 3 CPUs and there are two containers with CPU sizes of 256 and 512, the first container is guaranteed 1 CPU and the second is guaranteed 2 CPUs. However, each container can use more than its minimum, up to the task CPU size, if the other container is not using all of its share. If both containers use 100% of their CPU all the time, each gets only its guaranteed CPUs.
So, to get the CPUs guaranteed to the container, we first read the task CPU size, then the CPU shares of the current container. Dividing the container's shares by the total shares of all containers and multiplying by the task CPU size gives the maximum number of CPUs available to the container.
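Putting the two pieces together, the arithmetic looks like the sketch below. This is an illustration of the calculation described above, with all inputs passed in as plain numbers rather than read from the task metadata endpoint; the method names are assumptions:

```java
public class EcsCpuSketch {
    /** CPU cores used between two stats samples (the Docker formula without the * 100). */
    public static double coresUsed(long cpuDelta, long systemDelta, int onlineCpus) {
        return systemDelta == 0 ? 0.0 : (double) cpuDelta / systemDelta * onlineCpus;
    }

    /** Cores guaranteed to this container: its share of the task CPU size. */
    public static double guaranteedCores(double taskCpuSize, int containerShares, int totalShares) {
        return taskCpuSize * containerShares / totalShares;
    }

    /** Percent utilization relative to the cores guaranteed to the container. */
    public static double utilizationPercent(double coresUsed, double guaranteedCores) {
        return 100.0 * coresUsed / guaranteedCores;
    }
}
```

For the example above (task CPU size of 3, containers with 256 and 512 shares), the first container's guaranteed share is 3 * 256 / 768 = 1 core, so using 0.5 cores would report 50% utilization.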
### Elastic Kubernetes Service (EKS) containers running on Linux instances with cgroupv1 or cgroupv2
KCL 3.x utilizes Linux control groups (cgroups) to extract CPU information about the container, reading CPU time and available CPU from the cgroup directory. Amazon Linux 2 uses cgroupv1 and Amazon Linux 2023 uses cgroupv2. CPU time is measured in CPU core time, and a container is limited by the amount of CPU core time it is allocated. For example, if over one second the container uses 0.5 CPU core-seconds and is allocated 2 CPU cores, the CPU utilization is 25%. When this metric is read for the first time, the returned value is always 0 because there are no previous values to compute a diff against ([GitHub code ref 1](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metric/impl/container/Cgroupv1CpuWorkerMetric.java), [GitHub code ref 2](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metric/impl/container/Cgroupv2CpuWorkerMetric.java)).
The file `/sys/fs/cgroup/cpu/cpuacct.usage` contains the CPU time used by the container in nanoseconds. To calculate the CPU utilization percentage, we can get the maximum number of CPU cores allocated to the container by reading `/sys/fs/cgroup/cpu/cpu.cfs_quota_us` and `/sys/fs/cgroup/cpu/cpu.cfs_period_us`: the quota divided by the period gives the maximum CPU core time available to the container. If the quota is -1, the container's CPU time is not limited by the cgroup and it can utilize all cores on the node; that count can be obtained in Java by calling `Runtime.getRuntime().availableProcessors()`. The file paths and the format of some data differ between cgroupv1 and cgroupv2, but the process is otherwise the same.
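The cgroupv1 arithmetic described above can be sketched as follows. Values are passed in as numbers rather than read from `/sys/fs/cgroup` so the calculation is easy to follow; this is an illustration, not the `Cgroupv1CpuWorkerMetric` code itself:

```java
public class CgroupCpuSketch {
    /**
     * Max CPU cores available to the container: cfs_quota_us / cfs_period_us,
     * or all host cores when the quota is -1 (unlimited).
     */
    public static double allocatedCores(long quotaUs, long periodUs, int hostCores) {
        return quotaUs == -1 ? hostCores : (double) quotaUs / periodUs;
    }

    /**
     * Percent utilization from two cpuacct.usage readings (both in
     * nanoseconds of CPU core time) taken elapsedNanos apart.
     */
    public static double utilizationPercent(
            long prevUsageNanos, long currUsageNanos, long elapsedNanos, double allocatedCores) {
        // Core-seconds used per wall-clock second, then relative to allocation.
        double coresUsed = (double) (currUsageNanos - prevUsageNanos) / elapsedNanos;
        return 100.0 * coresUsed / allocatedCores;
    }
}
```

For instance, a quota of 200000 over a period of 100000 allocates 2 cores, and using 0.5 core-seconds of CPU time in one second against that allocation reports 25%.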
## How KCL 3.x performs lease assignments
As mentioned in the previous sections, in KCL 3.x only the leader worker regularly scans the entire lease table and assigns leases. The `LeaseAssignmentManager` class ([GitHub link](https://github.com/awslabs/amazon-kinesis-client/blob/68a7a9bf53e03bd9177bbf2fd234aca103d7f5dc/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java#L81)) performs two operations related to lease assignment:
- **1) Priority lease assignments:** assigning unassigned leases (leases without owners) and expired leases (leases where the lease duration has elapsed).
- **2) Lease rebalancing:** shuffling leases to equalize worker utilization and achieve an even distribution of load.
For non-leader workers, `LeaseAssignmentManager` does nothing, as assignment is performed on the leader ([GitHub code ref](https://github.com/awslabs/amazon-kinesis-client/blob/68a7a9bf53e03bd9177bbf2fd234aca103d7f5dc/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java#L181)). Lease assignment follows a greedy algorithm, prioritizing workers with the lowest utilization. To prevent over-allocation, the assignment process considers the following:
- Worker's current utilization
- Existing lease throughput
- Projected load from potential new leases
- Number of leases held by the worker
`LeaseAssignmentManager` iteratively assigns multiple leases per worker while maintaining balanced resource distribution based on these metrics. Throughput serves as a proxy metric to estimate the impact of lease changes on worker load (i.e., worker CPU utilization). When assigning or removing leases, KCL 3.x uses throughput data to predict how these operations will affect overall worker utilization and factors that into the assignment decision. The following is the detailed rebalancing process ([GitHub code ref](https://github.com/awslabs/amazon-kinesis-client/blob/68a7a9bf53e03bd9177bbf2fd234aca103d7f5dc/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/VarianceBasedLeaseAssignmentDecider.java)):
1. Calculate the average worker metrics value (CPU utilization) across all workers.
2. Compute the upper and lower limit thresholds for triggering the rebalancing. The upper and lower limits are calculated based on the average worker metrics value and `reBalanceThresholdPercentage` (10 by default; [GitHub code ref](https://github.com/awslabs/amazon-kinesis-client/blob/68a7a9bf53e03bd9177bbf2fd234aca103d7f5dc/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L541)) configuration parameter.
- Upper limit: average worker metric * (1 + `reBalanceThresholdPercentage`/100).
    - Lower limit: average worker metric * (1 - `reBalanceThresholdPercentage`/100).
3. Trigger a rebalancing if any worker's metric value falls outside these limits.
    - For example, let's assume that worker A has 70% CPU utilization and worker B has 40% CPU utilization. The average worker metric is (70+40)/2 = 55%. Then, the upper limit is 55% * (1 + 10/100) = 60.5%, and the lower limit is 55% * (1 - 10/100) = 49.5%. Both workers A and B fall outside these limits, so rebalancing is triggered.
4. Calculate how much load (CPU utilization) to take from each out-of-range worker to bring it back to the average CPU utilization. The load to take is the current load minus the average load.
5. Apply `dampeningPercentageValue` (80 by default) to the calculated load to take. Dampening prevents excessive rebalancing that would cause oscillation and helps achieve critical damping.
6. Calculate the throughput to take from the worker based on the dampened amount of load to take.
7. Select and reassign specific leases matching the dampened throughput target from over-utilized workers to under-utilized workers.
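Steps 1 through 5 above can be sketched numerically as follows. The threshold and dampening defaults (10 and 80) match the configuration values described in this section, but the class and method names are illustrative rather than KCL's actual `VarianceBasedLeaseAssignmentDecider` API:

```java
public class RebalanceSketch {
    public static double average(double[] workerMetrics) {
        double sum = 0;
        for (double m : workerMetrics) sum += m;
        return sum / workerMetrics.length;
    }

    /** Step 3: true if any worker's metric falls outside the threshold band. */
    public static boolean shouldRebalance(double[] workerMetrics, double thresholdPct) {
        double avg = average(workerMetrics);
        double upper = avg * (1 + thresholdPct / 100);
        double lower = avg * (1 - thresholdPct / 100);
        for (double m : workerMetrics) {
            if (m > upper || m < lower) return true;
        }
        return false;
    }

    /** Steps 4-5: dampened load to take from an over-utilized worker. */
    public static double loadToTake(double workerMetric, double avg, double dampeningPct) {
        return Math.max(0, workerMetric - avg) * dampeningPct / 100;
    }
}
```

Replaying the example above: workers at 70% and 40% average to 55%, both fall outside the 49.5% to 60.5% band, and the dampened load to take from worker A is (70 - 55) * 0.8 = 12 percentage points rather than the full 15, which is what limits oscillation.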
When KCL applications run on platforms that don't support CPU utilization metrics, such as Windows, the system automatically falls back to a throughput-based balancing mechanism. In this fallback mode, KCL 3.x distributes leases based on shard throughput, aiming for all workers to process equal shard throughput.
### Rebalancing related configuration parameters
- `reBalanceThresholdPercentage:` This parameter is a percentage value that determines when the load balancing algorithm should consider reassigning shards among workers. The default value is set to 10. You can set a lower value to make your target resource utilization converge more tightly for more frequent rebalancing. If you want to avoid frequent rebalancing, you can set a higher value to loosen the rebalancing sensitivity.
- `dampeningPercentageValue`: This parameter is used to achieve critical damping during reassignments. It helps prevent overly aggressive rebalancing by limiting the amount of load that can be moved in a single rebalance operation. The default value is set to 80. You can set a lower value for more gradual load balancing that minimizes oscillation. If you want more aggressive rebalancing, you can set a higher value for quicker load adjustment.