Make it possible to let user use 3rd party logging library (#56)

This commit is contained in:
Prometheus 2018-06-12 18:07:33 -07:00 committed by Harlow Ward
parent e6a489c76b
commit 739e9e39a5
4 changed files with 100 additions and 16 deletions

View file

@ -161,16 +161,51 @@ The [expvar package](https://golang.org/pkg/expvar/) will display consumer count
``` ```
### Logging ### Logging
Logging supports the basic built-in logging library or use thrid party external one, so long as
it implements the Logger interface.
For example, to use the builtin logging package, we wrap it with myLogger structure.
```
// A myLogger provides a minimalistic logger satisfying the Logger interface.
type myLogger struct {
logger *log.Logger
}
// Log logs the parameters to the stdlib logger. See log.Println.
func (l *myLogger) Log(args ...interface{}) {
l.logger.Println(args...)
}
```
The package defaults to `ioutil.Discard` so swallow all logs. This can be customized with the preferred logging strategy: The package defaults to `ioutil.Discard` so swallow all logs. This can be customized with the preferred logging strategy:
```go ```go
// logger // logger
logger := log.New(os.Stdout, "consumer-example: ", log.LstdFlags) log := &myLogger{ logger : log.New(os.Stdout, "consumer-example: ", log.LstdFlags),}
// consumer // consumer
c, err := consumer.New(streamName, consumer.WithLogger(logger)) c, err := consumer.New(streamName, consumer.WithLogger(logger))
``` ```
To use a more complicated logging library, e.g. apex log
```
type myLogger struct {
logger *log.Logger
}
func (l *myLogger) Log(args ...interface{}) {
l.logger.Infof("producer", args...)
}
func main() {
log := &myLogger{
logger: alog.Logger{
Handler: text.New(os.Stderr),
Level: alog.DebugLevel,
},
}
```
## Contributing ## Contributing

View file

@ -3,8 +3,6 @@ package consumer
import ( import (
"context" "context"
"fmt" "fmt"
"io/ioutil"
"log"
"sync" "sync"
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
@ -59,7 +57,7 @@ func WithCheckpoint(checkpoint Checkpoint) Option {
} }
// WithLogger overrides the default logger // WithLogger overrides the default logger
func WithLogger(logger *log.Logger) Option { func WithLogger(logger Logger) Option {
return func(c *Consumer) error { return func(c *Consumer) error {
c.logger = logger c.logger = logger
return nil return nil
@ -94,7 +92,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
streamName: streamName, streamName: streamName,
checkpoint: &noopCheckpoint{}, checkpoint: &noopCheckpoint{},
counter: &noopCounter{}, counter: &noopCounter{},
logger: log.New(ioutil.Discard, "", log.LstdFlags), logger: NewDefaultLogger(),
client: NewKinesisClient(), client: NewKinesisClient(),
} }
@ -112,7 +110,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
type Consumer struct { type Consumer struct {
streamName string streamName string
client Client client Client
logger *log.Logger logger Logger
checkpoint Checkpoint checkpoint Checkpoint
counter Counter counter Counter
} }
@ -170,7 +168,8 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*Recor
return fmt.Errorf("get checkpoint error: %v", err) return fmt.Errorf("get checkpoint error: %v", err)
} }
c.logger.Println("scanning", shardID, lastSeqNum) c.logger.Log("scanning", shardID, lastSeqNum)
// get records // get records
recc, errc, err := c.client.GetRecords(ctx, c.streamName, shardID, lastSeqNum) recc, errc, err := c.client.GetRecords(ctx, c.streamName, shardID, lastSeqNum)
if err != nil { if err != nil {
@ -200,6 +199,6 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*Recor
} }
} }
c.logger.Println("exiting", shardID) c.logger.Log("exiting", shardID)
return <-errc return <-errc
} }

View file

@ -18,6 +18,9 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
alog "github.com/apex/log"
"github.com/apex/log/handlers/text"
consumer "github.com/harlow/kinesis-consumer" consumer "github.com/harlow/kinesis-consumer"
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
) )
@ -26,7 +29,7 @@ import (
func init() { func init() {
sock, err := net.Listen("tcp", "localhost:8080") sock, err := net.Listen("tcp", "localhost:8080")
if err != nil { if err != nil {
log.Fatalf("net listen error: %v", err) log.Println("net listen error: %v", err)
} }
go func() { go func() {
fmt.Println("Metrics available at http://localhost:8080/debug/vars") fmt.Println("Metrics available at http://localhost:8080/debug/vars")
@ -34,7 +37,25 @@ func init() {
}() }()
} }
// A myLogger provides a minimalistic logger satisfying the Logger interface.
type myLogger struct {
logger alog.Logger
}
// Log logs the parameters to the stdlib logger. See log.Println.
func (l *myLogger) Log(args ...interface{}) {
l.logger.Infof("producer", args...)
}
func main() { func main() {
// Wrap myLogger around apex logger
log := &myLogger{
logger: alog.Logger{
Handler: text.New(os.Stdout),
Level: alog.DebugLevel,
},
}
var ( var (
app = flag.String("app", "", "App name") app = flag.String("app", "", "App name")
stream = flag.String("stream", "", "Stream name") stream = flag.String("stream", "", "Stream name")
@ -52,11 +73,10 @@ func main() {
// ddb checkpoint // ddb checkpoint
ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(&MyRetryer{})) ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(&MyRetryer{}))
if err != nil { if err != nil {
log.Fatalf("checkpoint error: %v", err) log.Log("checkpoint error: %v", err)
} }
var ( var (
counter = expvar.NewMap("counters") counter = expvar.NewMap("counters")
logger = log.New(os.Stdout, "", log.LstdFlags)
) )
// The following 2 lines will overwrite the default kinesis client // The following 2 lines will overwrite the default kinesis client
@ -69,12 +89,12 @@ func main() {
c, err := consumer.New( c, err := consumer.New(
*stream, *stream,
consumer.WithCheckpoint(ck), consumer.WithCheckpoint(ck),
consumer.WithLogger(logger), consumer.WithLogger(log),
consumer.WithCounter(counter), consumer.WithCounter(counter),
consumer.WithClient(newKclient), consumer.WithClient(newKclient),
) )
if err != nil { if err != nil {
log.Fatalf("consumer error: %v", err) log.Log("consumer error: %v", err)
} }
// use cancel func to signal shutdown // use cancel func to signal shutdown
@ -101,11 +121,11 @@ func main() {
} }
}) })
if err != nil { if err != nil {
log.Fatalf("scan error: %v", err) log.Log("scan error: %v", err)
} }
if err := ck.Shutdown(); err != nil { if err := ck.Shutdown(); err != nil {
log.Fatalf("checkpoint shutdown error: %v", err) log.Log("checkpoint shutdown error: %v", err)
} }
} }

30
logger.go Normal file
View file

@ -0,0 +1,30 @@
package consumer
import (
"io/ioutil"
"log"
)
// A Logger is a minimal interface to as a adaptor for external logging library to consumer
type Logger interface {
Log(...interface{})
}
type LoggerFunc func(...interface{})
// NewDefaultLogger returns a Logger which discards messages.
func NewDefaultLogger() Logger {
return &defaultLogger{
logger: log.New(ioutil.Discard, "", log.LstdFlags),
}
}
// A defaultLogger provides a logging instance when none is provided.
type defaultLogger struct {
logger *log.Logger
}
// Log using stdlib logger. See log.Println.
func (l defaultLogger) Log(args ...interface{}) {
l.logger.Println(args...)
}