From c91f6233ef3583922734ebceceb3b2602eb0cf7f Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Wed, 22 Nov 2017 14:09:22 -0800 Subject: [PATCH] Add counter for exposing scanner metrics --- README.md | 60 ++++++++++++++++++++++++++------------- consumer.go | 29 +++++++++++++++++-- examples/consumer/main.go | 28 +++++++++++++++--- 3 files changed, 92 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 29b0123..804c168 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Kinesis consumer applications written in Go. This library is intended to be a li __Alternate serverless options:__ -* [Kinesis to Firehose](http://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html) can be used to archive data directly to S3, Redshift, or Elasticsearch without running a consumer application. +* [Kinesis to Firehose](http://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html) can be used to archive data directly to S3, Redshift, or Elasticsearch without running a consumer application. * [Process Kinensis Streams with Golang and AWS Lambda](https://medium.com/@harlow/processing-kinesis-streams-w-aws-lambda-and-golang-264efc8f979a) for serverless processing and checkpoint management. @@ -53,14 +53,14 @@ func main() { log.Fatalf("scan error: %v", err) } - // Note: If you need to aggregate based on a specific shard the `ScanShard` + // Note: If you need to aggregate based on a specific shard the `ScanShard` // method should be leverged instead. } ``` ## Checkpoint -To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard. +To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard. This will allow consumers to re-launch and pick up at the position in the stream where they left off. @@ -104,37 +104,59 @@ To leverage the DDB checkpoint we'll also need to create a table: ## Options -The consumer allows the following optional overrides: +The consumer allows the following optional overrides. -* Kinesis Client -* Logger +### Client + +Override the Kinesis client if there is any special config needed: ```go -// new kinesis client -svc := kinesis.New(session.New(aws.NewConfig())) +// client +client := kinesis.New(session.New(aws.NewConfig())) -// new consumer with custom client +// consumer c, err := consumer.New( consumer, streamName, - consumer.WithClient(svc), + consumer.WithClient(client), ) ``` -## Logging +### Metrics -The package defaults to `ioutil.Discard` which will silence log output. This can be overridden with the preferred logging strategy: +Add optional counter for exposing counts for checkpoints and records processed: ```go -func main() { - // ... +// counter +counter := expvar.NewMap("counters") - // logger - logger := log.New(os.Stdout, "consumer-example: ", log.LstdFlags) +// consumer +c, err := consumer.New(checkpoint, appName, streamName, + consumer.WithCounter(counter), +) +``` - // consumer - c, err := consumer.New(checkpoint, appName, streamName, consumer.WithLogger(logger)) -} +The [expvar package](https://golang.org/pkg/expvar/) will display consumer counts: + +``` +"counters": { + "checkpoints": 3, + "records": 13005 +}, +``` + +### Logging + +The package defaults to `ioutil.Discard` so swallow all logs. This can be customized with the preferred logging strategy: + +```go +// logger +logger := log.New(os.Stdout, "consumer-example: ", log.LstdFlags) + +// consumer +c, err := consumer.New(checkpoint, appName, streamName, + consumer.WithLogger(logger), +) ``` ## Contributing diff --git a/consumer.go b/consumer.go index 7c51a7a..5016067 100644 --- a/consumer.go +++ b/consumer.go @@ -15,6 +15,15 @@ import ( type Record = kinesis.Record +// Counter is used for exposing basic metrics from the scanner +type Counter interface { + Add(string, int64) +} + +type noopCounter struct{} + +func (n noopCounter) Add(string, int64) {} + // Option is used to override defaults when creating a new Consumer type Option func(*Consumer) error @@ -34,6 +43,14 @@ func WithLogger(logger *log.Logger) Option { } } +// WithCounter overrides the default counter +func WithCounter(counter Counter) Option { + return func(c *Consumer) error { + c.counter = counter + return nil + } +} + // New creates a kinesis consumer with default settings. Use Option to override // any of the optional attributes. func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) (*Consumer, error) { @@ -64,7 +81,7 @@ func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) ( // provide default logger if c.logger == nil { - c.logger = log.New(ioutil.Discard, "kinesis-consumer: ", log.LstdFlags) + c.logger = log.New(ioutil.Discard, "", log.LstdFlags) } // provide a default kinesis client @@ -72,6 +89,11 @@ func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) ( c.client = kinesis.New(session.New(aws.NewConfig())) } + // provide default no-op counter + if c.counter == nil { + c.counter = &noopCounter{} + } + return c, nil } @@ -82,6 +104,7 @@ type Consumer struct { client *kinesis.Kinesis logger *log.Logger checkpoint checkpoint.Checkpoint + counter Counter } // Scan scans each of the shards of the stream, calls the callback @@ -168,7 +191,9 @@ loop: } } - c.logger.Println("checkpointing", shardID, len(resp.Records)) + c.counter.Add("records", int64(len(resp.Records))) + c.counter.Add("checkpoints", 1) + if err := c.checkpoint.Set(shardID, lastSeqNum); err != nil { c.logger.Printf("set checkpoint error: %v", err) } diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 711b0fa..8f4724b 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -2,15 +2,30 @@ package main import ( "context" + "expvar" "flag" "fmt" "log" + "net" + "net/http" "os" consumer "github.com/harlow/kinesis-consumer" checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" ) +// kick off a server for exposing metrics +func init() { + sock, err := net.Listen("tcp", "localhost:8080") + if err != nil { + log.Fatalf("net listen error: %v", err) + } + go func() { + fmt.Println("Metrics available at http://localhost:8080/debug/vars") + http.Serve(sock, nil) + }() +} + func main() { var ( app = flag.String("app", "", "App name") @@ -18,8 +33,10 @@ func main() { ) flag.Parse() - // logger - logger := log.New(os.Stdout, "consumer-example: ", log.LstdFlags) + var ( + counter = expvar.NewMap("counters") + logger = log.New(os.Stdout, "consumer-example: ", log.LstdFlags) + ) // checkpoint ck, err := checkpoint.New(*app, *stream) @@ -28,12 +45,15 @@ func main() { } // consumer - c, err := consumer.New(ck, *app, *stream, consumer.WithLogger(logger)) + c, err := consumer.New(ck, *app, *stream, + consumer.WithLogger(logger), + consumer.WithCounter(counter), + ) if err != nil { log.Fatalf("consumer error: %v", err) } - // scan stream + // start scan err = c.Scan(context.TODO(), func(r *consumer.Record) bool { fmt.Println(string(r.Data)) return true // continue scanning