This commit is contained in:
Ryan Geary 2024-09-26 19:56:16 +00:00 committed by GitHub
commit cd42b85c6d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -304,69 +304,71 @@ func (w *Worker) eventLoop() {
}
}
// max number of lease has not been reached yet
if counter < w.kclConfig.MaxLeasesForWorker {
for _, shard := range w.shardStatus {
// already owner of the shard
if shard.GetLeaseOwner() == w.workerID {
for _, shard := range w.shardStatus {
// max number of lease has not been reached yet
if counter >= w.kclConfig.MaxLeasesForWorker {
break
}
// already owner of the shard
if shard.GetLeaseOwner() == w.workerID {
continue
}
err := w.checkpointer.FetchCheckpoint(shard)
if err != nil {
// checkpoint may not exist yet is not an error condition.
if err != chk.ErrSequenceIDNotFound {
log.Warnf("Couldn't fetch checkpoint: %+v", err)
// move on to next shard
continue
}
}
err := w.checkpointer.FetchCheckpoint(shard)
if err != nil {
// checkpoint may not exist yet is not an error condition.
if err != chk.ErrSequenceIDNotFound {
log.Warnf("Couldn't fetch checkpoint: %+v", err)
// move on to next shard
// The shard is closed and we have processed all records
if shard.GetCheckpoint() == chk.ShardEnd {
continue
}
var stealShard bool
if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" {
upcomingStealingInterval := time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond)
if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) {
if shard.ClaimRequest == w.workerID {
stealShard = true
log.Debugf("Stealing shard: %s", shard.ID)
} else {
log.Debugf("Shard being stolen: %s", shard.ID)
continue
}
}
// The shard is closed and we have processed all records
if shard.GetCheckpoint() == chk.ShardEnd {
continue
}
var stealShard bool
if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" {
upcomingStealingInterval := time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond)
if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) {
if shard.ClaimRequest == w.workerID {
stealShard = true
log.Debugf("Stealing shard: %s", shard.ID)
} else {
log.Debugf("Shard being stolen: %s", shard.ID)
continue
}
}
}
err = w.checkpointer.GetLease(shard, w.workerID)
if err != nil {
// cannot get lease on the shard
if !errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Errorf("Cannot get lease: %+v", err)
}
continue
}
if stealShard {
log.Debugf("Successfully stole shard: %+v", shard.ID)
w.shardStealInProgress = false
}
// log metrics on got lease
w.mService.LeaseGained(shard.ID)
w.waitGroup.Add(1)
go func(shard *par.ShardStatus) {
defer w.waitGroup.Done()
if err := w.newShardConsumer(shard).getRecords(); err != nil {
log.Errorf("Error in getRecords: %+v", err)
}
}(shard)
// exit from for loop and not to grab more shard for now.
break
}
err = w.checkpointer.GetLease(shard, w.workerID)
if err != nil {
// cannot get lease on the shard
if !errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Errorf("Cannot get lease: %+v", err)
}
continue
}
if stealShard {
log.Debugf("Successfully stole shard: %+v", shard.ID)
w.shardStealInProgress = false
}
// log metrics on got lease
w.mService.LeaseGained(shard.ID)
w.waitGroup.Add(1)
go func(shard *par.ShardStatus) {
defer w.waitGroup.Done()
if err := w.newShardConsumer(shard).getRecords(); err != nil {
log.Errorf("Error in getRecords: %+v", err)
}
}(shard)
// Increase the number of leases we have
counter++
}
if w.kclConfig.EnableLeaseStealing {