From 90d2903fe676dcd1468eb2c0f736c26c3a507f12 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Wed, 22 Nov 2017 10:46:39 -0800 Subject: [PATCH] Use stdlib logging, default to discard --- README.md | 37 ++++++++++++------------------------- consumer.go | 37 ++++++++++++++++--------------------- examples/consumer/main.go | 15 +++++++-------- 3 files changed, 35 insertions(+), 54 deletions(-) diff --git a/README.md b/README.md index b22e29c..d27d533 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,6 @@ import( ) func main() { - log.SetHandler(text.New(os.Stderr)) - log.SetLevel(log.DebugLevel) - var ( app = flag.String("app", "", "App name") stream = flag.String("stream", "", "Stream name") @@ -59,7 +56,7 @@ func main() { } ``` -### Checkpoint +## Checkpoint To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard. @@ -71,7 +68,7 @@ The uniq identifier for a consumer is `[appName, streamName, shardID]` There are currently two storage types for checkpoints: -### Redis +### Redis Checkpoint The Redis checkpoint requries App Name, and Stream Name: @@ -85,7 +82,7 @@ if err != nil { } ``` -### DynamoDB +### DynamoDB Checkpoint The DynamoDB checkpoint requires Table Name, App Name, and Stream Name: @@ -103,7 +100,7 @@ To leverage the DDB checkpoint we'll also need to create a table: screen shot 2017-11-20 at 9 16 14 am -### Options +## Options The consumer allows the following optional overrides: @@ -122,32 +119,22 @@ c, err := consumer.New( ) ``` -### Logging +## Logging -[Apex Log](https://medium.com/@tjholowaychuk/apex-log-e8d9627f4a9a#.5x1uo1767) is used for logging Info. Override the logs format with other [Log Handlers](https://github.com/apex/log/tree/master/_examples). For example using the "json" log handler: +The package defaults to `ioutil.Discard` so swallow all logs. This can be customized with the preferred logging strategy: ```go -import( - "github.com/apex/log" - "github.com/apex/log/handlers/json" -) - func main() { - // ... + // ... - log.SetHandler(json.New(os.Stderr)) - log.SetLevel(log.DebugLevel) + // logger + logger := log.New(os.Stdout, "consumer-example: ", log.LstdFlags) + + // consumer + c, err := consumer.New(checkpoint, appName, streamName, consumer.WithLogger(logger)) } ``` -Which will producde the following logs: - -``` - INFO[0000] processing app=test shard=shardId-000000000000 stream=test - INFO[0008] checkpoint app=test shard=shardId-000000000000 stream=test - INFO[0012] checkpoint app=test shard=shardId-000000000000 stream=test -``` - ## Contributing Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]! diff --git a/consumer.go b/consumer.go index 13f9114..7c51a7a 100644 --- a/consumer.go +++ b/consumer.go @@ -3,9 +3,10 @@ package consumer import ( "context" "fmt" + "io/ioutil" + "log" "sync" - "github.com/apex/log" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" @@ -26,7 +27,7 @@ func WithCheckpoint(checkpoint checkpoint.Checkpoint) Option { } // WithLogger overrides the default logger -func WithLogger(logger log.Interface) Option { +func WithLogger(logger *log.Logger) Option { return func(c *Consumer) error { c.logger = logger return nil @@ -50,7 +51,7 @@ func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) ( c := &Consumer{ checkpoint: checkpoint, - appName: app, + appName: app, streamName: stream, } @@ -63,11 +64,7 @@ func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) ( // provide default logger if c.logger == nil { - c.logger = log.Log.WithFields(log.Fields{ - "package": "kinesis-consumer", - "app": app, - "stream": stream, - }) + c.logger = log.New(ioutil.Discard, "kinesis-consumer: ", log.LstdFlags) } // provide a default kinesis client @@ -80,10 +77,10 @@ func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) ( // Consumer wraps the interaction with the Kinesis stream type Consumer struct { - appName string + appName string streamName string client *kinesis.Kinesis - logger log.Interface + logger *log.Logger checkpoint checkpoint.Checkpoint } @@ -123,21 +120,19 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) erro // for each record and checkpoints after each page is processed. // Note: returning `false` from the callback func will end the scan. func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kinesis.Record) bool) { - var logger = c.logger.WithFields(log.Fields{"shard": shardID}) - lastSeqNum, err := c.checkpoint.Get(shardID) if err != nil { - logger.WithError(err).Error("get checkpoint") + c.logger.Printf("get checkpoint error: %v", err) return } shardIterator, err := c.getShardIterator(shardID, lastSeqNum) if err != nil { - logger.WithError(err).Error("getShardIterator") + c.logger.Printf("get shard iterator error: %v", err) return } - logger.Info("scanning shard") + c.logger.Println("scanning", shardID) loop: for { @@ -154,7 +149,7 @@ loop: if err != nil { shardIterator, err = c.getShardIterator(shardID, lastSeqNum) if err != nil { - logger.WithError(err).Error("getShardIterator") + c.logger.Printf("get shard iterator error: %v", err) break loop } continue @@ -173,16 +168,16 @@ loop: } } - logger.WithField("count", len(resp.Records)).Info("checkpoint") + c.logger.Println("checkpointing", shardID, len(resp.Records)) if err := c.checkpoint.Set(shardID, lastSeqNum); err != nil { - c.logger.WithError(err).Error("set checkpoint error") + c.logger.Printf("set checkpoint error: %v", err) } } if resp.NextShardIterator == nil || shardIterator == resp.NextShardIterator { shardIterator, err = c.getShardIterator(shardID, lastSeqNum) if err != nil { - logger.WithError(err).Error("getShardIterator") + c.logger.Printf("get shard iterator error: %v", err) break loop } } else { @@ -195,8 +190,9 @@ loop: return } + c.logger.Println("checkpointing", shardID) if err := c.checkpoint.Set(shardID, lastSeqNum); err != nil { - c.logger.WithError(err).Error("set checkpoint error") + c.logger.Printf("set checkpoint error: %v", err) } } @@ -215,7 +211,6 @@ func (c *Consumer) getShardIterator(shardID, lastSeqNum string) (*string, error) resp, err := c.client.GetShardIterator(params) if err != nil { - c.logger.WithError(err).Error("GetShardIterator") return nil, err } diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 6613feb..711b0fa 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -4,32 +4,31 @@ import ( "context" "flag" "fmt" + "log" "os" - "github.com/apex/log" - "github.com/apex/log/handlers/text" consumer "github.com/harlow/kinesis-consumer" checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" ) func main() { - log.SetHandler(text.New(os.Stderr)) - log.SetLevel(log.DebugLevel) - var ( app = flag.String("app", "", "App name") stream = flag.String("stream", "", "Stream name") ) flag.Parse() - // new checkpoint + // logger + logger := log.New(os.Stdout, "consumer-example: ", log.LstdFlags) + + // checkpoint ck, err := checkpoint.New(*app, *stream) if err != nil { log.Fatalf("checkpoint error: %v", err) } - // new consumer - c, err := consumer.New(ck, *app, *stream) + // consumer + c, err := consumer.New(ck, *app, *stream, consumer.WithLogger(logger)) if err != nil { log.Fatalf("consumer error: %v", err) }