diff --git a/docs/images/lease-shard-sync-initialization.png b/docs/images/lease-shard-sync-initialization.png new file mode 100644 index 00000000..ba1ed9ec Binary files /dev/null and b/docs/images/lease-shard-sync-initialization.png differ diff --git a/docs/images/lease-shard-sync-loop.png b/docs/images/lease-shard-sync-loop.png new file mode 100644 index 00000000..e912b652 Binary files /dev/null and b/docs/images/lease-shard-sync-loop.png differ diff --git a/docs/images/lease-shard-sync.png b/docs/images/lease-shard-sync.png deleted file mode 100644 index af30496e..00000000 Binary files a/docs/images/lease-shard-sync.png and /dev/null differ diff --git a/docs/images/lease-taking.png b/docs/images/lease-taking.png index 6f62fc5d..9ff3451c 100644 Binary files a/docs/images/lease-taking.png and b/docs/images/lease-taking.png differ diff --git a/docs/images/leases-and-mutations.png b/docs/images/leases-and-operations.png similarity index 97% rename from docs/images/leases-and-mutations.png rename to docs/images/leases-and-operations.png index b5e484ef..178202ab 100644 Binary files a/docs/images/leases-and-mutations.png and b/docs/images/leases-and-operations.png differ diff --git a/docs/leases-and-lease-lifecycle.md b/docs/lease-lifecycle.md similarity index 68% rename from docs/leases-and-lease-lifecycle.md rename to docs/lease-lifecycle.md index 8424e75f..6ea259c3 100644 --- a/docs/leases-and-lease-lifecycle.md +++ b/docs/lease-lifecycle.md @@ -1,40 +1,41 @@ -# Leases and Lease Lifecycle +# Lease Lifecycle -Leases are created in, and leveraged by, KCL to manage a stream's shards. -This document should help provide insights into the lease lifecycle. +A lease is data that defines the binding between a worker and a shard. +Distributed KCL consumer applications use leases to partition data record processing across a fleet of workers. +At any given time, each shard of data records is bound to a particular worker by a lease identified by the `leaseKey` variable. -**Nota bene (N.B.):** while actual shard ids are formatted like `shardId-000000000042`, this document uses `shardId[_-]42` for concision. +This document describes the lease lifecycle. + +**Note:** shard ids are simplified from `shardId-000000000042` to read as `shardId[_-]42` for simplicity. ## Leases -In KCL, a lease provides a temporal assignment between one Kinesis shard and an assigned worker. +In KCL, a lease provides a temporal assignment between one shard and an assigned worker. Leases are persistent for the duration of shard processing (detailed later). -However, the worker that is processing a lease may change since leases may be "stolen" by other workers in the same KCL application. +However, the worker that is processing a lease may change since [leases may be "stolen"](#lease-balancing) by other workers in the same KCL application. ## Lease Table -To persist metadata about lease state (e.g., last read checkpoint, current assigned worker), KCL creates a lease table in [DynamoDB][dynamodb]. +To persist metadata about lease state (e.g., [last read checkpoint, current assigned worker][kcl-concepts]), KCL creates a lease table in [DynamoDB][dynamodb]. Each KCL application will have its own distinct lease table that includes the application name. More information, including schema, is provided at [KCL LeaseTable][kcl-leasetable]. ## Lease Assignment -The "life" of a lease closely mirrors the duration that a shard is being processed. +Leases are unique to the shard and are not recycled for stream operations (i.e., split, merge). +A shard created by stream operations will generate a new lease. ![Activity diagram of KCL shard-to-lease assignments. shard-0 (lease-0) is unmodified. shard-1 (lease-1) is split into shard-4 (lease-4) and shard-5 (lease-5). shard-2 (lease-2) and shard-3 (lease-3) are merged into shard-6 (lease-6). -](images/leases-and-mutations.png) - -Specifically, leases are unique to the shard and are not recycled for stream mutations (i.e., split, merge). -Shards created by stream mutations will generate a new lease. +](images/leases-and-operations.png) It should be noted that the number of tracked leases may exceed the number of shards. -Per the diagram (above), this can occur when there are stream mutations propagating through KCL. +Per the diagram (above), this can occur when there are stream operations propagating through KCL. For example, a 10-shard stream that is split on every shard may temporarily have up-to 30 leases: 10 original + 20 children. -N.B. Leases are uniquely identified by their `leaseKey` which looks vastly different than `lease_X`. +**Note:** Leases are uniquely identified by their `leaseKey` which looks vastly different than `lease_X`. For details on the `leaseKey` format, please see [KCL LeaseTable][kcl-leasetable]. ## Lease Lifecycle @@ -44,10 +45,10 @@ Leases follow a relatively simple, progressive state machine: Excluding `SHARD_END`, these phases are illustrative of KCL logic and are not explicitly codified. -1. `DISCOVERY`: KCL [shard-syncing](#lease-syncing) identifies new shards. +1. `DISCOVERY`: KCL [shard-syncing](#shard-syncing) identifies new shards. Discovered shards may result from: * First time starting KCL with an empty lease table. - * Stream mutations (i.e., split, merge) that create child shards. + * Stream operations (i.e., split, merge) that create child shards. * In multi-stream mode, dynamic discovery of a new stream. 1. `CREATION`: Leases are created 1:1 for each discovered shard. * Leases are only created if they are eligible for processing. @@ -60,31 +61,45 @@ Discovered shards may result from: 1. `DELETION`: Since there are no more records to process, KCL will delete the lease from the lease table. * Lease deletion will not occur until after its child lease(s) enter `PROCESSING`. * This tombstone helps KCL ensure durability and convergence for all discovered leases. - * To dive deep, see [LeaseCleanupManager#cleanupLeaseForCompletedShard(...)][lease-cleanup-manager-cleanupleaseforcompletedshard][^fixed-commit-footnote] + * For more information, see [LeaseCleanupManager#cleanupLeaseForCompletedShard(...)][lease-cleanup-manager-cleanupleaseforcompletedshard][^fixed-commit-footnote] * [Deletion is configurable][lease-cleanup-config] yet recommended to minimize I/O of lease table scans. -### Lease Syncing +### Shard Syncing -Lease syncing is a complex responsibility owned by the "leader" host in a KCL application. +Shard syncing is a complex responsibility owned by the leader host in a KCL application. By invoking the [ListShards API][list-shards], KCL will identify the shards for the configured stream(s). -This process is scheduled at a [configurable interval][lease-auditor-config] so KCL can decide whether it should query for potentially-new shards. +This process is scheduled at a [configurable interval][lease-auditor-config] to determine whether a shard sync should be executed to identify new shards. +A shard sync is not guaranteed to identify new shards (e.g., KCL has already discovered all existing shards). -![Abridged sequence diagram of the Shard Sync process. -Listed participants are the Scheduler, LeaseCoordinator, PeriodicShardSyncManager, ShardSyncTask, -Lease Table (DDB), LeaseRefresher, LeaseSynchronizer, HierarchicalShardSyncer, and ShardDetector. -](images/lease-shard-sync.png) +The following diagram is an abridged sequence diagram of key classes that initialize the shard sync workflow: +![Abridged sequence diagram of the Shard Sync initialization process. +Listed participants are the Scheduler, LeaseCoordinator, PeriodicShardSyncManager, and Lease Table (DDB). +Scheduler initializes the LeaseCoordinator which, in turn, creates the lease table if it does not exist. +Finally, Scheduler starts the PeriodicShardSyncManager which schedules itself to execute every leasesRecoveryAuditorExecutionFrequencyMillis. +](images/lease-shard-sync-initialization.png) -For convenience, links to code: +The following diagram outlines the key classes involved in the shard sync workflow: +![Abridged sequence diagram of the Shard Sync main processing loop. +Listed participants are the PeriodicShardSyncManager, ShardSyncTask, ShardDetector, +HierarchicalShardSyncer, LeaseRefresher, LeaseSynchronizer, and Lease Table (DDB). +On each iteration, PeriodicShardSyncManger determines whether it's the leader and a shard-sync is required before proceeding. +PeriodicShardSyncManager calls ShardSyncTask which calls HierarchicalShardSyncer which acquires the shard lists from ShardDetector. +HierarchicalShardSyncer then invokes LeaseRefresher to scan the DDB lease table, and uses those returned leases to identify shards which do not have leases. +Finally, HierarchicalShardSyncer uses LeaseRefresher to create any new leases in DDB. +](images/lease-shard-sync-loop.png) + +For more information, here are the links to KCL code: * `Scheduler`: [implementation][scheduler] * `LeaseCoordinator`: [interface][lease-coordinator], [implementation][lease-coordinator-impl] * `PeriodicShardSyncManager`: [implementation][periodic-shard-sync-manager] * `ShardSyncTask`: [interface][consumer-task], [implementation][consumer-task-impl] +* `ShardDetector`: [interface][shard-detector], [implementation][shard-detector-impl] +* `HierarchicalShardSyncer`: [implementation][hierarchical-shard-syncer] * `LeaseRefresher`: [interface][lease-refresher], [implementation][lease-refresher-impl] * `LeaseSynchronizer`: [implementation][non-empty-lease-table-synchronizer] -* `HierarchicalShardSyncer`: [implementation][hierarchical-shard-syncer] -* `ShardDetector`: [interface][shard-detector], [implementation][shard-detector-impl] Lease creation is a deterministic process. +This is illustrative of how KCL operates. Assume a stream has the following shard hierarchy:
 Shard structure (each level depicts a stream segment):
@@ -104,17 +119,17 @@ Assuming leases `(4, 5, 7)` already exist, the leases created for an initial pos
 
 #### Avoiding a Shard-Sync
 
-To reduce Kinesis API calls, KCL will attempt to avoid unnecessary shard syncs.
+To reduce Kinesis Data Streams API calls, KCL will attempt to avoid unnecessary shard syncs.
 For example, if the discovered shards cover the entire partition range then a shard-sync is unlikely to yield a material difference.
-To dive deeper, see
+For more information, see
 [PeriodicShardSyncManager#checkForShardSync(...)][periodic-shard-sync-manager-checkforshardsync])[^fixed-commit-footnote].
 
 ## Lease Balancing
 
-KCL will, at a cadence configured by `leaseDuration` and `epsilonMillis`, attempt to "balance" leases across workers.
-Lease balancing is done to protect against stagnation if a worker stops updating the lease table (e.g., host failure).
-This operation is naive and only attempts to equally distribute leases across the available hosts;
-shards are not guaranteed to be equal in their `put` workloads, and balancing is blind to this I/O skew.
+KCL balances leases across workers at an interval configured by `leaseDuration` and `epsilonMillis`.
+Lease balancing is done to protect against interruptions in processing should a worker stop updating the lease table (e.g., host failure).
+This operation only accounts for lease assignments and does not factor in I/O load.
+For example, leases that are equally-distributed across KCL are not guaranteed to have equal I/O distribution.
 
 ![Sequence diagram of the KCL Lease Taking workflow.
 Participants include the LeaseCoordinator, LeaseTaker, LeaseRefresher, and Lease Table (DDB).
@@ -123,7 +138,7 @@ LeaseTaker identifies which leases are eligible for taking/stealing.
 All taken/stolen leases are passed through LeaseRefresher to update the lease table.
 ](images/lease-taking.png)
 
-For convenience, links to code:
+For more information, here are the links to KCL code:
 * `LeaseCoordinator`: [interface][lease-coordinator], [implementation][lease-coordinator-impl]
 * `LeaseTaker`: [interface][lease-taker], [implementation][lease-taker-impl]
 * `LeaseRefresher`: [interface][lease-refresher], [implementation][lease-refresher-impl]
@@ -139,8 +154,8 @@ Customers should consider the following trade-offs when configuring the lease-ta
 
 # Additional Reading
 
-Informative articles that are recommended (in no particular order):
-* https://aws.amazon.com/blogs/big-data/processing-amazon-dynamodb-streams-using-the-amazon-kinesis-client-library/
+Recommended reading:
+* [Processing Amazon DynamoDB Streams Using the Amazon Kinesis Client Library](https://aws.amazon.com/blogs/big-data/processing-amazon-dynamodb-streams-using-the-amazon-kinesis-client-library/)
 
 [^fixed-commit-footnote]: This link is a point-in-time reference to a specific commit to guarantee static line numbers.
     This code reference is not guaranteed to remain consistent with the `master` branch.
@@ -149,6 +164,7 @@ Informative articles that are recommended (in no particular order):
 [consumer-task-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java
 [dynamodb]: https://aws.amazon.com/dynamodb/
 [hierarchical-shard-syncer]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
+[kcl-concepts]: https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html#shared-throughput-kcl-consumers-concepts
 [kcl-leasetable]: https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html#shared-throughput-kcl-consumers-leasetable
 [lease-auditor-config]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L204-L209
 [lease-cleanup-config]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L112-L128
diff --git a/docs/plantuml/lease-shard-sync-initialization.puml b/docs/plantuml/lease-shard-sync-initialization.puml
new file mode 100644
index 00000000..f2b08812
--- /dev/null
+++ b/docs/plantuml/lease-shard-sync-initialization.puml
@@ -0,0 +1,23 @@
+@startuml
+'https://plantuml.com/sequence-diagram
+
+autonumber
+
+title KCL Shard Syncing Initialization (Abridged)
+
+participant Scheduler as S
+participant LeaseCoordinator as LC
+participant PeriodShardSyncManager as PSS
+participant "Lease Table (DDB)" as DDB
+
+alt on initialization
+  S->S: create PeriodicShardSyncManager(\n  ..., leaseRefresher, leasesRecoveryAuditorExecutionFrequencyMillis, ...)
+  S->LC: initialize()
+  opt if lease table does not exist
+    LC->DDB: create lease table
+  end
+  S->PSS: start()
+  PSS->PSS: schedule self every\n  leasesRecoveryAuditorExecutionFrequencyMillis
+end
+
+@enduml
diff --git a/docs/plantuml/lease-shard-sync.puml b/docs/plantuml/lease-shard-sync-loop.puml
similarity index 67%
rename from docs/plantuml/lease-shard-sync.puml
rename to docs/plantuml/lease-shard-sync-loop.puml
index 3954301d..60e0933a 100644
--- a/docs/plantuml/lease-shard-sync.puml
+++ b/docs/plantuml/lease-shard-sync-loop.puml
@@ -3,27 +3,15 @@
 
 autonumber
 
-title KCL Shard Syncing (Abridged)
+title KCL Shard Syncing Main Loop (Abridged)
 
-participant Scheduler as S
-participant LeaseCoordinator as LC
 participant PeriodShardSyncManager as PSS
 participant ShardSyncTask as SST
-participant "Lease Table (DDB)" as DDB
+participant ShardDetector as SD
+participant HierarchicalShardSyncer as HSS
 participant LeaseRefresher as LR
 participant LeaseSynchronizer as LS
-participant HierarchicalShardSyncer as HSS
-participant ShardDetector as SD
-
-alt on initialization
-  S->S: create PeriodicShardSyncManager(\n  ..., leaseRefresher, leasesRecoveryAuditorExecutionFrequencyMillis, ...)
-  S->LC: initialize()
-  opt if lease table does not exist
-    LC->DDB: create lease table
-  end
-  S->PSS: start()
-  PSS->PSS: schedule self every\n  leasesRecoveryAuditorExecutionFrequencyMillis
-end
+participant "Lease Table (DDB)" as DDB
 
 loop every leasesRecoveryAuditorExecutionFrequencyMillis
   opt if worker is not leader
@@ -38,16 +26,17 @@ loop every leasesRecoveryAuditorExecutionFrequencyMillis
   PSS->SST: call()
   SST->HSS: checkAndCreateLeasesForNewShards(\n  shardDetector, initialPosition, ...)
   opt if lease table is empty
-    HSS->HSS: getShardListAtInitialPosition(shardDetector, initialPosition)
+    HSS->HSS: getShardListAtInitialPosition(\n  shardDetector, initialPosition)
     HSS->SD: listShardsWithFilter(initialPositionFilter)
   else lease table is non-empty
     HSS->HSS: getShardList(shardDetector)
     HSS->SD: listShards(...)
     note over SD
-      ShardDetector invokes the Kinesis ListShards API.
+      ShardDetector invokes the
+      Kinesis Data Streams ListShards API.
     end note
   end
-  HSS->HSS: checkAndCreateLeaseForNewShards(\n  shardDetector, leaseRefresher, initialPosition, ...)
+  HSS->HSS: checkAndCreateLeaseForNewShards(\n  shardDetector, leaseRefresher,\n  initialPosition, ...)
   HSS->LR: listLeases()
   LR->DDB: scan(:streamName=streamName)
   DDB->LR: leases from table
diff --git a/docs/plantuml/leases-and-mutations.puml b/docs/plantuml/leases-and-operations.puml
similarity index 100%
rename from docs/plantuml/leases-and-mutations.puml
rename to docs/plantuml/leases-and-operations.puml