steal multiple shards per eventLoop

Signed-off-by: Ryan Geary rgeary@lyft.com

Signed-off-by: Ryan Geary <rgeary@lyft.com>
This commit is contained in:
Ryan Geary 2024-09-26 15:31:02 -04:00
parent f6e79f1a2d
commit 66fdfbb817

View file

@ -304,69 +304,71 @@ func (w *Worker) eventLoop() {
} }
} }
// max number of lease has not been reached yet for _, shard := range w.shardStatus {
if counter < w.kclConfig.MaxLeasesForWorker { // max number of lease has not been reached yet
for _, shard := range w.shardStatus { if counter >= w.kclConfig.MaxLeasesForWorker {
// already owner of the shard break
if shard.GetLeaseOwner() == w.workerID { }
// already owner of the shard
if shard.GetLeaseOwner() == w.workerID {
continue
}
err := w.checkpointer.FetchCheckpoint(shard)
if err != nil {
// checkpoint may not exist yet is not an error condition.
if err != chk.ErrSequenceIDNotFound {
log.Warnf("Couldn't fetch checkpoint: %+v", err)
// move on to next shard
continue continue
} }
}
err := w.checkpointer.FetchCheckpoint(shard) // The shard is closed and we have processed all records
if err != nil { if shard.GetCheckpoint() == chk.ShardEnd {
// checkpoint may not exist yet is not an error condition. continue
if err != chk.ErrSequenceIDNotFound { }
log.Warnf("Couldn't fetch checkpoint: %+v", err)
// move on to next shard var stealShard bool
if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" {
upcomingStealingInterval := time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond)
if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) {
if shard.ClaimRequest == w.workerID {
stealShard = true
log.Debugf("Stealing shard: %s", shard.ID)
} else {
log.Debugf("Shard being stolen: %s", shard.ID)
continue continue
} }
} }
// The shard is closed and we have processed all records
if shard.GetCheckpoint() == chk.ShardEnd {
continue
}
var stealShard bool
if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" {
upcomingStealingInterval := time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond)
if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) {
if shard.ClaimRequest == w.workerID {
stealShard = true
log.Debugf("Stealing shard: %s", shard.ID)
} else {
log.Debugf("Shard being stolen: %s", shard.ID)
continue
}
}
}
err = w.checkpointer.GetLease(shard, w.workerID)
if err != nil {
// cannot get lease on the shard
if !errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Errorf("Cannot get lease: %+v", err)
}
continue
}
if stealShard {
log.Debugf("Successfully stole shard: %+v", shard.ID)
w.shardStealInProgress = false
}
// log metrics on got lease
w.mService.LeaseGained(shard.ID)
w.waitGroup.Add(1)
go func(shard *par.ShardStatus) {
defer w.waitGroup.Done()
if err := w.newShardConsumer(shard).getRecords(); err != nil {
log.Errorf("Error in getRecords: %+v", err)
}
}(shard)
// exit from for loop and not to grab more shard for now.
break
} }
err = w.checkpointer.GetLease(shard, w.workerID)
if err != nil {
// cannot get lease on the shard
if !errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Errorf("Cannot get lease: %+v", err)
}
continue
}
if stealShard {
log.Debugf("Successfully stole shard: %+v", shard.ID)
w.shardStealInProgress = false
}
// log metrics on got lease
w.mService.LeaseGained(shard.ID)
w.waitGroup.Add(1)
go func(shard *par.ShardStatus) {
defer w.waitGroup.Done()
if err := w.newShardConsumer(shard).getRecords(); err != nil {
log.Errorf("Error in getRecords: %+v", err)
}
}(shard)
// Increase the number of leases we have
counter++
} }
if w.kclConfig.EnableLeaseStealing { if w.kclConfig.EnableLeaseStealing {