feat: Sending renewed lease metric

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
This commit is contained in:
Shiva Pentakota 2023-01-12 11:31:16 -08:00
parent 4afa8fec3e
commit e1425047a7
2 changed files with 4 additions and 0 deletions

View file

@ -103,6 +103,8 @@ func (sc *FanOutShardConsumer) getRecords() error {
return err
}
refreshLeaseTimer = time.After(time.Until(sc.shard.LeaseTimeout.Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)))
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
case event, ok := <-shardSub.GetStream().Events():
if !ok {
// need to resubscribe to shard

View file

@ -122,6 +122,8 @@ func (sc *PollingShardConsumer) getRecords() error {
sc.shard.ID, sc.consumerID, err)
return err
}
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
}
getRecordsStartTime := time.Now()