Added stats to base kinesis client
This commit is contained in:
parent
5c373fa7d8
commit
eb230b94f7
4 changed files with 63 additions and 0 deletions
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
kv "gopkg.in/Clever/kayvee-go.v6/logger"
|
||||
|
||||
"github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
|
||||
"github.com/Clever/amazon-kinesis-client-go/kcl"
|
||||
)
|
||||
|
||||
|
|
@ -89,6 +90,7 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) {
|
|||
for _, line := range e.FailedMessages {
|
||||
b.log.ErrorD("failed-log", kv.M{"log": line})
|
||||
}
|
||||
stats.Counter("batch-log-failures", len(e.FailedMessages))
|
||||
case CatastrophicSendBatchError:
|
||||
b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
|
||||
os.Exit(1)
|
||||
|
|
@ -98,6 +100,7 @@ func (b *batcherManager) sendBatch(batcher *batcher, tag string) {
|
|||
}
|
||||
|
||||
batcher.Clear()
|
||||
stats.Counter("batches-sent", 1)
|
||||
}
|
||||
|
||||
func (b *batcherManager) sendCheckpoint(
|
||||
|
|
@ -155,6 +158,7 @@ func (b *batcherManager) startMessageHandler(
|
|||
if !ok {
|
||||
batcher = b.createBatcher()
|
||||
batchers[tmp.tag] = batcher
|
||||
stats.Gauge("tag-count", len(batchers))
|
||||
}
|
||||
|
||||
err := batcher.AddMessage(tmp.msg, tmp.pair)
|
||||
|
|
@ -168,6 +172,7 @@ func (b *batcherManager) startMessageHandler(
|
|||
"err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag,
|
||||
})
|
||||
}
|
||||
stats.Counter("msg-batched", 1)
|
||||
case pair := <-lastIgnored:
|
||||
lastIgnoredPair = pair
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
|
||||
kv "gopkg.in/Clever/kayvee-go.v6/logger"
|
||||
|
||||
"github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
|
||||
"github.com/Clever/amazon-kinesis-client-go/kcl"
|
||||
)
|
||||
|
||||
|
|
@ -72,6 +73,7 @@ func (cm *checkpointManager) startCheckpointHandler(
|
|||
if !pair.IsEmpty() {
|
||||
checkpointer.Checkpoint(pair)
|
||||
lastCheckpoint = time.Now()
|
||||
stats.Counter("checkpoints-sent", 1)
|
||||
}
|
||||
|
||||
if isShuttingDown {
|
||||
|
|
|
|||
50
batchconsumer/stats/stats.go
Normal file
50
batchconsumer/stats/stats.go
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
package stats
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"gopkg.in/Clever/kayvee-go.v6/logger"
|
||||
)
|
||||
|
||||
var log = logger.New("amazon-kinesis-client-go")
|
||||
|
||||
type datum struct {
|
||||
name string
|
||||
value int
|
||||
category string
|
||||
}
|
||||
|
||||
var queue = make(chan datum, 1000)
|
||||
|
||||
func init() {
|
||||
data := map[string]int{}
|
||||
tick := time.Tick(time.Minute)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case d := <-queue:
|
||||
if d.category == "counter" {
|
||||
data[d.name] = data[d.name] + d.value
|
||||
} else if d.category == "gauge" {
|
||||
data[d.name] = d.value
|
||||
} else {
|
||||
log.ErrorD("unknow-stat-category", logger.M{"category": d.category})
|
||||
}
|
||||
case <-tick:
|
||||
tmp := logger.M{}
|
||||
for k, v := range data {
|
||||
tmp[k] = v
|
||||
}
|
||||
log.InfoD("stats", tmp)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func Counter(name string, val int) {
|
||||
queue <- datum{name, val, "counter"}
|
||||
}
|
||||
|
||||
func Gauge(name string, val int) {
|
||||
queue <- datum{name, val, "gauge"}
|
||||
}
|
||||
|
|
@ -9,6 +9,7 @@ import (
|
|||
"golang.org/x/time/rate"
|
||||
kv "gopkg.in/Clever/kayvee-go.v6/logger"
|
||||
|
||||
"github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats"
|
||||
"github.com/Clever/amazon-kinesis-client-go/kcl"
|
||||
"github.com/Clever/amazon-kinesis-client-go/splitter"
|
||||
)
|
||||
|
|
@ -95,17 +96,20 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
|||
if err == ErrMessageIgnored {
|
||||
continue // Skip message
|
||||
} else if err != nil {
|
||||
stats.Counter("unknown-error", 1)
|
||||
b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)})
|
||||
continue // Don't stop processing messages because of one bad message
|
||||
}
|
||||
|
||||
if len(tags) == 0 {
|
||||
stats.Counter("no-tags", 1)
|
||||
b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)})
|
||||
return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg))
|
||||
}
|
||||
|
||||
for _, tag := range tags {
|
||||
if tag == "" {
|
||||
stats.Counter("blank-tag", 1)
|
||||
b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)})
|
||||
return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg))
|
||||
}
|
||||
|
|
@ -124,6 +128,8 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
|||
b.batcherManager.LatestIgnored(pair)
|
||||
}
|
||||
b.batcherManager.LatestProcessed(pair)
|
||||
|
||||
stats.Counter("processed-messages", len(messages))
|
||||
}
|
||||
b.lastProcessedSeq = pair
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue