From a54ae17cd0ad5c262a6e3db3a696bd94d39fbba5 Mon Sep 17 00:00:00 2001 From: Caleb Stewart Date: Mon, 4 Sep 2023 16:03:41 -0400 Subject: [PATCH] Added dogstatsd monitoring service Signed-off-by: Caleb Stewart --- clientlibrary/metrics/dogstatsd/dogstatsd.go | 169 +++++++++++++++++++ go.mod | 2 + go.sum | 7 + 3 files changed, 178 insertions(+) create mode 100644 clientlibrary/metrics/dogstatsd/dogstatsd.go diff --git a/clientlibrary/metrics/dogstatsd/dogstatsd.go b/clientlibrary/metrics/dogstatsd/dogstatsd.go new file mode 100644 index 0000000..d392cbb --- /dev/null +++ b/clientlibrary/metrics/dogstatsd/dogstatsd.go @@ -0,0 +1,169 @@ +package dogstatsd + +import ( + "sync/atomic" + + "github.com/DataDog/datadog-go/v5/statsd" + + "github.com/vmware/vmware-go-kcl-v2/logger" +) + +const ( + // Labels attached to metric submissions + streamNameLabel = "kinesis.stream" + shardIdLabel = "kinesis.shardId" + workerIdLabel = "kcl.workerId" + applicationNameLabel = "kcl.applicationName" + + // Names for the metrics submitted below + recordsProcessedMetric = "kcl.records_processed" + bytesProcessedMetric = "kcl.bytes_processed" + millisBehindLatestMetric = "kcl.millis_behind_latest" + shardLeasesOwnedMetric = "kcl.shard_leases_owned" + getRecordsTimeMetric = "kcl.get_records_time" + processRecordsTimeMetric = "kcl.process_records_time" +) + +// MonitoringService publishes KCL metrics to Datadog using the dogstatsd client +// package. +type MonitoringService struct { + Client statsd.ClientInterface // Client used to push metrics + SamplingRate float64 // Sampling rate for all metrics + Logger logger.Logger // Logger used for metric push errors + + leaseCount int64 // Number of leases current held + tags []string // List of tags to send with every metric +} + +// New creates a new dogstatsd monitoring service. The given sampling rate will +// be used for all submitted metrics. The logger is only used if submissions +// fail. +func New(client statsd.ClientInterface, samplingRate float64, logger logger.Logger) *MonitoringService { + return &MonitoringService{ + Client: client, + SamplingRate: samplingRate, + Logger: logger, + leaseCount: 0, + tags: []string{}, + } +} + +func (s *MonitoringService) Init(appName, streamName, workerID string) (err error) { + s.tags = []string{ + applicationNameLabel + ":" + appName, + streamNameLabel + ":" + streamName, + workerIdLabel + ":" + workerID, + } + return err +} + +func (s *MonitoringService) Start() error { + return nil +} + +func (s *MonitoringService) Shutdown() {} + +// If the error is non-nil, log it. This should be inlined by the +// compiler so no overhead, and simplifies the metrics methods below, +// since the log entry is essentially always the same. +func (s *MonitoringService) logFailure(shard string, metric string, err error) { + if err != nil { + s.Logger.WithFields(logger.Fields{ + "error": err, + "shardId": shard, + "metric": metric, + }).Errorf("failed to push metric") + } +} + +// Add the tags for a specific metric to the global monitoring service tags +func (s *MonitoringService) buildTags(tags ...string) []string { + return append(tags, s.tags...) +} + +func (s *MonitoringService) IncrRecordsProcessed(shard string, count int) { + err := s.Client.Count( + recordsProcessedMetric, + int64(count), + s.buildTags(shardIdLabel+":"+shard), + s.SamplingRate, + ) + s.logFailure(shard, recordsProcessedMetric, err) +} + +func (s *MonitoringService) IncrBytesProcessed(shard string, count int64) { + err := s.Client.Count( + bytesProcessedMetric, + count, + s.buildTags(shardIdLabel+":"+shard), + s.SamplingRate, + ) + s.logFailure(shard, recordsProcessedMetric, err) +} + +func (s *MonitoringService) MillisBehindLatest(shard string, millis float64) { + err := s.Client.Gauge( + millisBehindLatestMetric, + millis, + s.buildTags(shardIdLabel+":"+shard), + s.SamplingRate, + ) + s.logFailure(shard, millisBehindLatestMetric, err) +} + +func (s *MonitoringService) DeleteMetricMillisBehindLatest(shard string) { + s.MillisBehindLatest(shard, 0) +} + +func (s *MonitoringService) LeaseGained(shard string) { + leaseCount := atomic.AddInt64(&s.leaseCount, 1) + err := s.Client.Gauge( + shardLeasesOwnedMetric, + float64(leaseCount), + s.buildTags(shardIdLabel+":"+shard), + s.SamplingRate, + ) + s.logFailure(shard, shardLeasesOwnedMetric, err) +} + +func (s *MonitoringService) LeaseLost(shard string) { + leaseCount := atomic.AddInt64(&s.leaseCount, -1) + err := s.Client.Gauge( + shardLeasesOwnedMetric, + float64(leaseCount), + s.buildTags(shardIdLabel+":"+shard), + s.SamplingRate, + ) + s.logFailure(shard, shardLeasesOwnedMetric, err) +} + +func (s *MonitoringService) LeaseRenewed(shard string) { + leaseCount := atomic.LoadInt64(&s.leaseCount) + err := s.Client.Gauge( + shardLeasesOwnedMetric, + float64(leaseCount), + s.buildTags(shardIdLabel+":"+shard), + s.SamplingRate, + ) + s.logFailure(shard, shardLeasesOwnedMetric, err) +} + +func (s *MonitoringService) RecordGetRecordsTime(shard string, time float64) { + err := s.Client.Count( + getRecordsTimeMetric, + int64(time), + s.buildTags(shardIdLabel+":"+shard), + s.SamplingRate, + ) + s.logFailure(shard, getRecordsTimeMetric, err) +} + +func (s *MonitoringService) RecordProcessRecordsTime(shard string, time float64) { + err := s.Client.Count( + processRecordsTimeMetric, + int64(time), + s.buildTags(shardIdLabel+":"+shard), + s.SamplingRate, + ) + s.logFailure(shard, processRecordsTimeMetric, err) +} diff --git a/go.mod b/go.mod index 271895b..4a96653 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,8 @@ require ( require ( github.com/BurntSushi/toml v0.4.1 // indirect + github.com/DataDog/datadog-go/v5 v5.3.0 // indirect + github.com/Microsoft/go-winio v0.5.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect diff --git a/go.sum b/go.sum index 7ac1638..88e9f05 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DataDog/datadog-go/v5 v5.3.0 h1:2q2qjFOb3RwAZNU+ez27ZVDwErJv5/VpbBPprz7Z+s8= +github.com/DataDog/datadog-go/v5 v5.3.0/go.mod h1:XRDJk1pTc00gm+ZDiBKsjh7oOOtJfYfglVCmFb8C2+Q= +github.com/Microsoft/go-winio v0.5.0 h1:Elr9Wn+sGKPlkaBvwu4mTrxtmOp3F3yV9qhaHbXGjwU= +github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -125,6 +129,7 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -237,6 +242,7 @@ github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -449,6 +455,7 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=