KCL: Fix cloudwatch metrics

This change fixes CloudWatch metrics publishing by adding a
long-running goroutine that periodically publishes CloudWatch metrics.
It also shuts down metrics publishing when the KCL is shut down.

Test:
Ran `hmake test` and verified that CloudWatch metrics were
published via the AWS CloudWatch console.

Jira CNA-702

Change-Id: I78b347cd12939447b0daf93f51acf620d18e2f49
This commit is contained in:
Tao Jiang 2018-04-20 21:07:11 -07:00
parent 2fea884212
commit e1071abc80
6 changed files with 248 additions and 196 deletions

View file

@ -16,10 +16,15 @@ type CloudWatchMonitoringService struct {
KinesisStream string KinesisStream string
WorkerID string WorkerID string
Region string Region string
// how frequently to send data to cloudwatch
ResolutionSec int // control how often to pusblish to CloudWatch
svc cloudwatchiface.CloudWatchAPI MetricsBufferTimeMillis int
shardMetrics map[string]*cloudWatchMetrics MetricsMaxQueueSize int
stop *chan struct{}
waitGroup *sync.WaitGroup
svc cloudwatchiface.CloudWatchAPI
shardMetrics *sync.Map
} }
type cloudWatchMetrics struct { type cloudWatchMetrics struct {
@ -34,219 +39,251 @@ type cloudWatchMetrics struct {
} }
func (cw *CloudWatchMonitoringService) Init() error { func (cw *CloudWatchMonitoringService) Init() error {
// default to 1 min resolution
if cw.ResolutionSec == 0 {
cw.ResolutionSec = 60
}
s := session.New(&aws.Config{Region: aws.String(cw.Region)}) s := session.New(&aws.Config{Region: aws.String(cw.Region)})
cw.svc = cloudwatch.New(s) cw.svc = cloudwatch.New(s)
cw.shardMetrics = make(map[string]*cloudWatchMetrics) cw.shardMetrics = new(sync.Map)
stopChan := make(chan struct{})
cw.stop = &stopChan
wg := sync.WaitGroup{}
cw.waitGroup = &wg
return nil return nil
} }
// Start launches the background goroutine that periodically publishes
// buffered metrics to CloudWatch. The goroutine is registered with the
// wait group so Shutdown can block until it has exited.
func (cw *CloudWatchMonitoringService) Start() error {
	cw.waitGroup.Add(1)
	// Long-running publisher loop; it is stopped via cw.stop.
	go cw.eventloop()
	return nil
}
// Shutdown signals the publishing goroutine to stop and blocks until it
// has flushed its final batch and exited.
func (cw *CloudWatchMonitoringService) Shutdown() {
	log.Info("Shutting down cloudwatch metrics system...")
	// Closing the channel broadcasts the stop signal to the eventloop.
	close(*cw.stop)
	cw.waitGroup.Wait()
	log.Info("Cloudwatch metrics system has been shutdown.")
}
// Start daemon to flush metrics periodically // Start daemon to flush metrics periodically
func (cw *CloudWatchMonitoringService) flushDaemon() { func (cw *CloudWatchMonitoringService) eventloop() {
previousFlushTime := time.Now() defer cw.waitGroup.Done()
resolutionDuration := time.Duration(cw.ResolutionSec) * time.Second
for { for {
time.Sleep(resolutionDuration - time.Now().Sub(previousFlushTime)) err := cw.flush()
err := cw.Flush()
if err != nil { if err != nil {
log.Errorf("Error sending metrics to CloudWatch. %+v", err) log.Errorf("Error sending metrics to CloudWatch. %+v", err)
} }
previousFlushTime = time.Now()
select {
case <-*cw.stop:
log.Info("Shutting down monitoring system")
cw.flush()
return
case <-time.After(time.Duration(cw.MetricsBufferTimeMillis) * time.Millisecond):
}
} }
} }
func (cw *CloudWatchMonitoringService) Flush() error { func (cw *CloudWatchMonitoringService) flushShard(shard string, metric *cloudWatchMetrics) bool {
// publish per shard metrics metric.Lock()
for shard, metric := range cw.shardMetrics { defaultDimensions := []*cloudwatch.Dimension{
metric.Lock() {
defaultDimensions := []*cloudwatch.Dimension{ Name: aws.String("Shard"),
{ Value: &shard,
Name: aws.String("Shard"), },
Value: &shard, {
}, Name: aws.String("KinesisStreamName"),
{ Value: &cw.KinesisStream,
Name: aws.String("KinesisStreamName"), },
Value: &cw.KinesisStream,
},
}
leaseDimensions := []*cloudwatch.Dimension{
{
Name: aws.String("Shard"),
Value: &shard,
},
{
Name: aws.String("KinesisStreamName"),
Value: &cw.KinesisStream,
},
{
Name: aws.String("WorkerID"),
Value: &cw.WorkerID,
},
}
metricTimestamp := time.Now()
// Publish metrics data to cloud watch
_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
Namespace: aws.String(cw.Namespace),
MetricData: []*cloudwatch.MetricDatum{
{
Dimensions: defaultDimensions,
MetricName: aws.String("RecordsProcessed"),
Unit: aws.String("Count"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.processedRecords)),
},
{
Dimensions: defaultDimensions,
MetricName: aws.String("DataBytesProcessed"),
Unit: aws.String("Bytes"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.processedBytes)),
},
{
Dimensions: defaultDimensions,
MetricName: aws.String("MillisBehindLatest"),
Unit: aws.String("Milliseconds"),
Timestamp: &metricTimestamp,
StatisticValues: &cloudwatch.StatisticSet{
SampleCount: aws.Float64(float64(len(metric.behindLatestMillis))),
Sum: sumFloat64(metric.behindLatestMillis),
Maximum: maxFloat64(metric.behindLatestMillis),
Minimum: minFloat64(metric.behindLatestMillis),
},
},
{
Dimensions: defaultDimensions,
MetricName: aws.String("KinesisDataFetcher.getRecords.Time"),
Unit: aws.String("Milliseconds"),
Timestamp: &metricTimestamp,
StatisticValues: &cloudwatch.StatisticSet{
SampleCount: aws.Float64(float64(len(metric.getRecordsTime))),
Sum: sumFloat64(metric.getRecordsTime),
Maximum: maxFloat64(metric.getRecordsTime),
Minimum: minFloat64(metric.getRecordsTime),
},
},
{
Dimensions: defaultDimensions,
MetricName: aws.String("RecordProcessor.processRecords.Time"),
Unit: aws.String("Milliseconds"),
Timestamp: &metricTimestamp,
StatisticValues: &cloudwatch.StatisticSet{
SampleCount: aws.Float64(float64(len(metric.processRecordsTime))),
Sum: sumFloat64(metric.processRecordsTime),
Maximum: maxFloat64(metric.processRecordsTime),
Minimum: minFloat64(metric.processRecordsTime),
},
},
{
Dimensions: leaseDimensions,
MetricName: aws.String("RenewLease.Success"),
Unit: aws.String("Count"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.leaseRenewals)),
},
{
Dimensions: leaseDimensions,
MetricName: aws.String("CurrentLeases"),
Unit: aws.String("Count"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.leasesHeld)),
},
},
})
if err == nil {
metric.processedRecords = 0
metric.processedBytes = 0
metric.behindLatestMillis = []float64{}
metric.leaseRenewals = 0
metric.getRecordsTime = []float64{}
metric.processRecordsTime = []float64{}
} else {
log.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err)
}
metric.Unlock()
return err
} }
leaseDimensions := []*cloudwatch.Dimension{
{
Name: aws.String("Shard"),
Value: &shard,
},
{
Name: aws.String("KinesisStreamName"),
Value: &cw.KinesisStream,
},
{
Name: aws.String("WorkerID"),
Value: &cw.WorkerID,
},
}
metricTimestamp := time.Now()
data := []*cloudwatch.MetricDatum{
{
Dimensions: defaultDimensions,
MetricName: aws.String("RecordsProcessed"),
Unit: aws.String("Count"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.processedRecords)),
},
{
Dimensions: defaultDimensions,
MetricName: aws.String("DataBytesProcessed"),
Unit: aws.String("Bytes"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.processedBytes)),
},
{
Dimensions: leaseDimensions,
MetricName: aws.String("RenewLease.Success"),
Unit: aws.String("Count"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.leaseRenewals)),
},
{
Dimensions: leaseDimensions,
MetricName: aws.String("CurrentLeases"),
Unit: aws.String("Count"),
Timestamp: &metricTimestamp,
Value: aws.Float64(float64(metric.leasesHeld)),
},
}
if len(metric.behindLatestMillis) > 0 {
data = append(data, &cloudwatch.MetricDatum{
Dimensions: defaultDimensions,
MetricName: aws.String("MillisBehindLatest"),
Unit: aws.String("Milliseconds"),
Timestamp: &metricTimestamp,
StatisticValues: &cloudwatch.StatisticSet{
SampleCount: aws.Float64(float64(len(metric.behindLatestMillis))),
Sum: sumFloat64(metric.behindLatestMillis),
Maximum: maxFloat64(metric.behindLatestMillis),
Minimum: minFloat64(metric.behindLatestMillis),
}})
}
if len(metric.getRecordsTime) > 0 {
data = append(data, &cloudwatch.MetricDatum{
Dimensions: defaultDimensions,
MetricName: aws.String("KinesisDataFetcher.getRecords.Time"),
Unit: aws.String("Milliseconds"),
Timestamp: &metricTimestamp,
StatisticValues: &cloudwatch.StatisticSet{
SampleCount: aws.Float64(float64(len(metric.getRecordsTime))),
Sum: sumFloat64(metric.getRecordsTime),
Maximum: maxFloat64(metric.getRecordsTime),
Minimum: minFloat64(metric.getRecordsTime),
}})
}
if len(metric.processRecordsTime) > 0 {
data = append(data, &cloudwatch.MetricDatum{
Dimensions: defaultDimensions,
MetricName: aws.String("RecordProcessor.processRecords.Time"),
Unit: aws.String("Milliseconds"),
Timestamp: &metricTimestamp,
StatisticValues: &cloudwatch.StatisticSet{
SampleCount: aws.Float64(float64(len(metric.processRecordsTime))),
Sum: sumFloat64(metric.processRecordsTime),
Maximum: maxFloat64(metric.processRecordsTime),
Minimum: minFloat64(metric.processRecordsTime),
}})
}
// Publish metrics data to cloud watch
_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
Namespace: aws.String(cw.Namespace),
MetricData: data,
})
if err == nil {
metric.processedRecords = 0
metric.processedBytes = 0
metric.behindLatestMillis = []float64{}
metric.leaseRenewals = 0
metric.getRecordsTime = []float64{}
metric.processRecordsTime = []float64{}
} else {
log.Errorf("Error in publishing cloudwatch metrics. Error: %+v", err)
}
metric.Unlock()
return true
}
func (cw *CloudWatchMonitoringService) flush() error {
log.Debugf("Flushing metrics data. Stream: %s, Worker: %s", cw.KinesisStream, cw.WorkerID)
// publish per shard metrics
cw.shardMetrics.Range(func(k, v interface{}) bool {
shard, metric := k.(string), v.(*cloudWatchMetrics)
return cw.flushShard(shard, metric)
})
return nil return nil
} }
func (cw *CloudWatchMonitoringService) IncrRecordsProcessed(shard string, count int) { func (cw *CloudWatchMonitoringService) IncrRecordsProcessed(shard string, count int) {
if _, ok := cw.shardMetrics[shard]; !ok { m := cw.getOrCreatePerShardMetrics(shard)
cw.shardMetrics[shard] = &cloudWatchMetrics{} m.Lock()
} defer m.Unlock()
cw.shardMetrics[shard].Lock() m.processedRecords += int64(count)
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].processedRecords += int64(count)
} }
func (cw *CloudWatchMonitoringService) IncrBytesProcessed(shard string, count int64) { func (cw *CloudWatchMonitoringService) IncrBytesProcessed(shard string, count int64) {
if _, ok := cw.shardMetrics[shard]; !ok { m := cw.getOrCreatePerShardMetrics(shard)
cw.shardMetrics[shard] = &cloudWatchMetrics{} m.Lock()
} defer m.Unlock()
cw.shardMetrics[shard].Lock() m.processedBytes += count
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].processedBytes += count
} }
func (cw *CloudWatchMonitoringService) MillisBehindLatest(shard string, millSeconds float64) { func (cw *CloudWatchMonitoringService) MillisBehindLatest(shard string, millSeconds float64) {
if _, ok := cw.shardMetrics[shard]; !ok { m := cw.getOrCreatePerShardMetrics(shard)
cw.shardMetrics[shard] = &cloudWatchMetrics{} m.Lock()
} defer m.Unlock()
cw.shardMetrics[shard].Lock() m.behindLatestMillis = append(m.behindLatestMillis, millSeconds)
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].behindLatestMillis = append(cw.shardMetrics[shard].behindLatestMillis, millSeconds)
} }
func (cw *CloudWatchMonitoringService) LeaseGained(shard string) { func (cw *CloudWatchMonitoringService) LeaseGained(shard string) {
if _, ok := cw.shardMetrics[shard]; !ok { m := cw.getOrCreatePerShardMetrics(shard)
cw.shardMetrics[shard] = &cloudWatchMetrics{} m.Lock()
} defer m.Unlock()
cw.shardMetrics[shard].Lock() m.leasesHeld++
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].leasesHeld++
} }
func (cw *CloudWatchMonitoringService) LeaseLost(shard string) { func (cw *CloudWatchMonitoringService) LeaseLost(shard string) {
if _, ok := cw.shardMetrics[shard]; !ok { m := cw.getOrCreatePerShardMetrics(shard)
cw.shardMetrics[shard] = &cloudWatchMetrics{} m.Lock()
} defer m.Unlock()
cw.shardMetrics[shard].Lock() m.leasesHeld--
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].leasesHeld--
} }
func (cw *CloudWatchMonitoringService) LeaseRenewed(shard string) { func (cw *CloudWatchMonitoringService) LeaseRenewed(shard string) {
if _, ok := cw.shardMetrics[shard]; !ok { m := cw.getOrCreatePerShardMetrics(shard)
cw.shardMetrics[shard] = &cloudWatchMetrics{} m.Lock()
} defer m.Unlock()
cw.shardMetrics[shard].Lock() m.leaseRenewals++
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].leaseRenewals++
} }
func (cw *CloudWatchMonitoringService) RecordGetRecordsTime(shard string, time float64) { func (cw *CloudWatchMonitoringService) RecordGetRecordsTime(shard string, time float64) {
if _, ok := cw.shardMetrics[shard]; !ok { m := cw.getOrCreatePerShardMetrics(shard)
cw.shardMetrics[shard] = &cloudWatchMetrics{} m.Lock()
} defer m.Unlock()
cw.shardMetrics[shard].Lock() m.getRecordsTime = append(m.getRecordsTime, time)
defer cw.shardMetrics[shard].Unlock()
cw.shardMetrics[shard].getRecordsTime = append(cw.shardMetrics[shard].getRecordsTime, time)
} }
func (cw *CloudWatchMonitoringService) RecordProcessRecordsTime(shard string, time float64) { func (cw *CloudWatchMonitoringService) RecordProcessRecordsTime(shard string, time float64) {
if _, ok := cw.shardMetrics[shard]; !ok { m := cw.getOrCreatePerShardMetrics(shard)
cw.shardMetrics[shard] = &cloudWatchMetrics{} m.Lock()
defer m.Unlock()
m.processRecordsTime = append(m.processRecordsTime, time)
}
func (cw *CloudWatchMonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics {
var i interface{}
var ok bool
if i, ok = cw.shardMetrics.Load(shard); !ok {
m := &cloudWatchMetrics{}
cw.shardMetrics.Store(shard, m)
return m
} }
cw.shardMetrics[shard].Lock()
defer cw.shardMetrics[shard].Unlock() return i.(*cloudWatchMetrics)
cw.shardMetrics[shard].processRecordsTime = append(cw.shardMetrics[shard].processRecordsTime, time)
} }
func sumFloat64(slice []float64) *float64 { func sumFloat64(slice []float64) *float64 {

View file

@ -15,6 +15,7 @@ type MonitoringConfiguration struct {
type MonitoringService interface { type MonitoringService interface {
Init() error Init() error
Start() error
IncrRecordsProcessed(string, int) IncrRecordsProcessed(string, int)
IncrBytesProcessed(string, int64) IncrBytesProcessed(string, int64)
MillisBehindLatest(string, float64) MillisBehindLatest(string, float64)
@ -23,7 +24,7 @@ type MonitoringService interface {
LeaseRenewed(string) LeaseRenewed(string)
RecordGetRecordsTime(string, float64) RecordGetRecordsTime(string, float64)
RecordProcessRecordsTime(string, float64) RecordProcessRecordsTime(string, float64)
Flush() error Shutdown()
} }
func (m *MonitoringConfiguration) Init(nameSpace, streamName string, workerID string) error { func (m *MonitoringConfiguration) Init(nameSpace, streamName string, workerID string) error {
@ -57,9 +58,9 @@ func (m *MonitoringConfiguration) GetMonitoringService() MonitoringService {
type noopMonitoringService struct{} type noopMonitoringService struct{}
func (n *noopMonitoringService) Init() error { func (n *noopMonitoringService) Init() error { return nil }
return nil func (n *noopMonitoringService) Start() error { return nil }
} func (n *noopMonitoringService) Shutdown() {}
func (n *noopMonitoringService) IncrRecordsProcessed(shard string, count int) {} func (n *noopMonitoringService) IncrRecordsProcessed(shard string, count int) {}
func (n *noopMonitoringService) IncrBytesProcessed(shard string, count int64) {} func (n *noopMonitoringService) IncrBytesProcessed(shard string, count int64) {}
@ -69,4 +70,3 @@ func (n *noopMonitoringService) LeaseLost(shard string)
func (n *noopMonitoringService) LeaseRenewed(shard string) {} func (n *noopMonitoringService) LeaseRenewed(shard string) {}
func (n *noopMonitoringService) RecordGetRecordsTime(shard string, time float64) {} func (n *noopMonitoringService) RecordGetRecordsTime(shard string, time float64) {}
func (n *noopMonitoringService) RecordProcessRecordsTime(shard string, time float64) {} func (n *noopMonitoringService) RecordProcessRecordsTime(shard string, time float64) {}
func (n *noopMonitoringService) Flush() error { return nil }

View file

@ -8,6 +8,9 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// PrometheusMonitoringService to start Prometheus as metrics system.
// It might be trick if the service onboarding with KCL also uses Prometheus.
// Therefore, we should start cloudwatch metrics by default instead.
type PrometheusMonitoringService struct { type PrometheusMonitoringService struct {
ListenAddress string ListenAddress string
@ -70,17 +73,25 @@ func (p *PrometheusMonitoringService) Init() error {
} }
} }
http.Handle("/metrics", promhttp.Handler())
go func() {
log.Debugf("Starting Prometheus listener on %s", p.ListenAddress)
err := http.ListenAndServe(p.ListenAddress, nil)
if err != nil {
log.Errorln("Error starting Prometheus metrics endpoint", err)
}
}()
return nil return nil
} }
// Start registers the /metrics handler and serves Prometheus scrapes on
// ListenAddress from a background goroutine.
func (p *PrometheusMonitoringService) Start() error {
	http.Handle("/metrics", promhttp.Handler())
	go func() {
		log.Infof("Starting Prometheus listener on %s", p.ListenAddress)
		if err := http.ListenAndServe(p.ListenAddress, nil); err != nil {
			log.Errorf("Error starting Prometheus metrics endpoint. %+v", err)
		}
		log.Info("Stopped metrics server")
	}()
	return nil
}
func (p *PrometheusMonitoringService) Shutdown() {}
func (p *PrometheusMonitoringService) IncrRecordsProcessed(shard string, count int) { func (p *PrometheusMonitoringService) IncrRecordsProcessed(shard string, count int) {
p.processedRecords.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Add(float64(count)) p.processedRecords.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Add(float64(count))
} }
@ -112,5 +123,3 @@ func (p *PrometheusMonitoringService) RecordGetRecordsTime(shard string, time fl
func (p *PrometheusMonitoringService) RecordProcessRecordsTime(shard string, time float64) { func (p *PrometheusMonitoringService) RecordProcessRecordsTime(shard string, time float64) {
p.processRecordsTime.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Observe(time) p.processRecordsTime.With(prometheus.Labels{"shard": shard, "kinesisStream": p.KinesisStream}).Observe(time)
} }
func (p *PrometheusMonitoringService) Flush() error { return nil }

View file

@ -209,8 +209,6 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
case <-*sc.stop: case <-*sc.stop:
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer} shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
sc.recordProcessor.Shutdown(shutdownInput) sc.recordProcessor.Shutdown(shutdownInput)
// flush out the metrics data
sc.mService.Flush()
return nil return nil
case <-time.After(1 * time.Nanosecond): case <-time.After(1 * time.Nanosecond):
} }

View file

@ -106,8 +106,11 @@ func (w *Worker) Start() error {
return err return err
} }
log.Info("Initialization complete. Starting worker event loop.") // Start monitoring service
log.Info("Starting monitoring service.")
w.mService.Start()
log.Info("Starting worker event loop.")
// entering event loop // entering event loop
go w.eventLoop() go w.eventLoop()
return nil return nil
@ -120,6 +123,7 @@ func (w *Worker) Shutdown() {
close(*w.stop) close(*w.stop)
w.waitGroup.Wait() w.waitGroup.Wait()
w.mService.Shutdown()
log.Info("Worker loop is complete. Exiting from worker.") log.Info("Worker loop is complete. Exiting from worker.")
} }
@ -170,6 +174,8 @@ func (w *Worker) initialize() error {
return err return err
} }
log.Info("Initialization complete.")
return nil return nil
} }

View file

@ -92,7 +92,9 @@ func getMetricsConfig(service string) *metrics.MonitoringConfiguration {
MonitoringService: "cloudwatch", MonitoringService: "cloudwatch",
Region: regionName, Region: regionName,
CloudWatch: metrics.CloudWatchMonitoringService{ CloudWatch: metrics.CloudWatchMonitoringService{
ResolutionSec: 1, // Those value should come from kclConfig
MetricsBufferTimeMillis: 10000,
MetricsMaxQueueSize: 20,
}, },
} }
} }