diff --git a/README.md b/README.md index 2f67308..8d35048 100644 --- a/README.md +++ b/README.md @@ -161,16 +161,51 @@ The [expvar package](https://golang.org/pkg/expvar/) will display consumer count ``` ### Logging +Logging supports the basic built-in logging library or use thrid party external one, so long as +it implements the Logger interface. + +For example, to use the builtin logging package, we wrap it with myLogger structure. + +``` +// A myLogger provides a minimalistic logger satisfying the Logger interface. +type myLogger struct { + logger *log.Logger +} + +// Log logs the parameters to the stdlib logger. See log.Println. +func (l *myLogger) Log(args ...interface{}) { + l.logger.Println(args...) +} +``` The package defaults to `ioutil.Discard` so swallow all logs. This can be customized with the preferred logging strategy: ```go // logger -logger := log.New(os.Stdout, "consumer-example: ", log.LstdFlags) - +log := &myLogger{ logger : log.New(os.Stdout, "consumer-example: ", log.LstdFlags),} // consumer c, err := consumer.New(streamName, consumer.WithLogger(logger)) ``` +To use a more complicated logging library, e.g. apex log +``` +type myLogger struct { + logger *log.Logger +} + +func (l *myLogger) Log(args ...interface{}) { + l.logger.Infof("producer", args...) + +} + +func main() { + + log := &myLogger{ + logger: alog.Logger{ + Handler: text.New(os.Stderr), + Level: alog.DebugLevel, + }, + } +``` ## Contributing diff --git a/consumer.go b/consumer.go index 7d69c99..75e685a 100644 --- a/consumer.go +++ b/consumer.go @@ -3,8 +3,6 @@ package consumer import ( "context" "fmt" - "io/ioutil" - "log" "sync" "github.com/aws/aws-sdk-go/service/kinesis" @@ -59,7 +57,7 @@ func WithCheckpoint(checkpoint Checkpoint) Option { } // WithLogger overrides the default logger -func WithLogger(logger *log.Logger) Option { +func WithLogger(logger Logger) Option { return func(c *Consumer) error { c.logger = logger return nil @@ -94,7 +92,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { streamName: streamName, checkpoint: &noopCheckpoint{}, counter: &noopCounter{}, - logger: log.New(ioutil.Discard, "", log.LstdFlags), + logger: NewDefaultLogger(), client: NewKinesisClient(), } @@ -112,7 +110,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { type Consumer struct { streamName string client Client - logger *log.Logger + logger Logger checkpoint Checkpoint counter Counter } @@ -170,7 +168,8 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*Recor return fmt.Errorf("get checkpoint error: %v", err) } - c.logger.Println("scanning", shardID, lastSeqNum) + c.logger.Log("scanning", shardID, lastSeqNum) + // get records recc, errc, err := c.client.GetRecords(ctx, c.streamName, shardID, lastSeqNum) if err != nil { @@ -200,6 +199,6 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*Recor } } - c.logger.Println("exiting", shardID) + c.logger.Log("exiting", shardID) return <-errc } diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 4602564..4d1f23d 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -18,6 +18,9 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/kinesis" + alog "github.com/apex/log" + "github.com/apex/log/handlers/text" + consumer "github.com/harlow/kinesis-consumer" checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" ) @@ -26,7 +29,7 @@ import ( func init() { sock, err := net.Listen("tcp", "localhost:8080") if err != nil { - log.Fatalf("net listen error: %v", err) + log.Println("net listen error: %v", err) } go func() { fmt.Println("Metrics available at http://localhost:8080/debug/vars") @@ -34,7 +37,25 @@ func init() { }() } +// A myLogger provides a minimalistic logger satisfying the Logger interface. +type myLogger struct { + logger alog.Logger +} + +// Log logs the parameters to the stdlib logger. See log.Println. +func (l *myLogger) Log(args ...interface{}) { + l.logger.Infof("producer", args...) +} + func main() { + // Wrap myLogger around apex logger + log := &myLogger{ + logger: alog.Logger{ + Handler: text.New(os.Stdout), + Level: alog.DebugLevel, + }, + } + var ( app = flag.String("app", "", "App name") stream = flag.String("stream", "", "Stream name") @@ -52,11 +73,10 @@ func main() { // ddb checkpoint ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(&MyRetryer{})) if err != nil { - log.Fatalf("checkpoint error: %v", err) + log.Log("checkpoint error: %v", err) } var ( counter = expvar.NewMap("counters") - logger = log.New(os.Stdout, "", log.LstdFlags) ) // The following 2 lines will overwrite the default kinesis client @@ -69,12 +89,12 @@ func main() { c, err := consumer.New( *stream, consumer.WithCheckpoint(ck), - consumer.WithLogger(logger), + consumer.WithLogger(log), consumer.WithCounter(counter), consumer.WithClient(newKclient), ) if err != nil { - log.Fatalf("consumer error: %v", err) + log.Log("consumer error: %v", err) } // use cancel func to signal shutdown @@ -101,11 +121,11 @@ func main() { } }) if err != nil { - log.Fatalf("scan error: %v", err) + log.Log("scan error: %v", err) } if err := ck.Shutdown(); err != nil { - log.Fatalf("checkpoint shutdown error: %v", err) + log.Log("checkpoint shutdown error: %v", err) } } diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..06ebecb --- /dev/null +++ b/logger.go @@ -0,0 +1,30 @@ +package consumer + +import ( + "io/ioutil" + "log" +) + +// A Logger is a minimal interface to as a adaptor for external logging library to consumer +type Logger interface { + Log(...interface{}) +} + +type LoggerFunc func(...interface{}) + +// NewDefaultLogger returns a Logger which discards messages. +func NewDefaultLogger() Logger { + return &defaultLogger{ + logger: log.New(ioutil.Discard, "", log.LstdFlags), + } +} + +// A defaultLogger provides a logging instance when none is provided. +type defaultLogger struct { + logger *log.Logger +} + +// Log using stdlib logger. See log.Println. +func (l defaultLogger) Log(args ...interface{}) { + l.logger.Println(args...) +}