Documentation: addressed comments for leases.
+ minor code cleanup
This commit is contained in:
parent
c5e3d5e22c
commit
1e7fa749c3
3 changed files with 24 additions and 20 deletions
|
|
@ -59,7 +59,7 @@ public interface ShardDetector {
|
||||||
/**
|
/**
|
||||||
* List shards with shard filter.
|
* List shards with shard filter.
|
||||||
*
|
*
|
||||||
* @param ShardFilter
|
* @param shardFilter
|
||||||
* @return Shards
|
* @return Shards
|
||||||
*/
|
*/
|
||||||
default List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
|
default List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
|
||||||
|
|
|
||||||
|
|
@ -32,17 +32,18 @@ import java.util.stream.Collectors;
|
||||||
* Helper class for cleaning up leases.
|
* Helper class for cleaning up leases.
|
||||||
*/
|
*/
|
||||||
@Accessors(fluent = true)
|
@Accessors(fluent = true)
|
||||||
|
@EqualsAndHashCode
|
||||||
@Value
|
@Value
|
||||||
@EqualsAndHashCode(exclude = {"queueEntryTime"})
|
|
||||||
public class LeasePendingDeletion {
|
public class LeasePendingDeletion {
|
||||||
private final StreamIdentifier streamIdentifier;
|
|
||||||
private final Lease lease;
|
StreamIdentifier streamIdentifier;
|
||||||
private final ShardInfo shardInfo;
|
Lease lease;
|
||||||
private final ShardDetector shardDetector;
|
ShardInfo shardInfo;
|
||||||
|
ShardDetector shardDetector;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Discovers the child shards for this lease.
|
* Discovers the child shards for this lease.
|
||||||
* @return
|
*
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
* @throws ExecutionException
|
* @throws ExecutionException
|
||||||
* @throws TimeoutException
|
* @throws TimeoutException
|
||||||
|
|
|
||||||
|
|
@ -49,24 +49,25 @@ Discovered shards may result from:
|
||||||
* First time starting KCL with an empty lease table.
|
* First time starting KCL with an empty lease table.
|
||||||
* Stream mutations (i.e., split, merge) that create child shards.
|
* Stream mutations (i.e., split, merge) that create child shards.
|
||||||
* In multi-stream mode, dynamic discovery of a new stream.
|
* In multi-stream mode, dynamic discovery of a new stream.
|
||||||
1. `CREATION`: Leases are created 1:1 for each discovered shard, and initialized at the configured initial position.
|
1. `CREATION`: Leases are created 1:1 for each discovered shard.
|
||||||
* Leases are only created if they are eligible for processing.
|
* Leases are only created if they are eligible for processing.
|
||||||
For example, child shards will not have leases created until its parent(s) have reached `SHARD_END`.
|
For example, child shards will not have leases created until its parent(s) have reached `SHARD_END`.
|
||||||
|
* Leases are initialized at the configured initial position.
|
||||||
|
* A notable exception is that child leases are initialized at `TRIM_HORIZON` to avoid processing gaps from their parent lease(s).
|
||||||
1. `PROCESSING`: Leases are processed, and continually updated with new checkpoints.
|
1. `PROCESSING`: Leases are processed, and continually updated with new checkpoints.
|
||||||
* In general, leases spend the majority of their life in this state.
|
* In general, leases spend the majority of their life in this state.
|
||||||
1. `SHARD_END`: The associated shard is `SHARD_END` and all records have been processed by KCL for the shard.
|
1. `SHARD_END`: The associated shard is `SHARD_END` and all records have been processed by KCL for the shard.
|
||||||
1. `DELETION`: Since there are no more records to process, KCL will delete the lease from the lease table.
|
1. `DELETION`: Since there are no more records to process, KCL will delete the lease from the lease table.
|
||||||
* Deletion will only be triggered after all parents of a child shard have converged to `SHARD_END`.
|
* Lease deletion will not occur until after its child lease(s) enter `PROCESSING`.
|
||||||
Convergence is required to preserve ordering of records between parent-child relationships.
|
* This tombstone helps KCL ensure durability and convergence for all discovered leases.
|
||||||
* Deletion is configurable yet recommended to minimize I/O of lease table scans.
|
* To dive deep, see [LeaseCleanupManager#cleanupLeaseForCompletedShard(...)][lease-cleanup-manager-cleanupleaseforcompletedshard][^fixed-commit-footnote]
|
||||||
|
* [Deletion is configurable][lease-cleanup-config] yet recommended to minimize I/O of lease table scans.
|
||||||
|
|
||||||
### Lease Syncing
|
### Lease Syncing
|
||||||
|
|
||||||
Lease syncing is a complex responsibility owned by the "leader" host in a KCL application.
|
Lease syncing is a complex responsibility owned by the "leader" host in a KCL application.
|
||||||
By invoking the [ListShards API][list-shards], KCL will identify the shards for the configured stream(s).
|
By invoking the [ListShards API][list-shards], KCL will identify the shards for the configured stream(s).
|
||||||
This process is scheduled at a
|
This process is scheduled at a [configurable interval][lease-auditor-config] so KCL can decide whether it should query for potentially-new shards.
|
||||||
[configurable interval](https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L204-L209)
|
|
||||||
so KCL can self-identify new shards introduced via stream mutations.
|
|
||||||
|
|
||||||
)[^checkforshardsync].
|
[PeriodicShardSyncManager#checkForShardSync(...)][periodic-shard-sync-manager-checkforshardsync])[^fixed-commit-footnote].
|
||||||
|
|
||||||
## Lease Balancing
|
## Lease Balancing
|
||||||
|
|
||||||
|
|
@ -129,14 +130,11 @@ For convenience, links to code:
|
||||||
|
|
||||||
Leases are stolen if-and-only-if there were zero expired leases and the looking-to-steal-worker desires more leases.
|
Leases are stolen if-and-only-if there were zero expired leases and the looking-to-steal-worker desires more leases.
|
||||||
Stolen leases are randomly selected from whichever worker has the most leases.
|
Stolen leases are randomly selected from whichever worker has the most leases.
|
||||||
The number of leases to steal on each loop is configured via `maxLeasesToStealAtOneTime`.
|
The maximum number of leases to steal on each loop is configured via [maxLeasesToStealAtOneTime][max-leases-to-steal-config].
|
||||||
|
|
||||||
Customers should consider the following trade-offs when configuring the lease-taking cadence:
|
Customers should consider the following trade-offs when configuring the lease-taking cadence:
|
||||||
1. `LeaseRefresher` invokes a DDB `scan` against the lease table which has a cost proportional to the number of leases.
|
1. `LeaseRefresher` invokes a DDB `scan` against the lease table which has a cost proportional to the number of leases.
|
||||||
1. Frequent balancing may cause high lease turn-over which incurs DDB `write` costs, and potentially redundant work for stolen leases.
|
1. Frequent balancing may cause high lease turn-over which incurs DDB `write` costs, and potentially redundant work for stolen leases.
|
||||||
1. High `maxLeasesToStealAtOneTime` may cause churn.
|
|
||||||
* For example, worker `B` steals multiple leases from worker `A` creating a numerical imbalance.
|
|
||||||
In the next loop, worker `C` may steal leases from worker `B`.
|
|
||||||
1. Low `maxLeasesToStealAtOneTime` may increase the time to fully (re)assign leases after an impactful event (e.g., deployment, host failure).
|
1. Low `maxLeasesToStealAtOneTime` may increase the time to fully (re)assign leases after an impactful event (e.g., deployment, host failure).
|
||||||
|
|
||||||
# Additional Reading
|
# Additional Reading
|
||||||
|
|
@ -144,7 +142,7 @@ Customers should consider the following trade-offs when configuring the lease-ta
|
||||||
Informative articles that are recommended (in no particular order):
|
Informative articles that are recommended (in no particular order):
|
||||||
* https://aws.amazon.com/blogs/big-data/processing-amazon-dynamodb-streams-using-the-amazon-kinesis-client-library/
|
* https://aws.amazon.com/blogs/big-data/processing-amazon-dynamodb-streams-using-the-amazon-kinesis-client-library/
|
||||||
|
|
||||||
[^checkforshardsync]: This code is a point-in-time reference to a specific commit to provide fixed line numbers.
|
[^fixed-commit-footnote]: This link is a point-in-time reference to a specific commit to guarantee static line numbers.
|
||||||
This code reference is not guaranteed to remain consistent with the `master` branch.
|
This code reference is not guaranteed to remain consistent with the `master` branch.
|
||||||
|
|
||||||
[consumer-task]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerTask.java
|
[consumer-task]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerTask.java
|
||||||
|
|
@ -152,6 +150,9 @@ Informative articles that are recommended (in no particular order):
|
||||||
[dynamodb]: https://aws.amazon.com/dynamodb/
|
[dynamodb]: https://aws.amazon.com/dynamodb/
|
||||||
[hierarchical-shard-syncer]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
|
[hierarchical-shard-syncer]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
|
||||||
[kcl-leasetable]: https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html#shared-throughput-kcl-consumers-leasetable
|
[kcl-leasetable]: https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html#shared-throughput-kcl-consumers-leasetable
|
||||||
|
[lease-auditor-config]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L204-L209
|
||||||
|
[lease-cleanup-config]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L112-L128
|
||||||
|
[lease-cleanup-manager-cleanupleaseforcompletedshard]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java#L263-L294
|
||||||
[lease-coordinator]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java
|
[lease-coordinator]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java
|
||||||
[lease-coordinator-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
|
[lease-coordinator-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
|
||||||
[lease-refresher]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
|
[lease-refresher]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
|
||||||
|
|
@ -159,8 +160,10 @@ Informative articles that are recommended (in no particular order):
|
||||||
[lease-taker]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseTaker.java
|
[lease-taker]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseTaker.java
|
||||||
[lease-taker-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
|
[lease-taker-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
|
||||||
[list-shards]: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html
|
[list-shards]: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html
|
||||||
|
[max-leases-to-steal-config]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L142-L149
|
||||||
[non-empty-lease-table-synchronizer]: https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java#L857-L910
|
[non-empty-lease-table-synchronizer]: https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java#L857-L910
|
||||||
[periodic-shard-sync-manager]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java
|
[periodic-shard-sync-manager]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java
|
||||||
|
[periodic-shard-sync-manager-checkforshardsync]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java#L267-L300
|
||||||
[scheduler]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
|
[scheduler]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
|
||||||
[shard-detector]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java
|
[shard-detector]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java
|
||||||
[shard-detector-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
|
[shard-detector-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue