Documentation: language review.
+ decomposed shard sync UML into two separate diagrams (initialization, loop)
BIN
docs/images/lease-shard-sync-initialization.png
Normal file
|
After Width: | Height: | Size: 26 KiB |
BIN
docs/images/lease-shard-sync-loop.png
Normal file
|
After Width: | Height: | Size: 70 KiB |
|
@ -1,40 +1,41 @@
|
|||
# Leases and Lease Lifecycle
|
||||
# Lease Lifecycle
|
||||
|
||||
Leases are created in, and leveraged by, KCL to manage a stream's shards.
|
||||
This document should help provide insights into the lease lifecycle.
|
||||
A lease is data that defines the binding between a worker and a shard.
|
||||
Distributed KCL consumer applications use leases to partition data record processing across a fleet of workers.
|
||||
At any given time, each shard of data records is bound to a particular worker by a lease identified by the `leaseKey` variable.
|
||||
|
||||
**Nota bene (N.B.):** while actual shard ids are formatted like `shardId-000000000042`, this document uses `shardId[_-]42` for concision.
|
||||
This document describes the lease lifecycle.
|
||||
|
||||
**Note:** For brevity, this document abbreviates shard ids such as `shardId-000000000042` to `shardId[_-]42`.
|
||||
|
||||
## Leases
|
||||
|
||||
In KCL, a lease provides a temporal assignment between one Kinesis shard and an assigned worker.
|
||||
In KCL, a lease provides a temporal assignment between one shard and an assigned worker.
|
||||
Leases are persistent for the duration of shard processing (detailed later).
|
||||
However, the worker that is processing a lease may change since leases may be "stolen" by other workers in the same KCL application.
|
||||
However, the worker that is processing a lease may change since [leases may be "stolen"](#lease-balancing) by other workers in the same KCL application.
|
||||
|
||||
## Lease Table
|
||||
|
||||
To persist metadata about lease state (e.g., last read checkpoint, current assigned worker), KCL creates a lease table in [DynamoDB][dynamodb].
|
||||
To persist metadata about lease state (e.g., [last read checkpoint, current assigned worker][kcl-concepts]), KCL creates a lease table in [DynamoDB][dynamodb].
|
||||
Each KCL application will have its own distinct lease table that includes the application name.
|
||||
More information, including schema, is provided at [KCL LeaseTable][kcl-leasetable].
|
||||
|
||||
## Lease Assignment
|
||||
|
||||
The "life" of a lease closely mirrors the duration that a shard is being processed.
|
||||
Leases are unique to the shard and are not recycled for stream operations (i.e., split, merge).
|
||||
A shard created by stream operations will generate a new lease.
|
||||
|
||||

|
||||
|
||||
Specifically, leases are unique to the shard and are not recycled for stream mutations (i.e., split, merge).
|
||||
Shards created by stream mutations will generate a new lease.
|
||||
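![Activity diagram of KCL shard-to-lease assignments. shard-0 (lease-0) is unmodified. shard-1 (lease-1) is split into shard-4 (lease-4) and shard-5 (lease-5). shard-2 (lease-2) and shard-3 (lease-3) are merged into shard-6 (lease-6).](images/leases-and-operations.png)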
|
||||
|
||||
The number of tracked leases may exceed the number of shards.
|
||||
Per the diagram (above), this can occur when there are stream mutations propagating through KCL.
|
||||
Per the diagram (above), this can occur when there are stream operations propagating through KCL.
|
||||
For example, a 10-shard stream that is split on every shard may temporarily have up to 30 leases: 10 original + 20 children.
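
The arithmetic behind that upper bound can be sketched as follows. The function name and the `children_per_split` parameter are illustrative assumptions and are not part of KCL; the sketch only counts parent and child leases while both generations are still tracked.

```python
def max_leases_during_split(shard_count: int, children_per_split: int = 2) -> int:
    """Upper bound on tracked leases when every shard is split once.

    Hypothetical helper: parent leases remain in the lease table until
    they are cleaned up, so parents and children are counted together.
    """
    parents = shard_count
    children = shard_count * children_per_split
    return parents + children


if __name__ == "__main__":
    # 10 original leases + 20 child leases
    print(max_leases_during_split(10))
```

A merge would add fewer leases than a split (one child per two parents), so a split on every shard is the larger of the two cases.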
|
||||
|
||||
N.B. Leases are uniquely identified by their `leaseKey` which looks vastly different than `lease_X`.
|
||||
**Note:** Leases are uniquely identified by their `leaseKey`, which looks very different from `lease_X`.
|
||||
For details on the `leaseKey` format, please see [KCL LeaseTable][kcl-leasetable].
|
||||
|
||||
## Lease Lifecycle
|
||||
|
|
@ -44,10 +45,10 @@ Leases follow a relatively simple, progressive state machine:
|
|||
|
||||
Excluding `SHARD_END`, these phases are illustrative of KCL logic and are not explicitly codified.
|
||||
|
||||
1. `DISCOVERY`: KCL [shard-syncing](#lease-syncing) identifies new shards.
|
||||
1. `DISCOVERY`: KCL [shard-syncing](#shard-syncing) identifies new shards.
|
||||
Discovered shards may result from:
|
||||
* First time starting KCL with an empty lease table.
|
||||
* Stream mutations (i.e., split, merge) that create child shards.
|
||||
* Stream operations (i.e., split, merge) that create child shards.
|
||||
* In multi-stream mode, dynamic discovery of a new stream.
|
||||
1. `CREATION`: Leases are created 1:1 for each discovered shard.
|
||||
* Leases are only created if they are eligible for processing.
|
||||
|
|
@ -60,31 +61,45 @@ Discovered shards may result from:
|
|||
1. `DELETION`: Since there are no more records to process, KCL will delete the lease from the lease table.
|
||||
* Lease deletion will not occur until after its child lease(s) enter `PROCESSING`.
|
||||
* This tombstone helps KCL ensure durability and convergence for all discovered leases.
|
||||
* To dive deep, see [LeaseCleanupManager#cleanupLeaseForCompletedShard(...)][lease-cleanup-manager-cleanupleaseforcompletedshard][^fixed-commit-footnote]
|
||||
* For more information, see [LeaseCleanupManager#cleanupLeaseForCompletedShard(...)][lease-cleanup-manager-cleanupleaseforcompletedshard][^fixed-commit-footnote]
|
||||
    * [Deletion is configurable][lease-cleanup-config], but it is recommended because it minimizes the I/O of lease table scans.
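
The deletion rule above can be sketched as a small predicate. This is a minimal illustration, not KCL's implementation: the `Phase` enum, its ordering (`PROCESSING` before `SHARD_END`), and the function name are assumptions made for this example, since the phases are illustrative and not codified in KCL.

```python
from enum import IntEnum
from typing import Iterable


class Phase(IntEnum):
    """Illustrative lease phases; the ordering is an assumption of this sketch."""
    DISCOVERY = 1
    CREATION = 2
    PROCESSING = 3
    SHARD_END = 4
    DELETION = 5


def can_delete_parent_lease(parent: Phase, children: Iterable[Phase]) -> bool:
    """Return True only if the parent reached SHARD_END and every child
    lease has entered PROCESSING (or a later phase)."""
    children = list(children)
    if parent != Phase.SHARD_END:
        return False
    # No child leases yet: keep the parent so the children can still be discovered.
    if not children:
        return False
    return all(child >= Phase.PROCESSING for child in children)
```

Keeping the parent lease until its children are being processed is what lets the parent act as a tombstone while the child leases converge.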
|
||||
|
||||
### Lease Syncing
|
||||
### Shard Syncing
|
||||
|
||||
Lease syncing is a complex responsibility owned by the "leader" host in a KCL application.
|
||||
Shard syncing is a complex responsibility owned by the leader host in a KCL application.
|
||||
By invoking the [ListShards API][list-shards], KCL will identify the shards for the configured stream(s).
|
||||
This process is scheduled at a [configurable interval][lease-auditor-config] so KCL can decide whether it should query for potentially-new shards.
|
||||
This process is scheduled at a [configurable interval][lease-auditor-config] to determine whether a shard sync should be executed to identify new shards.
|
||||
A shard sync is not guaranteed to identify new shards (e.g., KCL has already discovered all existing shards).
|
||||
|
||||

|
||||
The following diagram is an abridged sequence diagram of key classes that initialize the shard sync workflow:
|
||||

|
||||
|
||||
For convenience, links to code:
|
||||
The following diagram outlines the key classes involved in the shard sync workflow:
|
||||

|
||||
|
||||
For more information, here are the links to KCL code:
|
||||
* `Scheduler`: [implementation][scheduler]
|
||||
* `LeaseCoordinator`: [interface][lease-coordinator], [implementation][lease-coordinator-impl]
|
||||
* `PeriodicShardSyncManager`: [implementation][periodic-shard-sync-manager]
|
||||
* `ShardSyncTask`: [interface][consumer-task], [implementation][consumer-task-impl]
|
||||
* `ShardDetector`: [interface][shard-detector], [implementation][shard-detector-impl]
|
||||
* `HierarchicalShardSyncer`: [implementation][hierarchical-shard-syncer]
|
||||
* `LeaseRefresher`: [interface][lease-refresher], [implementation][lease-refresher-impl]
|
||||
* `LeaseSynchronizer`: [implementation][non-empty-lease-table-synchronizer]
|
||||
* `HierarchicalShardSyncer`: [implementation][hierarchical-shard-syncer]
|
||||
* `ShardDetector`: [interface][shard-detector], [implementation][shard-detector-impl]
|
||||
|
||||
Lease creation is a deterministic process.
|
||||
This is illustrative of how KCL operates.
|
||||
Assume a stream has the following shard hierarchy:
|
||||
<pre>
|
||||
Shard structure (each level depicts a stream segment):
|
||||
|
|
@ -104,17 +119,17 @@ Assuming leases `(4, 5, 7)` already exist, the leases created for an initial pos
|
|||
|
||||
#### Avoiding a Shard-Sync
|
||||
|
||||
To reduce Kinesis API calls, KCL will attempt to avoid unnecessary shard syncs.
|
||||
To reduce Kinesis Data Streams API calls, KCL will attempt to avoid unnecessary shard syncs.
|
||||
For example, if the discovered shards cover the entire partition range, then a shard sync is unlikely to yield a material difference.
|
||||
To dive deeper, see
|
||||
For more information, see
|
||||
[PeriodicShardSyncManager#checkForShardSync(...)][periodic-shard-sync-manager-checkforshardsync][^fixed-commit-footnote].
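
One way to picture the coverage check is the sketch below. It is not the logic in `PeriodicShardSyncManager`; the function name and the tuple-based input are assumptions for illustration. It relies only on the fact that Kinesis hash keys span `0` to `2^128 - 1`, and it reports whether a set of inclusive hash key ranges covers that whole span without gaps.

```python
from typing import Iterable, Tuple

# Kinesis Data Streams hash keys are 128-bit unsigned integers.
MAX_HASH_KEY = 2**128 - 1


def covers_full_hash_range(ranges: Iterable[Tuple[int, int]]) -> bool:
    """Return True if the inclusive (start, end) ranges cover
    [0, MAX_HASH_KEY] with no holes. Overlapping ranges are tolerated."""
    next_expected = 0
    for start, end in sorted(ranges):
        if start > next_expected:
            return False  # a hole exists before this range
        next_expected = max(next_expected, end + 1)
    return next_expected > MAX_HASH_KEY
```

Under this sketch, a hole in the covered range would be the signal that a shard sync is worth running.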
|
||||
|
||||
## Lease Balancing
|
||||
|
||||
KCL will, at a cadence configured by `leaseDuration` and `epsilonMillis`, attempt to "balance" leases across workers.
|
||||
Lease balancing is done to protect against stagnation if a worker stops updating the lease table (e.g., host failure).
|
||||
This operation is naive and only attempts to equally distribute leases across the available hosts;
|
||||
shards are not guaranteed to be equal in their `put` workloads, and balancing is blind to this I/O skew.
|
||||
KCL balances leases across workers at an interval configured by `leaseDuration` and `epsilonMillis`.
|
||||
Lease balancing is done to protect against interruptions in processing should a worker stop updating the lease table (e.g., host failure).
|
||||
This operation only accounts for lease assignments and does not factor in I/O load.
|
||||
For example, leases that are equally distributed across KCL workers are not guaranteed to have equal I/O distribution.
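
A count-based balancing target can be sketched as follows. This is a simplified illustration and not the `LeaseTaker` algorithm: the function name, the input shapes, and the ceiling-based target are assumptions. It shows why balancing on lease counts alone says nothing about per-shard I/O.

```python
from collections import Counter
from math import ceil
from typing import Dict, Iterable


def surplus_by_worker(assignments: Dict[str, str], workers: Iterable[str]) -> Dict[str, int]:
    """Return how many leases each worker holds above an even share.

    assignments maps a lease key to the worker that owns it; workers lists
    every live worker, including those that currently hold no leases.
    """
    counts = Counter({worker: 0 for worker in workers})
    counts.update(assignments.values())
    if not counts:
        return {}
    # Even share of leases per worker, rounded up.
    target = ceil(len(assignments) / len(counts))
    return {worker: held - target for worker, held in counts.items() if held > target}
```

For example, with four leases where `w1` holds three and `w2` holds one, the target is two leases per worker and `w1` has a surplus of one, regardless of how much traffic each shard receives.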
|
||||
|
||||

|
||||
|
||||
For convenience, links to code:
|
||||
For more information, here are the links to KCL code:
|
||||
* `LeaseCoordinator`: [interface][lease-coordinator], [implementation][lease-coordinator-impl]
|
||||
* `LeaseTaker`: [interface][lease-taker], [implementation][lease-taker-impl]
|
||||
* `LeaseRefresher`: [interface][lease-refresher], [implementation][lease-refresher-impl]
|
||||
|
|
@ -139,8 +154,8 @@ Customers should consider the following trade-offs when configuring the lease-ta
|
|||
|
||||
# Additional Reading
|
||||
|
||||
Informative articles that are recommended (in no particular order):
|
||||
* https://aws.amazon.com/blogs/big-data/processing-amazon-dynamodb-streams-using-the-amazon-kinesis-client-library/
|
||||
Recommended reading:
|
||||
* [Processing Amazon DynamoDB Streams Using the Amazon Kinesis Client Library](https://aws.amazon.com/blogs/big-data/processing-amazon-dynamodb-streams-using-the-amazon-kinesis-client-library/)
|
||||
|
||||
[^fixed-commit-footnote]: This link is a point-in-time reference to a specific commit to guarantee static line numbers.
|
||||
This code reference is not guaranteed to remain consistent with the `master` branch.
|
||||
|
|
@ -149,6 +164,7 @@ Informative articles that are recommended (in no particular order):
|
|||
[consumer-task-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java
|
||||
[dynamodb]: https://aws.amazon.com/dynamodb/
|
||||
[hierarchical-shard-syncer]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
|
||||
[kcl-concepts]: https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html#shared-throughput-kcl-consumers-concepts
|
||||
[kcl-leasetable]: https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html#shared-throughput-kcl-consumers-leasetable
|
||||
[lease-auditor-config]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L204-L209
|
||||
[lease-cleanup-config]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L112-L128
|
||||
23
docs/plantuml/lease-shard-sync-initialization.puml
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
@startuml
|
||||
'https://plantuml.com/sequence-diagram
|
||||
|
||||
autonumber
|
||||
|
||||
title KCL Shard Syncing Initialization (Abridged)
|
||||
|
||||
participant Scheduler as S
|
||||
participant LeaseCoordinator as LC
|
||||
participant PeriodicShardSyncManager as PSS
|
||||
participant "Lease Table (DDB)" as DDB
|
||||
|
||||
alt on initialization
|
||||
S->S: create PeriodicShardSyncManager(\n ..., leaseRefresher, leasesRecoveryAuditorExecutionFrequencyMillis, ...)
|
||||
S->LC: initialize()
|
||||
opt if lease table does not exist
|
||||
LC->DDB: create lease table
|
||||
end
|
||||
S->PSS: start()
|
||||
PSS->PSS: schedule self every\n leasesRecoveryAuditorExecutionFrequencyMillis
|
||||
end
|
||||
|
||||
@enduml
|
||||
|
|
@ -3,27 +3,15 @@
|
|||
|
||||
autonumber
|
||||
|
||||
title KCL Shard Syncing (Abridged)
|
||||
title KCL Shard Syncing Main Loop (Abridged)
|
||||
|
||||
participant Scheduler as S
|
||||
participant LeaseCoordinator as LC
|
||||
participant PeriodicShardSyncManager as PSS
|
||||
participant ShardSyncTask as SST
|
||||
participant "Lease Table (DDB)" as DDB
|
||||
participant ShardDetector as SD
|
||||
participant HierarchicalShardSyncer as HSS
|
||||
participant LeaseRefresher as LR
|
||||
participant LeaseSynchronizer as LS
|
||||
participant HierarchicalShardSyncer as HSS
|
||||
participant ShardDetector as SD
|
||||
|
||||
alt on initialization
|
||||
S->S: create PeriodicShardSyncManager(\n ..., leaseRefresher, leasesRecoveryAuditorExecutionFrequencyMillis, ...)
|
||||
S->LC: initialize()
|
||||
opt if lease table does not exist
|
||||
LC->DDB: create lease table
|
||||
end
|
||||
S->PSS: start()
|
||||
PSS->PSS: schedule self every\n leasesRecoveryAuditorExecutionFrequencyMillis
|
||||
end
|
||||
participant "Lease Table (DDB)" as DDB
|
||||
|
||||
loop every leasesRecoveryAuditorExecutionFrequencyMillis
|
||||
opt if worker is not leader
|
||||
|
|
@ -38,16 +26,17 @@ loop every leasesRecoveryAuditorExecutionFrequencyMillis
|
|||
PSS->SST: call()
|
||||
SST->HSS: checkAndCreateLeasesForNewShards(\n shardDetector, initialPosition, ...)
|
||||
opt if lease table is empty
|
||||
HSS->HSS: getShardListAtInitialPosition(shardDetector, initialPosition)
|
||||
HSS->HSS: getShardListAtInitialPosition(\n shardDetector, initialPosition)
|
||||
HSS->SD: listShardsWithFilter(initialPositionFilter)
|
||||
else lease table is non-empty
|
||||
HSS->HSS: getShardList(shardDetector)
|
||||
HSS->SD: listShards(...)
|
||||
note over SD
|
||||
ShardDetector invokes the Kinesis ListShards API.
|
||||
ShardDetector invokes the
|
||||
Kinesis Data Streams ListShards API.
|
||||
end note
|
||||
end
|
||||
HSS->HSS: checkAndCreateLeaseForNewShards(\n shardDetector, leaseRefresher, initialPosition, ...)
|
||||
HSS->HSS: checkAndCreateLeaseForNewShards(\n shardDetector, leaseRefresher,\n initialPosition, ...)
|
||||
HSS->LR: listLeases()
|
||||
LR->DDB: scan(:streamName=streamName)
|
||||
DDB->LR: leases from table
|
||||