This commit is contained in:
Caleb Stewart 2023-09-04 20:17:29 +00:00 committed by GitHub
commit 7e0fbbefbd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 178 additions and 0 deletions

View file

@ -0,0 +1,169 @@
package dogstatsd
import (
"sync/atomic"
"github.com/DataDog/datadog-go/v5/statsd"
"github.com/vmware/vmware-go-kcl-v2/logger"
)
const (
// Labels attached to metric submissions
streamNameLabel = "kinesis.stream"
shardIdLabel = "kinesis.shardId"
workerIdLabel = "kcl.workerId"
applicationNameLabel = "kcl.applicationName"
// Names for the metrics submitted below
recordsProcessedMetric = "kcl.records_processed"
bytesProcessedMetric = "kcl.bytes_processed"
millisBehindLatestMetric = "kcl.millis_behind_latest"
shardLeasesOwnedMetric = "kcl.shard_leases_owned"
getRecordsTimeMetric = "kcl.get_records_time"
processRecordsTimeMetric = "kcl.process_records_time"
)
// MonitoringService publishes KCL metrics to Datadog using the dogstatsd client
// package.
type MonitoringService struct {
Client statsd.ClientInterface // Client used to push metrics
SamplingRate float64 // Sampling rate for all metrics
Logger logger.Logger // Logger used for metric push errors
leaseCount int64 // Number of leases current held
tags []string // List of tags to send with every metric
}
// New creates a new dogstatsd monitoring service. The given sampling rate will
// be used for all submitted metrics. The logger is only used if submissions
// fail.
func New(client statsd.ClientInterface, samplingRate float64, logger logger.Logger) *MonitoringService {
return &MonitoringService{
Client: client,
SamplingRate: samplingRate,
Logger: logger,
leaseCount: 0,
tags: []string{},
}
}
func (s *MonitoringService) Init(appName, streamName, workerID string) (err error) {
s.tags = []string{
applicationNameLabel + ":" + appName,
streamNameLabel + ":" + streamName,
workerIdLabel + ":" + workerID,
}
return err
}
func (s *MonitoringService) Start() error {
return nil
}
func (s *MonitoringService) Shutdown() {}
// If the error is non-nil, log it. This should be inlined by the
// compiler so no overhead, and simplifies the metrics methods below,
// since the log entry is essentially always the same.
func (s *MonitoringService) logFailure(shard string, metric string, err error) {
if err != nil {
s.Logger.WithFields(logger.Fields{
"error": err,
"shardId": shard,
"metric": metric,
}).Errorf("failed to push metric")
}
}
// Add the tags for a specific metric to the global monitoring service tags
func (s *MonitoringService) buildTags(tags ...string) []string {
return append(tags, s.tags...)
}
func (s *MonitoringService) IncrRecordsProcessed(shard string, count int) {
err := s.Client.Count(
recordsProcessedMetric,
int64(count),
s.buildTags(shardIdLabel+":"+shard),
s.SamplingRate,
)
s.logFailure(shard, recordsProcessedMetric, err)
}
func (s *MonitoringService) IncrBytesProcessed(shard string, count int64) {
err := s.Client.Count(
bytesProcessedMetric,
count,
s.buildTags(shardIdLabel+":"+shard),
s.SamplingRate,
)
s.logFailure(shard, recordsProcessedMetric, err)
}
func (s *MonitoringService) MillisBehindLatest(shard string, millis float64) {
err := s.Client.Gauge(
millisBehindLatestMetric,
millis,
s.buildTags(shardIdLabel+":"+shard),
s.SamplingRate,
)
s.logFailure(shard, millisBehindLatestMetric, err)
}
func (s *MonitoringService) DeleteMetricMillisBehindLatest(shard string) {
s.MillisBehindLatest(shard, 0)
}
func (s *MonitoringService) LeaseGained(shard string) {
leaseCount := atomic.AddInt64(&s.leaseCount, 1)
err := s.Client.Gauge(
shardLeasesOwnedMetric,
float64(leaseCount),
s.buildTags(shardIdLabel+":"+shard),
s.SamplingRate,
)
s.logFailure(shard, shardLeasesOwnedMetric, err)
}
func (s *MonitoringService) LeaseLost(shard string) {
leaseCount := atomic.AddInt64(&s.leaseCount, -1)
err := s.Client.Gauge(
shardLeasesOwnedMetric,
float64(leaseCount),
s.buildTags(shardIdLabel+":"+shard),
s.SamplingRate,
)
s.logFailure(shard, shardLeasesOwnedMetric, err)
}
func (s *MonitoringService) LeaseRenewed(shard string) {
leaseCount := atomic.LoadInt64(&s.leaseCount)
err := s.Client.Gauge(
shardLeasesOwnedMetric,
float64(leaseCount),
s.buildTags(shardIdLabel+":"+shard),
s.SamplingRate,
)
s.logFailure(shard, shardLeasesOwnedMetric, err)
}
func (s *MonitoringService) RecordGetRecordsTime(shard string, time float64) {
err := s.Client.Count(
getRecordsTimeMetric,
int64(time),
s.buildTags(shardIdLabel+":"+shard),
s.SamplingRate,
)
s.logFailure(shard, getRecordsTimeMetric, err)
}
func (s *MonitoringService) RecordProcessRecordsTime(shard string, time float64) {
err := s.Client.Count(
processRecordsTimeMetric,
int64(time),
s.buildTags(shardIdLabel+":"+shard),
s.SamplingRate,
)
s.logFailure(shard, processRecordsTimeMetric, err)
}

2
go.mod
View file

@ -23,6 +23,8 @@ require (
require ( require (
github.com/BurntSushi/toml v0.4.1 // indirect github.com/BurntSushi/toml v0.4.1 // indirect
github.com/DataDog/datadog-go/v5 v5.3.0 // indirect
github.com/Microsoft/go-winio v0.5.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect

7
go.sum
View file

@ -35,6 +35,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/datadog-go/v5 v5.3.0 h1:2q2qjFOb3RwAZNU+ez27ZVDwErJv5/VpbBPprz7Z+s8=
github.com/DataDog/datadog-go/v5 v5.3.0/go.mod h1:XRDJk1pTc00gm+ZDiBKsjh7oOOtJfYfglVCmFb8C2+Q=
github.com/Microsoft/go-winio v0.5.0 h1:Elr9Wn+sGKPlkaBvwu4mTrxtmOp3F3yV9qhaHbXGjwU=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@ -125,6 +129,7 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt
github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -237,6 +242,7 @@ github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -449,6 +455,7 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=