From 66fdfbb817ff652d4fe428e68377eefc03c533c2 Mon Sep 17 00:00:00 2001 From: Ryan Geary Date: Thu, 26 Sep 2024 15:31:02 -0400 Subject: [PATCH] steal multiple shards per eventLoop Signed-off-by: Ryan Geary rgeary@lyft.com Signed-off-by: Ryan Geary --- clientlibrary/worker/worker.go | 114 +++++++++++++++++---------------- 1 file changed, 58 insertions(+), 56 deletions(-) diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 6843f01..447f5a8 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -304,69 +304,71 @@ func (w *Worker) eventLoop() { } } - // max number of lease has not been reached yet - if counter < w.kclConfig.MaxLeasesForWorker { - for _, shard := range w.shardStatus { - // already owner of the shard - if shard.GetLeaseOwner() == w.workerID { + for _, shard := range w.shardStatus { + // max number of lease has not been reached yet + if counter >= w.kclConfig.MaxLeasesForWorker { + break + } + // already owner of the shard + if shard.GetLeaseOwner() == w.workerID { + continue + } + + err := w.checkpointer.FetchCheckpoint(shard) + if err != nil { + // checkpoint may not exist yet is not an error condition. + if err != chk.ErrSequenceIDNotFound { + log.Warnf("Couldn't fetch checkpoint: %+v", err) + // move on to next shard continue } + } - err := w.checkpointer.FetchCheckpoint(shard) - if err != nil { - // checkpoint may not exist yet is not an error condition. - if err != chk.ErrSequenceIDNotFound { - log.Warnf("Couldn't fetch checkpoint: %+v", err) - // move on to next shard + // The shard is closed and we have processed all records + if shard.GetCheckpoint() == chk.ShardEnd { + continue + } + + var stealShard bool + if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" { + upcomingStealingInterval := time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond) + if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) { + if shard.ClaimRequest == w.workerID { + stealShard = true + log.Debugf("Stealing shard: %s", shard.ID) + } else { + log.Debugf("Shard being stolen: %s", shard.ID) continue } } - - // The shard is closed and we have processed all records - if shard.GetCheckpoint() == chk.ShardEnd { - continue - } - - var stealShard bool - if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" { - upcomingStealingInterval := time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond) - if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) { - if shard.ClaimRequest == w.workerID { - stealShard = true - log.Debugf("Stealing shard: %s", shard.ID) - } else { - log.Debugf("Shard being stolen: %s", shard.ID) - continue - } - } - } - - err = w.checkpointer.GetLease(shard, w.workerID) - if err != nil { - // cannot get lease on the shard - if !errors.As(err, &chk.ErrLeaseNotAcquired{}) { - log.Errorf("Cannot get lease: %+v", err) - } - continue - } - - if stealShard { - log.Debugf("Successfully stole shard: %+v", shard.ID) - w.shardStealInProgress = false - } - - // log metrics on got lease - w.mService.LeaseGained(shard.ID) - w.waitGroup.Add(1) - go func(shard *par.ShardStatus) { - defer w.waitGroup.Done() - if err := w.newShardConsumer(shard).getRecords(); err != nil { - log.Errorf("Error in getRecords: %+v", err) - } - }(shard) - // exit from for loop and not to grab more shard for now. - break } + + err = w.checkpointer.GetLease(shard, w.workerID) + if err != nil { + // cannot get lease on the shard + if !errors.As(err, &chk.ErrLeaseNotAcquired{}) { + log.Errorf("Cannot get lease: %+v", err) + } + continue + } + + if stealShard { + log.Debugf("Successfully stole shard: %+v", shard.ID) + w.shardStealInProgress = false + } + + // log metrics on got lease + w.mService.LeaseGained(shard.ID) + w.waitGroup.Add(1) + go func(shard *par.ShardStatus) { + defer w.waitGroup.Done() + if err := w.newShardConsumer(shard).getRecords(); err != nil { + log.Errorf("Error in getRecords: %+v", err) + } + }(shard) + + // Increase the number of leases we have + counter++ } if w.kclConfig.EnableLeaseStealing {