diff --git a/src/clientlibrary/metrics/cloudwatch.go b/src/clientlibrary/metrics/cloudwatch.go index f5a76d6..385438b 100644 --- a/src/clientlibrary/metrics/cloudwatch.go +++ b/src/clientlibrary/metrics/cloudwatch.go @@ -15,8 +15,8 @@ type CloudWatchMonitoringService struct { Namespace string KinesisStream string WorkerID string - // What granularity we should send metrics to CW at. Note setting this to 1 will cost quite a bit of money - // At the time of writing (March 2018) about US$200 per month + Region string + // how frequently to send data to cloudwatch ResolutionSec int svc cloudwatchiface.CloudWatchAPI shardMetrics map[string]*cloudWatchMetrics @@ -34,75 +34,82 @@ type cloudWatchMetrics struct { } func (cw *CloudWatchMonitoringService) Init() error { + // default to 1 min resolution if cw.ResolutionSec == 0 { cw.ResolutionSec = 60 } - session, err := session.NewSessionWithOptions( - session.Options{ - SharedConfigState: session.SharedConfigEnable, - }, - ) - if err != nil { - return err - } - - cw.svc = cloudwatch.New(session) + s := session.New(&aws.Config{Region: aws.String(cw.Region)}) + cw.svc = cloudwatch.New(s) cw.shardMetrics = make(map[string]*cloudWatchMetrics) + return nil } +// Start daemon to flush metrics periodically func (cw *CloudWatchMonitoringService) flushDaemon() { previousFlushTime := time.Now() resolutionDuration := time.Duration(cw.ResolutionSec) * time.Second for { time.Sleep(resolutionDuration - time.Now().Sub(previousFlushTime)) - err := cw.flush() + err := cw.Flush() if err != nil { - log.Errorln("Error sending metrics to CloudWatch", err) + log.Errorf("Error sending metrics to CloudWatch. %+v", err) } previousFlushTime = time.Now() } } -func (cw *CloudWatchMonitoringService) flush() error { +func (cw *CloudWatchMonitoringService) Flush() error { + // publish per shard metrics for shard, metric := range cw.shardMetrics { metric.Lock() defaultDimensions := []*cloudwatch.Dimension{ - &cloudwatch.Dimension{ - Name: aws.String("shard"), + { + Name: aws.String("Shard"), Value: &shard, }, - &cloudwatch.Dimension{ + { Name: aws.String("KinesisStreamName"), Value: &cw.KinesisStream, }, } - leaseDimensions := make([]*cloudwatch.Dimension, len(defaultDimensions)) - copy(defaultDimensions, leaseDimensions) - leaseDimensions = append(leaseDimensions, &cloudwatch.Dimension{ - Name: aws.String("WorkerID"), - Value: &cw.WorkerID, - }) + + leaseDimensions := []*cloudwatch.Dimension{ + { + Name: aws.String("Shard"), + Value: &shard, + }, + { + Name: aws.String("KinesisStreamName"), + Value: &cw.KinesisStream, + }, + { + Name: aws.String("WorkerID"), + Value: &cw.WorkerID, + }, + } metricTimestamp := time.Now() + + // Publish metrics data to cloud watch _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ Namespace: aws.String(cw.Namespace), MetricData: []*cloudwatch.MetricDatum{ - &cloudwatch.MetricDatum{ + { Dimensions: defaultDimensions, MetricName: aws.String("RecordsProcessed"), Unit: aws.String("Count"), Timestamp: &metricTimestamp, Value: aws.Float64(float64(metric.processedRecords)), }, - &cloudwatch.MetricDatum{ + { Dimensions: defaultDimensions, MetricName: aws.String("DataBytesProcessed"), - Unit: aws.String("Byte"), + Unit: aws.String("Bytes"), Timestamp: &metricTimestamp, Value: aws.Float64(float64(metric.processedBytes)), }, - &cloudwatch.MetricDatum{ + { Dimensions: defaultDimensions, MetricName: aws.String("MillisBehindLatest"), Unit: aws.String("Milliseconds"), @@ -114,7 +121,7 @@ func (cw *CloudWatchMonitoringService) flush() error { Minimum: minFloat64(metric.behindLatestMillis), }, }, - &cloudwatch.MetricDatum{ + { Dimensions: defaultDimensions, MetricName: aws.String("KinesisDataFetcher.getRecords.Time"), Unit: aws.String("Milliseconds"), @@ -126,7 +133,7 @@ func (cw *CloudWatchMonitoringService) flush() error { Minimum: minFloat64(metric.getRecordsTime), }, }, - &cloudwatch.MetricDatum{ + { Dimensions: defaultDimensions, MetricName: aws.String("RecordProcessor.processRecords.Time"), Unit: aws.String("Milliseconds"), @@ -138,14 +145,14 @@ func (cw *CloudWatchMonitoringService) flush() error { Minimum: minFloat64(metric.processRecordsTime), }, }, - &cloudwatch.MetricDatum{ + { Dimensions: leaseDimensions, MetricName: aws.String("RenewLease.Success"), Unit: aws.String("Count"), Timestamp: &metricTimestamp, Value: aws.Float64(float64(metric.leaseRenewals)), }, - &cloudwatch.MetricDatum{ + { Dimensions: leaseDimensions, MetricName: aws.String("CurrentLeases"), Unit: aws.String("Count"), @@ -161,7 +168,10 @@ func (cw *CloudWatchMonitoringService) flush() error { metric.leaseRenewals = 0 metric.getRecordsTime = []float64{} metric.processRecordsTime = []float64{} + } else { + log.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err) } + metric.Unlock() return err } diff --git a/src/clientlibrary/metrics/interfaces.go b/src/clientlibrary/metrics/interfaces.go index 141e644..57dfc11 100644 --- a/src/clientlibrary/metrics/interfaces.go +++ b/src/clientlibrary/metrics/interfaces.go @@ -7,6 +7,7 @@ import ( // MonitoringConfiguration allows you to configure how record processing metrics are exposed type MonitoringConfiguration struct { MonitoringService string // Type of monitoring to expose. Supported types are "prometheus" + Region string Prometheus PrometheusMonitoringService CloudWatch CloudWatchMonitoringService service MonitoringService @@ -22,6 +23,7 @@ type MonitoringService interface { LeaseRenewed(string) RecordGetRecordsTime(string, float64) RecordProcessRecordsTime(string, float64) + Flush() error } func (m *MonitoringConfiguration) Init(nameSpace, streamName string, workerID string) error { @@ -35,10 +37,13 @@ func (m *MonitoringConfiguration) Init(nameSpace, streamName string, workerID st m.Prometheus.Namespace = nameSpace m.Prometheus.KinesisStream = streamName m.Prometheus.WorkerID = workerID + m.Prometheus.Region = m.Region m.service = &m.Prometheus case "cloudwatch": + m.CloudWatch.Namespace = nameSpace m.CloudWatch.KinesisStream = streamName m.CloudWatch.WorkerID = workerID + m.CloudWatch.Region = m.Region m.service = &m.CloudWatch default: return fmt.Errorf("Invalid monitoring service type %s", m.MonitoringService) @@ -64,3 +69,4 @@ func (n *noopMonitoringService) LeaseLost(shard string) func (n *noopMonitoringService) LeaseRenewed(shard string) {} func (n *noopMonitoringService) RecordGetRecordsTime(shard string, time float64) {} func (n *noopMonitoringService) RecordProcessRecordsTime(shard string, time float64) {} +func (n *noopMonitoringService) Flush() error { return nil } diff --git a/src/clientlibrary/metrics/prometheus.go b/src/clientlibrary/metrics/prometheus.go index 4ec13fd..fc9ab28 100644 --- a/src/clientlibrary/metrics/prometheus.go +++ b/src/clientlibrary/metrics/prometheus.go @@ -14,6 +14,7 @@ type PrometheusMonitoringService struct { Namespace string KinesisStream string WorkerID string + Region string processedRecords *prometheus.CounterVec processedBytes *prometheus.CounterVec behindLatestMillis *prometheus.GaugeVec @@ -111,3 +112,5 @@ func (p *PrometheusMonitoringService) RecordGetRecordsTime(shard string, time fl func (p *PrometheusMonitoringService) RecordProcessRecordsTime(shard string, time float64) { p.processRecordsTime.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Observe(time) } + +func (p *PrometheusMonitoringService) Flush() error { return nil } diff --git a/src/clientlibrary/worker/shard-consumer.go b/src/clientlibrary/worker/shard-consumer.go index 9b9f175..07a3dc4 100644 --- a/src/clientlibrary/worker/shard-consumer.go +++ b/src/clientlibrary/worker/shard-consumer.go @@ -209,6 +209,8 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { case <-*sc.stop: shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer} sc.recordProcessor.Shutdown(shutdownInput) + // flush out the metrics data + sc.mService.Flush() return nil case <-time.After(1 * time.Nanosecond): } diff --git a/src/clientlibrary/worker/worker_test.go b/src/clientlibrary/worker/worker_test.go index 88b3dac..eb33bfe 100644 --- a/src/clientlibrary/worker/worker_test.go +++ b/src/clientlibrary/worker/worker_test.go @@ -1,15 +1,18 @@ package worker import ( + "net/http" "os" "testing" "time" "github.com/aws/aws-sdk-go/aws" + "github.com/prometheus/common/expfmt" log "github.com/sirupsen/logrus" cfg "clientlibrary/config" kc "clientlibrary/interfaces" + "clientlibrary/metrics" "clientlibrary/utils" "github.com/stretchr/testify/assert" ) @@ -21,6 +24,7 @@ const ( ) const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}` +const metricsSystem = "cloudwatch" func TestWorker(t *testing.T) { os.Setenv("AWS_ACCESS_KEY_ID", "your aws access key id") @@ -40,7 +44,10 @@ func TestWorker(t *testing.T) { assert.Equal(t, regionName, kclConfig.RegionName) assert.Equal(t, streamName, kclConfig.StreamName) - worker := NewWorker(recordProcessorFactory(t), kclConfig, nil) + // configure cloudwatch as metrics system + metricsConfig := getMetricsConfig(metricsSystem) + + worker := NewWorker(recordProcessorFactory(t), kclConfig, metricsConfig) assert.Equal(t, regionName, worker.regionName) assert.Equal(t, streamName, worker.streamName) @@ -56,10 +63,53 @@ func TestWorker(t *testing.T) { } } + // wait a few seconds before shutdown processing time.Sleep(10 * time.Second) + + if metricsConfig != nil && metricsConfig.MonitoringService == "prometheus" { + res, err := http.Get("http://localhost:8080/metrics") + if err != nil { + t.Fatalf("Error scraping Prometheus endpoint %s", err) + } + + var parser expfmt.TextParser + parsed, err := parser.TextToMetricFamilies(res.Body) + res.Body.Close() + if err != nil { + t.Errorf("Error reading monitoring response %s", err) + } + t.Logf("Prometheus: %+v", parsed) + + } + worker.Shutdown() } +// configure different metrics system +func getMetricsConfig(service string) *metrics.MonitoringConfiguration { + if service == "cloudwatch" { + return &metrics.MonitoringConfiguration{ + MonitoringService: "cloudwatch", + Region: regionName, + CloudWatch: metrics.CloudWatchMonitoringService{ + ResolutionSec: 1, + }, + } + } + + if service == "prometheus" { + return &metrics.MonitoringConfiguration{ + MonitoringService: "prometheus", + Region: regionName, + Prometheus: metrics.PrometheusMonitoringService{ + ListenAddress: ":8080", + }, + } + } + + return nil +} + // Record processor factory is used to create RecordProcessor func recordProcessorFactory(t *testing.T) kc.IRecordProcessorFactory { return &dumpRecordProcessorFactory{t: t}