Leases and Lease Lifecycle
KCL creates and uses leases to manage a stream's shards. This document describes the lease lifecycle.
Nota bene (N.B.): while actual shard ids are formatted like shardId-000000000042, this document uses shardId[_-]42 for concision.
Leases
In KCL, a lease provides a temporal assignment between one Kinesis shard and an assigned worker. Leases are persistent for the duration of shard processing (detailed later). However, lease assignment is transient -- leases may be "stolen" by other workers in the same KCL application.
Lease Table
To persist metadata about lease state (e.g., last read checkpoint, current assigned worker), KCL creates a lease table in DynamoDB. Each KCL application will have its own distinct lease table whose name includes the application name. More information, including schema, is provided at KCL LeaseTable.
Lease Assignment
The "life" of a lease closely mirrors the duration that a shard is being processed.
Specifically, leases are unique to the shard and are not recycled for stream mutations (i.e., split, merge). Shards created by stream mutations will generate a new lease.
Note that the number of tracked leases may exceed the number of shards. Per the diagram (above), this can occur when there are stream mutations propagating through KCL. For example, a 10-shard stream that is split on every shard may temporarily have up to 30 leases: 10 original + 20 children.
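The arithmetic in the 10-shard example can be checked with a short sketch. The helper name `peak_tracked_leases` is hypothetical and not part of KCL; it assumes every parent shard is split exactly once into two children and that no parent lease has been deleted yet.

```python
def peak_tracked_leases(parent_shards: int, children_per_split: int = 2) -> int:
    """Upper bound on tracked leases while every parent shard is mid-split.

    Hypothetical helper (not a KCL API). Assumes each parent is split once
    and that all parent leases still exist alongside the child leases.
    """
    child_shards = parent_shards * children_per_split
    return parent_shards + child_shards


print(peak_tracked_leases(10))
```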
N.B. Leases are uniquely identified by their leaseKey which looks vastly different than lease_X.
For details on the leaseKey format, please see KCL LeaseTable.
Lease Lifecycle
Leases follow a relatively simple, progressive state machine:
DISCOVERY -> CREATION -> PROCESSING -> SHARD_END -> DELETION
Excluding SHARD_END, these phases are illustrative of KCL logic and are not explicitly codified.
DISCOVERY: KCL shard-syncing identifies new shards. Discovered shards may result from:
- First time starting KCL with an empty lease table.
- Stream mutations (i.e., split, merge) that create child shards.
- In multi-stream mode, dynamic discovery of a new stream.
CREATION: Leases are created 1:1 for each discovered shard, and initialized at the configured initial position.
- Leases are only created if they are eligible for processing. For example, child shards will not have leases created until their parent(s) have reached SHARD_END.
PROCESSING: Leases are processed, and continually updated with new checkpoints.
- In general, leases spend the majority of their life in this state.
SHARD_END: The associated shard is SHARD_END and all records have been processed by KCL for the shard.
DELETION: Since there are no more records to process, KCL will delete the lease from the lease table.
- Deletion will only be triggered after all parents of a child shard have converged to SHARD_END. Convergence is required to preserve ordering of records between parent-child relationships.
- Deletion is configurable yet recommended to minimize I/O of lease table scans.
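As a rough illustration of the progressive state machine, the phases can be modeled as an ordered enum with a single forward transition. This is only a sketch: apart from SHARD_END, KCL does not codify these phases, and the names `LeasePhase`, `next_phase`, and `lifecycle` are hypothetical.

```python
from enum import Enum
from typing import List, Optional


class LeasePhase(Enum):
    """Illustrative phases only; KCL does not define such an enum."""
    DISCOVERY = 1
    CREATION = 2
    PROCESSING = 3
    SHARD_END = 4
    DELETION = 5


def next_phase(phase: LeasePhase) -> Optional[LeasePhase]:
    """Return the phase that follows `phase`, or None after DELETION."""
    if phase is LeasePhase.DELETION:
        return None
    # Phases are strictly progressive: a lease never moves backwards.
    return LeasePhase(phase.value + 1)


def lifecycle() -> List[str]:
    """Walk the state machine from DISCOVERY to DELETION."""
    names = []
    phase: Optional[LeasePhase] = LeasePhase.DISCOVERY
    while phase is not None:
        names.append(phase.name)
        phase = next_phase(phase)
    return names


print(" -> ".join(lifecycle()))
```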
Lease Syncing
Lease syncing is a complex responsibility owned by the "leader" host in a KCL application. By invoking the ListShards API, KCL will identify the shards for the configured stream(s). This process is scheduled at a configurable interval so KCL can self-identify new shards introduced via stream mutations.
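To make the ListShards step concrete, here is a minimal Python sketch of paging through a stream's shards. It is not KCL's code (KCL is written in Java); the function name `list_all_shards` is hypothetical, and the client is assumed to expose a `list_shards` method shaped like the one on a boto3 Kinesis client.

```python
from typing import Any, Dict, List


def list_all_shards(kinesis_client: Any, stream_name: str) -> List[Dict[str, Any]]:
    """Collect every shard of a stream by following ListShards pagination.

    `kinesis_client` is assumed to behave like boto3.client("kinesis").
    """
    shards: List[Dict[str, Any]] = []
    response = kinesis_client.list_shards(StreamName=stream_name)
    while True:
        shards.extend(response.get("Shards", []))
        next_token = response.get("NextToken")
        if not next_token:
            return shards
        # ListShards does not accept StreamName together with NextToken,
        # so follow-up pages are requested with the token alone.
        response = kinesis_client.list_shards(NextToken=next_token)
```

With boto3 installed and credentials configured, this could be called as `list_all_shards(boto3.client("kinesis"), "my-stream")`, where `my-stream` is a placeholder stream name.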
For convenience, links to code:
- Scheduler: implementation
- LeaseCoordinator: interface, implementation
- PeriodicShardSyncManager: implementation
- ShardSyncTask: interface, implementation
- LeaseRefresher: interface, implementation
- LeaseSynchronizer: implementation
- HierarchicalShardSyncer: implementation
- ShardDetector: interface, implementation
Lease creation is a deterministic process. Assume a stream has the following shard hierarchy:
Shard structure (each level depicts a stream segment):
0 1 2 3 4   5   - shards till epoch 102
\ / \ / |   |
 6   7  4   5   - shards from epoch 103 - 205
  \ /   |  / \
   8    4 9  10 - shards from epoch 206+ (still open)
Then NonEmptyLeaseTableSynchronizer would create leases dependent on the configured initial position.
Assuming leases (4, 5, 7) already exist, the leases created for an initial position would be:
- LATEST creates (6) to resolve the gap on par with epochs 103-205, which is required to eventually reach LATEST
- TRIM_HORIZON creates (0, 1) to resolve the gap starting from the TRIM_HORIZON
- AT_TIMESTAMP(epoch=200) creates (0, 1) to resolve the gap leading into epoch 200
Lease Balancing
KCL will, at a cadence configured by leaseDuration and epsilonMillis, attempt to "balance" leases across workers.
Lease balancing is done to protect against stagnation if a worker stops updating the lease table (e.g., host failure).
This operation is naive and only attempts to equally distribute leases across the available hosts;
shards are not guaranteed to be equal in their put workloads, and balancing is blind to this I/O skew.
For convenience, links to code:
- LeaseCoordinator: interface, implementation
- LeaseTaker: interface, implementation
- LeaseRefresher: interface, implementation
Leases are stolen if and only if there are zero expired leases and the worker looking to steal wants more leases.
Stolen leases are randomly selected from whichever worker has the most leases.
The number of leases to steal on each loop is configured via maxLeasesToStealAtOneTime.
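The stealing rules above can be sketched as a single selection function. This is a simplified illustration, not KCL's LeaseTaker: the function name, its parameters, and the even-split target (total leases divided by worker count, rounded up) are assumptions made for the example.

```python
import math
import random
from typing import Dict, List, Optional


def choose_leases_to_steal(
    me: str,
    leases_by_worker: Dict[str, List[str]],
    expired_leases: List[str],
    max_leases_to_steal_at_one_time: int,
    rng: Optional[random.Random] = None,
) -> List[str]:
    """Pick leases for worker `me` to steal in one balancing loop (sketch)."""
    rng = rng or random.Random()
    # Stealing only happens when there are zero expired leases to take.
    if expired_leases:
        return []
    workers = set(leases_by_worker) | {me}
    total = sum(len(leases) for leases in leases_by_worker.values())
    # Assumption: each worker aims for an even share of all leases.
    target = math.ceil(total / len(workers))
    wanted = target - len(leases_by_worker.get(me, []))
    if wanted <= 0:
        return []  # this worker does not desire more leases
    others = {w: l for w, l in leases_by_worker.items() if w != me}
    if not others:
        return []
    # Steal from whichever worker currently holds the most leases.
    victim = max(others, key=lambda w: len(others[w]))
    count = min(wanted, max_leases_to_steal_at_one_time, len(others[victim]))
    # Stolen leases are chosen at random from the victim's leases.
    return rng.sample(others[victim], count)
```

For example, with workers A (4 leases), B (2 leases), and C (0 leases), worker C would steal from A, taking at most `max_leases_to_steal_at_one_time` leases per loop.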
Customers should consider the following trade-offs when configuring the lease-taking cadence:
- LeaseRefresher invokes a DDB scan against the lease table which has a cost proportional to the number of leases.
- Frequent balancing may cause high lease turn-over which incurs DDB write costs, and potentially redundant work for stolen leases.
- High maxLeasesToStealAtOneTime may cause churn.
  - For example, worker B steals multiple leases from worker A creating a numerical imbalance. In the next loop, worker C may steal leases from worker B.
- Low maxLeasesToStealAtOneTime may increase the time to fully (re)assign leases after an impactful event (e.g., deployment, host failure).
Additional Reading
Informative articles that are recommended (in no particular order):


