package metrics import ( "sync" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" log "github.com/sirupsen/logrus" ) type CloudWatchMonitoringService struct { Namespace string KinesisStream string WorkerID string Region string // how frequently to send data to cloudwatch ResolutionSec int svc cloudwatchiface.CloudWatchAPI shardMetrics map[string]*cloudWatchMetrics } type cloudWatchMetrics struct { processedRecords int64 processedBytes int64 behindLatestMillis []float64 leasesHeld int64 leaseRenewals int64 getRecordsTime []float64 processRecordsTime []float64 sync.Mutex } func (cw *CloudWatchMonitoringService) Init() error { // default to 1 min resolution if cw.ResolutionSec == 0 { cw.ResolutionSec = 60 } s := session.New(&aws.Config{Region: aws.String(cw.Region)}) cw.svc = cloudwatch.New(s) cw.shardMetrics = make(map[string]*cloudWatchMetrics) return nil } // Start daemon to flush metrics periodically func (cw *CloudWatchMonitoringService) flushDaemon() { previousFlushTime := time.Now() resolutionDuration := time.Duration(cw.ResolutionSec) * time.Second for { time.Sleep(resolutionDuration - time.Now().Sub(previousFlushTime)) err := cw.Flush() if err != nil { log.Errorf("Error sending metrics to CloudWatch. %+v", err) } previousFlushTime = time.Now() } } func (cw *CloudWatchMonitoringService) Flush() error { // publish per shard metrics for shard, metric := range cw.shardMetrics { metric.Lock() defaultDimensions := []*cloudwatch.Dimension{ { Name: aws.String("Shard"), Value: &shard, }, { Name: aws.String("KinesisStreamName"), Value: &cw.KinesisStream, }, } leaseDimensions := []*cloudwatch.Dimension{ { Name: aws.String("Shard"), Value: &shard, }, { Name: aws.String("KinesisStreamName"), Value: &cw.KinesisStream, }, { Name: aws.String("WorkerID"), Value: &cw.WorkerID, }, } metricTimestamp := time.Now() // Publish metrics data to cloud watch _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ Namespace: aws.String(cw.Namespace), MetricData: []*cloudwatch.MetricDatum{ { Dimensions: defaultDimensions, MetricName: aws.String("RecordsProcessed"), Unit: aws.String("Count"), Timestamp: &metricTimestamp, Value: aws.Float64(float64(metric.processedRecords)), }, { Dimensions: defaultDimensions, MetricName: aws.String("DataBytesProcessed"), Unit: aws.String("Bytes"), Timestamp: &metricTimestamp, Value: aws.Float64(float64(metric.processedBytes)), }, { Dimensions: defaultDimensions, MetricName: aws.String("MillisBehindLatest"), Unit: aws.String("Milliseconds"), Timestamp: &metricTimestamp, StatisticValues: &cloudwatch.StatisticSet{ SampleCount: aws.Float64(float64(len(metric.behindLatestMillis))), Sum: sumFloat64(metric.behindLatestMillis), Maximum: maxFloat64(metric.behindLatestMillis), Minimum: minFloat64(metric.behindLatestMillis), }, }, { Dimensions: defaultDimensions, MetricName: aws.String("KinesisDataFetcher.getRecords.Time"), Unit: aws.String("Milliseconds"), Timestamp: &metricTimestamp, StatisticValues: &cloudwatch.StatisticSet{ SampleCount: aws.Float64(float64(len(metric.getRecordsTime))), Sum: sumFloat64(metric.getRecordsTime), Maximum: maxFloat64(metric.getRecordsTime), Minimum: minFloat64(metric.getRecordsTime), }, }, { Dimensions: defaultDimensions, MetricName: aws.String("RecordProcessor.processRecords.Time"), Unit: aws.String("Milliseconds"), Timestamp: &metricTimestamp, StatisticValues: &cloudwatch.StatisticSet{ SampleCount: aws.Float64(float64(len(metric.processRecordsTime))), Sum: sumFloat64(metric.processRecordsTime), Maximum: maxFloat64(metric.processRecordsTime), Minimum: minFloat64(metric.processRecordsTime), }, }, { Dimensions: leaseDimensions, MetricName: aws.String("RenewLease.Success"), Unit: aws.String("Count"), Timestamp: &metricTimestamp, Value: aws.Float64(float64(metric.leaseRenewals)), }, { Dimensions: leaseDimensions, MetricName: aws.String("CurrentLeases"), Unit: aws.String("Count"), Timestamp: &metricTimestamp, Value: aws.Float64(float64(metric.leasesHeld)), }, }, }) if err == nil { metric.processedRecords = 0 metric.processedBytes = 0 metric.behindLatestMillis = []float64{} metric.leaseRenewals = 0 metric.getRecordsTime = []float64{} metric.processRecordsTime = []float64{} } else { log.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err) } metric.Unlock() return err } return nil } func (cw *CloudWatchMonitoringService) IncrRecordsProcessed(shard string, count int) { if _, ok := cw.shardMetrics[shard]; !ok { cw.shardMetrics[shard] = &cloudWatchMetrics{} } cw.shardMetrics[shard].Lock() defer cw.shardMetrics[shard].Unlock() cw.shardMetrics[shard].processedRecords += int64(count) } func (cw *CloudWatchMonitoringService) IncrBytesProcessed(shard string, count int64) { if _, ok := cw.shardMetrics[shard]; !ok { cw.shardMetrics[shard] = &cloudWatchMetrics{} } cw.shardMetrics[shard].Lock() defer cw.shardMetrics[shard].Unlock() cw.shardMetrics[shard].processedBytes += count } func (cw *CloudWatchMonitoringService) MillisBehindLatest(shard string, millSeconds float64) { if _, ok := cw.shardMetrics[shard]; !ok { cw.shardMetrics[shard] = &cloudWatchMetrics{} } cw.shardMetrics[shard].Lock() defer cw.shardMetrics[shard].Unlock() cw.shardMetrics[shard].behindLatestMillis = append(cw.shardMetrics[shard].behindLatestMillis, millSeconds) } func (cw *CloudWatchMonitoringService) LeaseGained(shard string) { if _, ok := cw.shardMetrics[shard]; !ok { cw.shardMetrics[shard] = &cloudWatchMetrics{} } cw.shardMetrics[shard].Lock() defer cw.shardMetrics[shard].Unlock() cw.shardMetrics[shard].leasesHeld++ } func (cw *CloudWatchMonitoringService) LeaseLost(shard string) { if _, ok := cw.shardMetrics[shard]; !ok { cw.shardMetrics[shard] = &cloudWatchMetrics{} } cw.shardMetrics[shard].Lock() defer cw.shardMetrics[shard].Unlock() cw.shardMetrics[shard].leasesHeld-- } func (cw *CloudWatchMonitoringService) LeaseRenewed(shard string) { if _, ok := cw.shardMetrics[shard]; !ok { cw.shardMetrics[shard] = &cloudWatchMetrics{} } cw.shardMetrics[shard].Lock() defer cw.shardMetrics[shard].Unlock() cw.shardMetrics[shard].leaseRenewals++ } func (cw *CloudWatchMonitoringService) RecordGetRecordsTime(shard string, time float64) { if _, ok := cw.shardMetrics[shard]; !ok { cw.shardMetrics[shard] = &cloudWatchMetrics{} } cw.shardMetrics[shard].Lock() defer cw.shardMetrics[shard].Unlock() cw.shardMetrics[shard].getRecordsTime = append(cw.shardMetrics[shard].getRecordsTime, time) } func (cw *CloudWatchMonitoringService) RecordProcessRecordsTime(shard string, time float64) { if _, ok := cw.shardMetrics[shard]; !ok { cw.shardMetrics[shard] = &cloudWatchMetrics{} } cw.shardMetrics[shard].Lock() defer cw.shardMetrics[shard].Unlock() cw.shardMetrics[shard].processRecordsTime = append(cw.shardMetrics[shard].processRecordsTime, time) } func sumFloat64(slice []float64) *float64 { sum := float64(0) for _, num := range slice { sum += num } return &sum } func maxFloat64(slice []float64) *float64 { if len(slice) < 1 { return aws.Float64(0) } max := slice[0] for _, num := range slice { if num > max { max = num } } return &max } func minFloat64(slice []float64) *float64 { if len(slice) < 1 { return aws.Float64(0) } min := slice[0] for _, num := range slice { if num < min { min = num } } return &min }