From e1071abc80cff2a851977e11b21b55f900385679 Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Fri, 20 Apr 2018 21:07:11 -0700 Subject: [PATCH] KCL: Fix cloudwatch metrics This changes fixed cloudwatch metrics publishing by adding long running go routine to periodically publish cloudwatch metrics. Also, shutdown metrics publishing when KCL is shutdown. Test: Run hmake test and verified cloudwatch metrics has been published via AWS cloudwatch console. Jira CNA-702 Change-Id: I78b347cd12939447b0daf93f51acf620d18e2f49 --- src/clientlibrary/metrics/cloudwatch.go | 391 +++++++++++---------- src/clientlibrary/metrics/interfaces.go | 10 +- src/clientlibrary/metrics/prometheus.go | 29 +- src/clientlibrary/worker/shard-consumer.go | 2 - src/clientlibrary/worker/worker.go | 8 +- src/clientlibrary/worker/worker_test.go | 4 +- 6 files changed, 248 insertions(+), 196 deletions(-) diff --git a/src/clientlibrary/metrics/cloudwatch.go b/src/clientlibrary/metrics/cloudwatch.go index 385438b..6724e07 100644 --- a/src/clientlibrary/metrics/cloudwatch.go +++ b/src/clientlibrary/metrics/cloudwatch.go @@ -16,10 +16,15 @@ type CloudWatchMonitoringService struct { KinesisStream string WorkerID string Region string - // how frequently to send data to cloudwatch - ResolutionSec int - svc cloudwatchiface.CloudWatchAPI - shardMetrics map[string]*cloudWatchMetrics + + // control how often to pusblish to CloudWatch + MetricsBufferTimeMillis int + MetricsMaxQueueSize int + + stop *chan struct{} + waitGroup *sync.WaitGroup + svc cloudwatchiface.CloudWatchAPI + shardMetrics *sync.Map } type cloudWatchMetrics struct { @@ -34,219 +39,251 @@ type cloudWatchMetrics struct { } func (cw *CloudWatchMonitoringService) Init() error { - // default to 1 min resolution - if cw.ResolutionSec == 0 { - cw.ResolutionSec = 60 - } - s := session.New(&aws.Config{Region: aws.String(cw.Region)}) cw.svc = cloudwatch.New(s) - cw.shardMetrics = make(map[string]*cloudWatchMetrics) + cw.shardMetrics = new(sync.Map) + + 
stopChan := make(chan struct{}) + cw.stop = &stopChan + wg := sync.WaitGroup{} + cw.waitGroup = &wg return nil } +func (cw *CloudWatchMonitoringService) Start() error { + cw.waitGroup.Add(1) + // entering eventloop for sending metrics to CloudWatch + go cw.eventloop() + return nil +} + +func (cw *CloudWatchMonitoringService) Shutdown() { + log.Info("Shutting down cloudwatch metrics system...") + close(*cw.stop) + cw.waitGroup.Wait() + log.Info("Cloudwatch metrics system has been shutdown.") +} + // Start daemon to flush metrics periodically -func (cw *CloudWatchMonitoringService) flushDaemon() { - previousFlushTime := time.Now() - resolutionDuration := time.Duration(cw.ResolutionSec) * time.Second +func (cw *CloudWatchMonitoringService) eventloop() { + defer cw.waitGroup.Done() + for { - time.Sleep(resolutionDuration - time.Now().Sub(previousFlushTime)) - err := cw.Flush() + err := cw.flush() if err != nil { log.Errorf("Error sending metrics to CloudWatch. %+v", err) } - previousFlushTime = time.Now() + + select { + case <-*cw.stop: + log.Info("Shutting down monitoring system") + cw.flush() + return + case <-time.After(time.Duration(cw.MetricsBufferTimeMillis) * time.Millisecond): + } } } -func (cw *CloudWatchMonitoringService) Flush() error { - // publish per shard metrics - for shard, metric := range cw.shardMetrics { - metric.Lock() - defaultDimensions := []*cloudwatch.Dimension{ - { - Name: aws.String("Shard"), - Value: &shard, - }, - { - Name: aws.String("KinesisStreamName"), - Value: &cw.KinesisStream, - }, - } - - leaseDimensions := []*cloudwatch.Dimension{ - { - Name: aws.String("Shard"), - Value: &shard, - }, - { - Name: aws.String("KinesisStreamName"), - Value: &cw.KinesisStream, - }, - { - Name: aws.String("WorkerID"), - Value: &cw.WorkerID, - }, - } - metricTimestamp := time.Now() - - // Publish metrics data to cloud watch - _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ - Namespace: aws.String(cw.Namespace), - MetricData: 
[]*cloudwatch.MetricDatum{ - { - Dimensions: defaultDimensions, - MetricName: aws.String("RecordsProcessed"), - Unit: aws.String("Count"), - Timestamp: &metricTimestamp, - Value: aws.Float64(float64(metric.processedRecords)), - }, - { - Dimensions: defaultDimensions, - MetricName: aws.String("DataBytesProcessed"), - Unit: aws.String("Bytes"), - Timestamp: &metricTimestamp, - Value: aws.Float64(float64(metric.processedBytes)), - }, - { - Dimensions: defaultDimensions, - MetricName: aws.String("MillisBehindLatest"), - Unit: aws.String("Milliseconds"), - Timestamp: &metricTimestamp, - StatisticValues: &cloudwatch.StatisticSet{ - SampleCount: aws.Float64(float64(len(metric.behindLatestMillis))), - Sum: sumFloat64(metric.behindLatestMillis), - Maximum: maxFloat64(metric.behindLatestMillis), - Minimum: minFloat64(metric.behindLatestMillis), - }, - }, - { - Dimensions: defaultDimensions, - MetricName: aws.String("KinesisDataFetcher.getRecords.Time"), - Unit: aws.String("Milliseconds"), - Timestamp: &metricTimestamp, - StatisticValues: &cloudwatch.StatisticSet{ - SampleCount: aws.Float64(float64(len(metric.getRecordsTime))), - Sum: sumFloat64(metric.getRecordsTime), - Maximum: maxFloat64(metric.getRecordsTime), - Minimum: minFloat64(metric.getRecordsTime), - }, - }, - { - Dimensions: defaultDimensions, - MetricName: aws.String("RecordProcessor.processRecords.Time"), - Unit: aws.String("Milliseconds"), - Timestamp: &metricTimestamp, - StatisticValues: &cloudwatch.StatisticSet{ - SampleCount: aws.Float64(float64(len(metric.processRecordsTime))), - Sum: sumFloat64(metric.processRecordsTime), - Maximum: maxFloat64(metric.processRecordsTime), - Minimum: minFloat64(metric.processRecordsTime), - }, - }, - { - Dimensions: leaseDimensions, - MetricName: aws.String("RenewLease.Success"), - Unit: aws.String("Count"), - Timestamp: &metricTimestamp, - Value: aws.Float64(float64(metric.leaseRenewals)), - }, - { - Dimensions: leaseDimensions, - MetricName: aws.String("CurrentLeases"), - 
Unit: aws.String("Count"), - Timestamp: &metricTimestamp, - Value: aws.Float64(float64(metric.leasesHeld)), - }, - }, - }) - if err == nil { - metric.processedRecords = 0 - metric.processedBytes = 0 - metric.behindLatestMillis = []float64{} - metric.leaseRenewals = 0 - metric.getRecordsTime = []float64{} - metric.processRecordsTime = []float64{} - } else { - log.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err) - } - - metric.Unlock() - return err +func (cw *CloudWatchMonitoringService) flushShard(shard string, metric *cloudWatchMetrics) bool { + metric.Lock() + defaultDimensions := []*cloudwatch.Dimension{ + { + Name: aws.String("Shard"), + Value: &shard, + }, + { + Name: aws.String("KinesisStreamName"), + Value: &cw.KinesisStream, + }, } + + leaseDimensions := []*cloudwatch.Dimension{ + { + Name: aws.String("Shard"), + Value: &shard, + }, + { + Name: aws.String("KinesisStreamName"), + Value: &cw.KinesisStream, + }, + { + Name: aws.String("WorkerID"), + Value: &cw.WorkerID, + }, + } + metricTimestamp := time.Now() + + data := []*cloudwatch.MetricDatum{ + { + Dimensions: defaultDimensions, + MetricName: aws.String("RecordsProcessed"), + Unit: aws.String("Count"), + Timestamp: &metricTimestamp, + Value: aws.Float64(float64(metric.processedRecords)), + }, + { + Dimensions: defaultDimensions, + MetricName: aws.String("DataBytesProcessed"), + Unit: aws.String("Bytes"), + Timestamp: &metricTimestamp, + Value: aws.Float64(float64(metric.processedBytes)), + }, + { + Dimensions: leaseDimensions, + MetricName: aws.String("RenewLease.Success"), + Unit: aws.String("Count"), + Timestamp: &metricTimestamp, + Value: aws.Float64(float64(metric.leaseRenewals)), + }, + { + Dimensions: leaseDimensions, + MetricName: aws.String("CurrentLeases"), + Unit: aws.String("Count"), + Timestamp: &metricTimestamp, + Value: aws.Float64(float64(metric.leasesHeld)), + }, + } + + if len(metric.behindLatestMillis) > 0 { + data = append(data, &cloudwatch.MetricDatum{ + Dimensions: 
defaultDimensions, + MetricName: aws.String("MillisBehindLatest"), + Unit: aws.String("Milliseconds"), + Timestamp: &metricTimestamp, + StatisticValues: &cloudwatch.StatisticSet{ + SampleCount: aws.Float64(float64(len(metric.behindLatestMillis))), + Sum: sumFloat64(metric.behindLatestMillis), + Maximum: maxFloat64(metric.behindLatestMillis), + Minimum: minFloat64(metric.behindLatestMillis), + }}) + } + + if len(metric.getRecordsTime) > 0 { + data = append(data, &cloudwatch.MetricDatum{ + Dimensions: defaultDimensions, + MetricName: aws.String("KinesisDataFetcher.getRecords.Time"), + Unit: aws.String("Milliseconds"), + Timestamp: &metricTimestamp, + StatisticValues: &cloudwatch.StatisticSet{ + SampleCount: aws.Float64(float64(len(metric.getRecordsTime))), + Sum: sumFloat64(metric.getRecordsTime), + Maximum: maxFloat64(metric.getRecordsTime), + Minimum: minFloat64(metric.getRecordsTime), + }}) + } + + if len(metric.processRecordsTime) > 0 { + data = append(data, &cloudwatch.MetricDatum{ + Dimensions: defaultDimensions, + MetricName: aws.String("RecordProcessor.processRecords.Time"), + Unit: aws.String("Milliseconds"), + Timestamp: &metricTimestamp, + StatisticValues: &cloudwatch.StatisticSet{ + SampleCount: aws.Float64(float64(len(metric.processRecordsTime))), + Sum: sumFloat64(metric.processRecordsTime), + Maximum: maxFloat64(metric.processRecordsTime), + Minimum: minFloat64(metric.processRecordsTime), + }}) + } + + // Publish metrics data to cloud watch + _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ + Namespace: aws.String(cw.Namespace), + MetricData: data, + }) + + if err == nil { + metric.processedRecords = 0 + metric.processedBytes = 0 + metric.behindLatestMillis = []float64{} + metric.leaseRenewals = 0 + metric.getRecordsTime = []float64{} + metric.processRecordsTime = []float64{} + } else { + log.Errorf("Error in publishing cloudwatch metrics. 
Error: %+v", err) + } + + metric.Unlock() + return true +} + +func (cw *CloudWatchMonitoringService) flush() error { + log.Debugf("Flushing metrics data. Stream: %s, Worker: %s", cw.KinesisStream, cw.WorkerID) + // publish per shard metrics + cw.shardMetrics.Range(func(k, v interface{}) bool { + shard, metric := k.(string), v.(*cloudWatchMetrics) + return cw.flushShard(shard, metric) + }) + return nil } func (cw *CloudWatchMonitoringService) IncrRecordsProcessed(shard string, count int) { - if _, ok := cw.shardMetrics[shard]; !ok { - cw.shardMetrics[shard] = &cloudWatchMetrics{} - } - cw.shardMetrics[shard].Lock() - defer cw.shardMetrics[shard].Unlock() - cw.shardMetrics[shard].processedRecords += int64(count) + m := cw.getOrCreatePerShardMetrics(shard) + m.Lock() + defer m.Unlock() + m.processedRecords += int64(count) } func (cw *CloudWatchMonitoringService) IncrBytesProcessed(shard string, count int64) { - if _, ok := cw.shardMetrics[shard]; !ok { - cw.shardMetrics[shard] = &cloudWatchMetrics{} - } - cw.shardMetrics[shard].Lock() - defer cw.shardMetrics[shard].Unlock() - cw.shardMetrics[shard].processedBytes += count + m := cw.getOrCreatePerShardMetrics(shard) + m.Lock() + defer m.Unlock() + m.processedBytes += count } func (cw *CloudWatchMonitoringService) MillisBehindLatest(shard string, millSeconds float64) { - if _, ok := cw.shardMetrics[shard]; !ok { - cw.shardMetrics[shard] = &cloudWatchMetrics{} - } - cw.shardMetrics[shard].Lock() - defer cw.shardMetrics[shard].Unlock() - cw.shardMetrics[shard].behindLatestMillis = append(cw.shardMetrics[shard].behindLatestMillis, millSeconds) + m := cw.getOrCreatePerShardMetrics(shard) + m.Lock() + defer m.Unlock() + m.behindLatestMillis = append(m.behindLatestMillis, millSeconds) } func (cw *CloudWatchMonitoringService) LeaseGained(shard string) { - if _, ok := cw.shardMetrics[shard]; !ok { - cw.shardMetrics[shard] = &cloudWatchMetrics{} - } - cw.shardMetrics[shard].Lock() - defer cw.shardMetrics[shard].Unlock() - 
cw.shardMetrics[shard].leasesHeld++ + m := cw.getOrCreatePerShardMetrics(shard) + m.Lock() + defer m.Unlock() + m.leasesHeld++ } func (cw *CloudWatchMonitoringService) LeaseLost(shard string) { - if _, ok := cw.shardMetrics[shard]; !ok { - cw.shardMetrics[shard] = &cloudWatchMetrics{} - } - cw.shardMetrics[shard].Lock() - defer cw.shardMetrics[shard].Unlock() - cw.shardMetrics[shard].leasesHeld-- + m := cw.getOrCreatePerShardMetrics(shard) + m.Lock() + defer m.Unlock() + m.leasesHeld-- } func (cw *CloudWatchMonitoringService) LeaseRenewed(shard string) { - if _, ok := cw.shardMetrics[shard]; !ok { - cw.shardMetrics[shard] = &cloudWatchMetrics{} - } - cw.shardMetrics[shard].Lock() - defer cw.shardMetrics[shard].Unlock() - cw.shardMetrics[shard].leaseRenewals++ + m := cw.getOrCreatePerShardMetrics(shard) + m.Lock() + defer m.Unlock() + m.leaseRenewals++ } func (cw *CloudWatchMonitoringService) RecordGetRecordsTime(shard string, time float64) { - if _, ok := cw.shardMetrics[shard]; !ok { - cw.shardMetrics[shard] = &cloudWatchMetrics{} - } - cw.shardMetrics[shard].Lock() - defer cw.shardMetrics[shard].Unlock() - cw.shardMetrics[shard].getRecordsTime = append(cw.shardMetrics[shard].getRecordsTime, time) + m := cw.getOrCreatePerShardMetrics(shard) + m.Lock() + defer m.Unlock() + m.getRecordsTime = append(m.getRecordsTime, time) } func (cw *CloudWatchMonitoringService) RecordProcessRecordsTime(shard string, time float64) { - if _, ok := cw.shardMetrics[shard]; !ok { - cw.shardMetrics[shard] = &cloudWatchMetrics{} + m := cw.getOrCreatePerShardMetrics(shard) + m.Lock() + defer m.Unlock() + m.processRecordsTime = append(m.processRecordsTime, time) +} + +func (cw *CloudWatchMonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics { + var i interface{} + var ok bool + if i, ok = cw.shardMetrics.Load(shard); !ok { + m := &cloudWatchMetrics{} + cw.shardMetrics.Store(shard, m) + return m } - cw.shardMetrics[shard].Lock() - defer 
cw.shardMetrics[shard].Unlock() - cw.shardMetrics[shard].processRecordsTime = append(cw.shardMetrics[shard].processRecordsTime, time) + + return i.(*cloudWatchMetrics) } func sumFloat64(slice []float64) *float64 { diff --git a/src/clientlibrary/metrics/interfaces.go b/src/clientlibrary/metrics/interfaces.go index 57dfc11..809089e 100644 --- a/src/clientlibrary/metrics/interfaces.go +++ b/src/clientlibrary/metrics/interfaces.go @@ -15,6 +15,7 @@ type MonitoringConfiguration struct { type MonitoringService interface { Init() error + Start() error IncrRecordsProcessed(string, int) IncrBytesProcessed(string, int64) MillisBehindLatest(string, float64) @@ -23,7 +24,7 @@ type MonitoringService interface { LeaseRenewed(string) RecordGetRecordsTime(string, float64) RecordProcessRecordsTime(string, float64) - Flush() error + Shutdown() } func (m *MonitoringConfiguration) Init(nameSpace, streamName string, workerID string) error { @@ -57,9 +58,9 @@ func (m *MonitoringConfiguration) GetMonitoringService() MonitoringService { type noopMonitoringService struct{} -func (n *noopMonitoringService) Init() error { - return nil -} +func (n *noopMonitoringService) Init() error { return nil } +func (n *noopMonitoringService) Start() error { return nil } +func (n *noopMonitoringService) Shutdown() {} func (n *noopMonitoringService) IncrRecordsProcessed(shard string, count int) {} func (n *noopMonitoringService) IncrBytesProcessed(shard string, count int64) {} @@ -69,4 +70,3 @@ func (n *noopMonitoringService) LeaseLost(shard string) func (n *noopMonitoringService) LeaseRenewed(shard string) {} func (n *noopMonitoringService) RecordGetRecordsTime(shard string, time float64) {} func (n *noopMonitoringService) RecordProcessRecordsTime(shard string, time float64) {} -func (n *noopMonitoringService) Flush() error { return nil } diff --git a/src/clientlibrary/metrics/prometheus.go b/src/clientlibrary/metrics/prometheus.go index fc9ab28..bdf3ab0 100644 --- 
a/src/clientlibrary/metrics/prometheus.go +++ b/src/clientlibrary/metrics/prometheus.go @@ -8,6 +8,9 @@ import ( log "github.com/sirupsen/logrus" ) +// PrometheusMonitoringService to start Prometheus as a metrics system. +// It might be tricky if the service onboarding with KCL also uses Prometheus. +// Therefore, we should start cloudwatch metrics by default instead. type PrometheusMonitoringService struct { ListenAddress string @@ -70,17 +73,25 @@ func (p *PrometheusMonitoringService) Init() error { } } - http.Handle("/metrics", promhttp.Handler()) - go func() { - log.Debugf("Starting Prometheus listener on %s", p.ListenAddress) - err := http.ListenAndServe(p.ListenAddress, nil) - if err != nil { - log.Errorln("Error starting Prometheus metrics endpoint", err) - } - }() return nil } +func (p *PrometheusMonitoringService) Start() error { + http.Handle("/metrics", promhttp.Handler()) + go func() { + log.Infof("Starting Prometheus listener on %s", p.ListenAddress) + err := http.ListenAndServe(p.ListenAddress, nil) + if err != nil { + log.Errorf("Error starting Prometheus metrics endpoint. 
%+v", err) + } + log.Info("Stopped metrics server") + }() + + return nil +} + +func (p *PrometheusMonitoringService) Shutdown() {} + func (p *PrometheusMonitoringService) IncrRecordsProcessed(shard string, count int) { p.processedRecords.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Add(float64(count)) } @@ -112,5 +123,3 @@ func (p *PrometheusMonitoringService) RecordGetRecordsTime(shard string, time fl func (p *PrometheusMonitoringService) RecordProcessRecordsTime(shard string, time float64) { p.processRecordsTime.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Observe(time) } - -func (p *PrometheusMonitoringService) Flush() error { return nil } diff --git a/src/clientlibrary/worker/shard-consumer.go b/src/clientlibrary/worker/shard-consumer.go index 07a3dc4..9b9f175 100644 --- a/src/clientlibrary/worker/shard-consumer.go +++ b/src/clientlibrary/worker/shard-consumer.go @@ -209,8 +209,6 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { case <-*sc.stop: shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer} sc.recordProcessor.Shutdown(shutdownInput) - // flush out the metrics data - sc.mService.Flush() return nil case <-time.After(1 * time.Nanosecond): } diff --git a/src/clientlibrary/worker/worker.go b/src/clientlibrary/worker/worker.go index d8402ec..9a7e27c 100644 --- a/src/clientlibrary/worker/worker.go +++ b/src/clientlibrary/worker/worker.go @@ -106,8 +106,11 @@ func (w *Worker) Start() error { return err } - log.Info("Initialization complete. Starting worker event loop.") + // Start monitoring service + log.Info("Starting monitoring service.") + w.mService.Start() + log.Info("Starting worker event loop.") // entering event loop go w.eventLoop() return nil @@ -120,6 +123,7 @@ func (w *Worker) Shutdown() { close(*w.stop) w.waitGroup.Wait() + w.mService.Shutdown() log.Info("Worker loop is complete. 
Exiting from worker.") } @@ -170,6 +174,8 @@ func (w *Worker) initialize() error { return err } + log.Info("Initialization complete.") + return nil } diff --git a/src/clientlibrary/worker/worker_test.go b/src/clientlibrary/worker/worker_test.go index eb33bfe..e1dee71 100644 --- a/src/clientlibrary/worker/worker_test.go +++ b/src/clientlibrary/worker/worker_test.go @@ -92,7 +92,9 @@ func getMetricsConfig(service string) *metrics.MonitoringConfiguration { MonitoringService: "cloudwatch", Region: regionName, CloudWatch: metrics.CloudWatchMonitoringService{ - ResolutionSec: 1, + // Those value should come from kclConfig + MetricsBufferTimeMillis: 10000, + MetricsMaxQueueSize: 20, }, } }