From 599aa06ecd41585c3b0d704e05c90de716a0b1b4 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Fri, 20 Jan 2023 13:23:02 -0800 Subject: [PATCH] fix: add DeleteMetricMillisBehindLatest for error case Signed-off-by: Shiva Pentakota --- clientlibrary/metrics/interfaces.go | 2 ++ clientlibrary/metrics/prometheus/prometheus.go | 4 ++++ clientlibrary/worker/common-shard-consumer.go | 3 ++- clientlibrary/worker/fan-out-shard-consumer.go | 2 +- clientlibrary/worker/polling-shard-consumer.go | 2 +- 5 files changed, 10 insertions(+), 3 deletions(-) diff --git a/clientlibrary/metrics/interfaces.go b/clientlibrary/metrics/interfaces.go index e4b9f54..47ec490 100644 --- a/clientlibrary/metrics/interfaces.go +++ b/clientlibrary/metrics/interfaces.go @@ -35,6 +35,7 @@ type MonitoringService interface { IncrRecordsProcessed(shard string, count int) IncrBytesProcessed(shard string, count int64) MillisBehindLatest(shard string, milliSeconds float64) + DeleteMetricMillisBehindLatest(shard string) LeaseGained(shard string) LeaseLost(shard string) LeaseRenewed(shard string) @@ -53,6 +54,7 @@ func (NoopMonitoringService) Shutdown() {} func (NoopMonitoringService) IncrRecordsProcessed(_ string, _ int) {} func (NoopMonitoringService) IncrBytesProcessed(_ string, _ int64) {} func (NoopMonitoringService) MillisBehindLatest(_ string, _ float64) {} +func (NoopMonitoringService) DeleteMetricMillisBehindLatest(_ string) {} func (NoopMonitoringService) LeaseGained(_ string) {} func (NoopMonitoringService) LeaseLost(_ string) {} func (NoopMonitoringService) LeaseRenewed(_ string) {} diff --git a/clientlibrary/metrics/prometheus/prometheus.go b/clientlibrary/metrics/prometheus/prometheus.go index 609e34f..e489a78 100644 --- a/clientlibrary/metrics/prometheus/prometheus.go +++ b/clientlibrary/metrics/prometheus/prometheus.go @@ -147,6 +147,10 @@ func (p *MonitoringService) MillisBehindLatest(shard string, millSeconds float64 p.behindLatestMillis.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Set(millSeconds) } +func (p *MonitoringService) DeleteMetricMillisBehindLatest(shard string) { + p.behindLatestMillis.Delete(prom.Labels{"shard": shard, "kinesisStream": p.streamName}) +} + func (p *MonitoringService) LeaseGained(shard string) { p.leasesHeld.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName, "workerID": p.workerID}).Inc() } diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index 32b91b1..253cecb 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -51,7 +51,7 @@ type commonShardConsumer struct { } // Cleanup the internal lease cache -func (sc *commonShardConsumer) releaseLease() { +func (sc *commonShardConsumer) releaseLease(shard string) { log := sc.kclConfig.Logger log.Infof("Release lease for shard %s", sc.shard.ID) sc.shard.SetLeaseOwner("") @@ -63,6 +63,7 @@ func (sc *commonShardConsumer) releaseLease() { } // reporting lease lose metrics + sc.mService.DeleteMetricMillisBehindLatest(shard) sc.mService.LeaseLost(sc.shard.ID) } diff --git a/clientlibrary/worker/fan-out-shard-consumer.go b/clientlibrary/worker/fan-out-shard-consumer.go index 1a6f13b..a60258c 100644 --- a/clientlibrary/worker/fan-out-shard-consumer.go +++ b/clientlibrary/worker/fan-out-shard-consumer.go @@ -46,7 +46,7 @@ type FanOutShardConsumer struct { // getRecords subscribes to a shard and reads events from it. // Precondition: it currently has the lease on the shard. func (sc *FanOutShardConsumer) getRecords() error { - defer sc.releaseLease() + defer sc.releaseLease(sc.shard.ID) log := sc.kclConfig.Logger diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 252d39d..a20fde0 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -79,7 +79,7 @@ func (sc *PollingShardConsumer) getShardIterator() (*string, error) { // getRecords continuously poll one shard for data record // Precondition: it currently has the lease on the shard. func (sc *PollingShardConsumer) getRecords() error { - defer sc.releaseLease() + defer sc.releaseLease(sc.shard.ID) log := sc.kclConfig.Logger