From eb230b94f7c7f786f74affc6eefcb2cf09971509 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Mon, 7 Aug 2017 03:05:41 +0000 Subject: [PATCH] Added stats to base kinesis client --- batchconsumer/batchermanager.go | 5 +++ batchconsumer/checkpointmanager.go | 2 ++ batchconsumer/stats/stats.go | 50 ++++++++++++++++++++++++++++++ batchconsumer/writer.go | 6 ++++ 4 files changed, 63 insertions(+) create mode 100644 batchconsumer/stats/stats.go diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index bf2b2a2..22ca8cd 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -6,6 +6,7 @@ import ( kv "gopkg.in/Clever/kayvee-go.v6/logger" + "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/kcl" ) @@ -89,6 +90,7 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) { for _, line := range e.FailedMessages { b.log.ErrorD("failed-log", kv.M{"log": line}) } + stats.Counter("batch-log-failures", len(e.FailedMessages)) case CatastrophicSendBatchError: b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) os.Exit(1) @@ -98,6 +100,7 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) { } batcher.Clear() + stats.Counter("batches-sent", 1) } func (b *batcherManager) sendCheckpoint( @@ -155,6 +158,7 @@ func (b *batcherManager) startMessageHandler( if !ok { batcher = b.createBatcher() batchers[tmp.tag] = batcher + stats.Gauge("tag-count", len(batchers)) } err := batcher.AddMessage(tmp.msg, tmp.pair) @@ -168,6 +172,7 @@ func (b *batcherManager) startMessageHandler( "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, }) } + stats.Counter("msg-batched", 1) case pair := <-lastIgnored: lastIgnoredPair = pair diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index eb755b6..ec8a9dd 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -5,6 +5,7 @@ import ( kv "gopkg.in/Clever/kayvee-go.v6/logger" + "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/kcl" ) @@ -72,6 +73,7 @@ func (cm *checkpointManager) startCheckpointHandler( if !pair.IsEmpty() { checkpointer.Checkpoint(pair) lastCheckpoint = time.Now() + stats.Counter("checkpoints-sent", 1) } if isShuttingDown { diff --git a/batchconsumer/stats/stats.go b/batchconsumer/stats/stats.go new file mode 100644 index 0000000..75eaaf9 --- /dev/null +++ b/batchconsumer/stats/stats.go @@ -0,0 +1,50 @@ +package stats + +import ( + "time" + + "gopkg.in/Clever/kayvee-go.v6/logger" +) + +var log = logger.New("amazon-kinesis-client-go") + +type datum struct { + name string + value int + category string +} + +var queue = make(chan datum, 1000) + +func init() { + data := map[string]int{} + tick := time.Tick(time.Minute) + go func() { + for { + select { + case d := <-queue: + if d.category == "counter" { + data[d.name] = data[d.name] + d.value + } else if d.category == "gauge" { + data[d.name] = d.value + } else { + log.ErrorD("unknow-stat-category", logger.M{"category": d.category}) + } + case <-tick: + tmp := logger.M{} + for k, v := range data { + tmp[k] = v + } + log.InfoD("stats", tmp) + } + } + }() +} + +func Counter(name string, val int) { + queue <- datum{name, val, "counter"} +} + +func Gauge(name string, val int) { + queue <- datum{name, val, "gauge"} +} diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 519eff7..8035b9f 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -9,6 +9,7 @@ import ( "golang.org/x/time/rate" kv "gopkg.in/Clever/kayvee-go.v6/logger" + "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/kcl" "github.com/Clever/amazon-kinesis-client-go/splitter" ) @@ -95,17 +96,20 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { if err == ErrMessageIgnored { continue // Skip message } else if err != nil { + stats.Counter("unknown-error", 1) b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) continue // Don't stop processing messages because of one bad message } if len(tags) == 0 { + stats.Counter("no-tags", 1) b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg)) } for _, tag := range tags { if tag == "" { + stats.Counter("blank-tag", 1) b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg)) } @@ -124,6 +128,8 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { b.batcherManager.LatestIgnored(pair) } b.batcherManager.LatestProcessed(pair) + + stats.Counter("processed-messages", len(messages)) } b.lastProcessedSeq = pair