From e1425047a74793b3c242673fa0544f25df777355 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Thu, 12 Jan 2023 11:31:16 -0800 Subject: [PATCH] feat: Sending renewed lease metric Signed-off-by: Shiva Pentakota --- clientlibrary/worker/fan-out-shard-consumer.go | 2 ++ clientlibrary/worker/polling-shard-consumer.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/clientlibrary/worker/fan-out-shard-consumer.go b/clientlibrary/worker/fan-out-shard-consumer.go index ee0686f..1a6f13b 100644 --- a/clientlibrary/worker/fan-out-shard-consumer.go +++ b/clientlibrary/worker/fan-out-shard-consumer.go @@ -103,6 +103,8 @@ func (sc *FanOutShardConsumer) getRecords() error { return err } refreshLeaseTimer = time.After(time.Until(sc.shard.LeaseTimeout.Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond))) + // log metric for renewed lease for worker + sc.mService.LeaseRenewed(sc.shard.ID) case event, ok := <-shardSub.GetStream().Events(): if !ok { // need to resubscribe to shard diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 49d6a9a..252d39d 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -122,6 +122,8 @@ func (sc *PollingShardConsumer) getRecords() error { sc.shard.ID, sc.consumerID, err) return err } + // log metric for renewed lease for worker + sc.mService.LeaseRenewed(sc.shard.ID) } getRecordsStartTime := time.Now()