@startuml
'https://plantuml.com/sequence-diagram

autonumber

title KCL Shard Syncing (Abridged)

participant Scheduler as S
participant LeaseCoordinator as LC
participant PeriodicShardSyncManager as PSS
participant ShardSyncTask as SST
participant "Lease Table (DDB)" as DDB
participant LeaseRefresher as LR
participant LeaseSynchronizer as LS
participant HierarchicalShardSyncer as HSS
participant ShardDetector as SD

alt on initialization
    S->S: create PeriodicShardSyncManager(\n ..., leaseRefresher, leasesRecoveryAuditorExecutionFrequencyMillis, ...)
    S->LC: initialize()
    opt if lease table does not exist
        LC->DDB: create lease table
    end
    S->PSS: start()
    PSS->PSS: schedule self every\n leasesRecoveryAuditorExecutionFrequencyMillis
end

loop every leasesRecoveryAuditorExecutionFrequencyMillis
    opt if worker is not leader
        PSS->PSS: go back to sleep
    end
    PSS->PSS: runShardSync()
    opt if not required to sync shards
        PSS->PSS: go back to sleep
    end
    PSS->SST: call()
    SST->HSS: checkAndCreateLeasesForNewShards(\n shardDetector, initialPosition, ...)
    alt if lease table is empty
        HSS->HSS: getShardListAtInitialPosition(shardDetector, initialPosition)
        HSS->SD: listShardsWithFilter(initialPositionFilter)
    else lease table is non-empty
        HSS->HSS: getShardList(shardDetector)
        HSS->SD: listShards(...)
        note over SD
            ShardDetector invokes the Kinesis ListShards API.
        end note
    end
    HSS->HSS: checkAndCreateLeaseForNewShards(\n shardDetector, leaseRefresher, initialPosition, ...)
    HSS->LR: listLeases()
    LR->DDB: scan(:streamName=streamName)
    DDB->LR: leases from table
    LR->HSS: leases from table
    HSS->LS: determine leases to create
    LS->HSS: leases that are eligible for processing
    loop every lease to create
        HSS->LR: createLeaseIfNotExists(lease)
        LR->DDB: putItem(lease)
    end
end

@enduml