diff --git a/clientlibrary/worker/fan-out-shard-consumer.go b/clientlibrary/worker/fan-out-shard-consumer.go index ee0686f..1a6f13b 100644 --- a/clientlibrary/worker/fan-out-shard-consumer.go +++ b/clientlibrary/worker/fan-out-shard-consumer.go @@ -103,6 +103,8 @@ func (sc *FanOutShardConsumer) getRecords() error { return err } refreshLeaseTimer = time.After(time.Until(sc.shard.LeaseTimeout.Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond))) + // log metric for renewed lease for worker + sc.mService.LeaseRenewed(sc.shard.ID) case event, ok := <-shardSub.GetStream().Events(): if !ok { // need to resubscribe to shard diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 49d6a9a..252d39d 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -122,6 +122,8 @@ func (sc *PollingShardConsumer) getRecords() error { sc.shard.ID, sc.consumerID, err) return err } + // log metric for renewed lease for worker + sc.mService.LeaseRenewed(sc.shard.ID) } getRecordsStartTime := time.Now()