From 49b5a94c7e7da78a23b00e7840c81aff8eed166a Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sat, 30 Apr 2016 22:23:35 -0700 Subject: [PATCH] Use Apex log for logging (#27) * Use Apex log for logging --- README.md | 15 ++++++++++++--- consumer.go | 40 ++++++++++++++++++++++++++-------------- logger.go | 16 ---------------- 3 files changed, 38 insertions(+), 33 deletions(-) delete mode 100644 logger.go diff --git a/README.md b/README.md index 2787075..05ee7d9 100644 --- a/README.md +++ b/README.md @@ -65,10 +65,19 @@ Use the [seed stream](https://github.com/harlow/kinesis-connectors/tree/master/e Default logging is handled by [go-kit package log](https://github.com/go-kit/kit/tree/master/log). Applications can override the default loging behaviour by implementing the [Logger interface][log_interface]. ```go -connector.SetLogger(NewCustomLogger()) -``` +import( + "os" -[log_interface]: https://github.com/harlow/kinesis-connectors/blob/master/logger.go + "github.com/apex/log" + "github.com/apex/log/handlers/json" +) + +func main() { + c := connector.NewConsumer("signupAgg", "signups") + c.SetLogHandler(json.New(os.Stderr)) + // ... +} +``` ## Contributing diff --git a/consumer.go b/consumer.go index 5491daa..6fb8ec4 100644 --- a/consumer.go +++ b/consumer.go @@ -3,8 +3,9 @@ package connector import ( "os" + "github.com/apex/log" + "github.com/apex/log/handlers/text" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" ) @@ -16,10 +17,13 @@ var ( // NewConsumer creates a new kinesis connection and returns a // new consumer initialized with app and stream name func NewConsumer(appName, streamName string) *Consumer { - sess := session.New( - aws.NewConfig().WithMaxRetries(10), + log.SetHandler(text.New(os.Stderr)) + + svc := kinesis.New( + session.New( + aws.NewConfig().WithMaxRetries(10), + ), ) - svc := kinesis.New(sess) return &Consumer{ appName: appName, @@ -40,11 +44,16 @@ func (c *Consumer) Set(option string, value interface{}) { case "maxBatchCount": maxBatchCount = value.(int) default: - logger.Log("fatal", "Set", "msg", "unknown option") + log.Error("invalid option") os.Exit(1) } } +// SetLogHandler allows users override logger +func (c *Consumer) SetLogHandler(handler log.Handler) { + log.SetHandler(handler) +} + // Start takes a handler and then loops over each of the shards // processing each one with the handler. func (c *Consumer) Start(handler Handler) { @@ -55,17 +64,22 @@ func (c *Consumer) Start(handler Handler) { ) if err != nil { - logger.Log("fatal", "DescribeStream", "msg", err.Error()) + log.WithError(err).Error("DescribeStream") os.Exit(1) } for _, shard := range resp.StreamDescription.Shards { - logger.Log("info", "processing", "stream", c.streamName, "shard", shard.ShardId) go c.handlerLoop(*shard.ShardId, handler) } } func (c *Consumer) handlerLoop(shardID string, handler Handler) { + ctx := log.WithFields(log.Fields{ + "app": c.appName, + "stream": c.streamName, + "shard": shardID, + }) + buf := &Buffer{ MaxBatchCount: maxBatchCount, } @@ -89,13 +103,12 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) { resp, err := c.svc.GetShardIterator(params) if err != nil { - if awsErr, ok := err.(awserr.Error); ok { - logger.Log("fatal", "getShardIterator", "code", awsErr.Code(), "msg", awsErr.Message(), "origError", awsErr.OrigErr()) - os.Exit(1) - } + ctx.WithError(err).Error("getShardIterator") + os.Exit(1) } shardIterator := resp.ShardIterator + ctx.Info("started") for { resp, err := c.svc.GetRecords( @@ -105,8 +118,7 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) { ) if err != nil { - awsErr, _ := err.(awserr.Error) - logger.Log("fatal", "getRecords", awsErr.Code()) + ctx.WithError(err).Error("getRecords") os.Exit(1) } @@ -121,7 +133,7 @@ func (c *Consumer) handlerLoop(shardID string, handler Handler) { } } } else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator { - logger.Log("fatal", "nextShardIterator", "msg", err.Error()) + ctx.Error("nextShardIterator") os.Exit(1) } diff --git a/logger.go b/logger.go deleted file mode 100644 index 9286db3..0000000 --- a/logger.go +++ /dev/null @@ -1,16 +0,0 @@ -package connector - -import ( - "os" - - "github.com/go-kit/kit/log" -) - -// SetLogger adds the ability to change the logger so that external packages -// can control the logging for this package -func SetLogger(l log.Logger) { - logger = l -} - -// specify a default logger so that we don't end up with panics. -var logger log.Logger = log.NewLogfmtLogger(os.Stderr)