fix: add DeleteMetricMillisBehindLatest for error case

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
This commit is contained in:
Shiva Pentakota 2023-01-20 13:23:02 -08:00
parent c5bc6c4ded
commit 599aa06ecd
5 changed files with 10 additions and 3 deletions

View file

@ -35,6 +35,7 @@ type MonitoringService interface {
IncrRecordsProcessed(shard string, count int) IncrRecordsProcessed(shard string, count int)
IncrBytesProcessed(shard string, count int64) IncrBytesProcessed(shard string, count int64)
MillisBehindLatest(shard string, milliSeconds float64) MillisBehindLatest(shard string, milliSeconds float64)
DeleteMetricMillisBehindLatest(shard string)
LeaseGained(shard string) LeaseGained(shard string)
LeaseLost(shard string) LeaseLost(shard string)
LeaseRenewed(shard string) LeaseRenewed(shard string)
@ -53,6 +54,7 @@ func (NoopMonitoringService) Shutdown() {}
func (NoopMonitoringService) IncrRecordsProcessed(_ string, _ int) {} func (NoopMonitoringService) IncrRecordsProcessed(_ string, _ int) {}
func (NoopMonitoringService) IncrBytesProcessed(_ string, _ int64) {} func (NoopMonitoringService) IncrBytesProcessed(_ string, _ int64) {}
func (NoopMonitoringService) MillisBehindLatest(_ string, _ float64) {} func (NoopMonitoringService) MillisBehindLatest(_ string, _ float64) {}
func (NoopMonitoringService) DeleteMetricMillisBehindLatest(_ string) {}
func (NoopMonitoringService) LeaseGained(_ string) {} func (NoopMonitoringService) LeaseGained(_ string) {}
func (NoopMonitoringService) LeaseLost(_ string) {} func (NoopMonitoringService) LeaseLost(_ string) {}
func (NoopMonitoringService) LeaseRenewed(_ string) {} func (NoopMonitoringService) LeaseRenewed(_ string) {}

View file

@ -147,6 +147,10 @@ func (p *MonitoringService) MillisBehindLatest(shard string, millSeconds float64
p.behindLatestMillis.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Set(millSeconds) p.behindLatestMillis.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Set(millSeconds)
} }
func (p *MonitoringService) DeleteMetricMillisBehindLatest(shard string) {
p.behindLatestMillis.Delete(prom.Labels{"shard": shard, "kinesisStream": p.streamName})
}
func (p *MonitoringService) LeaseGained(shard string) { func (p *MonitoringService) LeaseGained(shard string) {
p.leasesHeld.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName, "workerID": p.workerID}).Inc() p.leasesHeld.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName, "workerID": p.workerID}).Inc()
} }

View file

@ -51,7 +51,7 @@ type commonShardConsumer struct {
} }
// Cleanup the internal lease cache // Cleanup the internal lease cache
func (sc *commonShardConsumer) releaseLease() { func (sc *commonShardConsumer) releaseLease(shard string) {
log := sc.kclConfig.Logger log := sc.kclConfig.Logger
log.Infof("Release lease for shard %s", sc.shard.ID) log.Infof("Release lease for shard %s", sc.shard.ID)
sc.shard.SetLeaseOwner("") sc.shard.SetLeaseOwner("")
@ -63,6 +63,7 @@ func (sc *commonShardConsumer) releaseLease() {
} }
// reporting lease lose metrics // reporting lease lose metrics
sc.mService.DeleteMetricMillisBehindLatest(shard)
sc.mService.LeaseLost(sc.shard.ID) sc.mService.LeaseLost(sc.shard.ID)
} }

View file

@ -46,7 +46,7 @@ type FanOutShardConsumer struct {
// getRecords subscribes to a shard and reads events from it. // getRecords subscribes to a shard and reads events from it.
// Precondition: it currently has the lease on the shard. // Precondition: it currently has the lease on the shard.
func (sc *FanOutShardConsumer) getRecords() error { func (sc *FanOutShardConsumer) getRecords() error {
defer sc.releaseLease() defer sc.releaseLease(sc.shard.ID)
log := sc.kclConfig.Logger log := sc.kclConfig.Logger

View file

@ -79,7 +79,7 @@ func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
// getRecords continuously poll one shard for data record // getRecords continuously poll one shard for data record
// Precondition: it currently has the lease on the shard. // Precondition: it currently has the lease on the shard.
func (sc *PollingShardConsumer) getRecords() error { func (sc *PollingShardConsumer) getRecords() error {
defer sc.releaseLease() defer sc.releaseLease(sc.shard.ID)
log := sc.kclConfig.Logger log := sc.kclConfig.Logger