Use stdlib logging, default to discard
This commit is contained in:
parent
9a35af8df6
commit
90d2903fe6
3 changed files with 35 additions and 54 deletions
37
README.md
37
README.md
|
|
@ -24,9 +24,6 @@ import(
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
log.SetHandler(text.New(os.Stderr))
|
|
||||||
log.SetLevel(log.DebugLevel)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
app = flag.String("app", "", "App name")
|
app = flag.String("app", "", "App name")
|
||||||
stream = flag.String("stream", "", "Stream name")
|
stream = flag.String("stream", "", "Stream name")
|
||||||
|
|
@ -59,7 +56,7 @@ func main() {
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
### Checkpoint
|
## Checkpoint
|
||||||
|
|
||||||
To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard.
|
To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard.
|
||||||
|
|
||||||
|
|
@ -71,7 +68,7 @@ The uniq identifier for a consumer is `[appName, streamName, shardID]`
|
||||||
|
|
||||||
There are currently two storage types for checkpoints:
|
There are currently two storage types for checkpoints:
|
||||||
|
|
||||||
### Redis
|
### Redis Checkpoint
|
||||||
|
|
||||||
The Redis checkpoint requries App Name, and Stream Name:
|
The Redis checkpoint requries App Name, and Stream Name:
|
||||||
|
|
||||||
|
|
@ -85,7 +82,7 @@ if err != nil {
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
### DynamoDB
|
### DynamoDB Checkpoint
|
||||||
|
|
||||||
The DynamoDB checkpoint requires Table Name, App Name, and Stream Name:
|
The DynamoDB checkpoint requires Table Name, App Name, and Stream Name:
|
||||||
|
|
||||||
|
|
@ -103,7 +100,7 @@ To leverage the DDB checkpoint we'll also need to create a table:
|
||||||
|
|
||||||
<img width="659" alt="screen shot 2017-11-20 at 9 16 14 am" src="https://user-images.githubusercontent.com/739782/33033316-db85f848-cdd8-11e7-941a-0a87d8ace479.png">
|
<img width="659" alt="screen shot 2017-11-20 at 9 16 14 am" src="https://user-images.githubusercontent.com/739782/33033316-db85f848-cdd8-11e7-941a-0a87d8ace479.png">
|
||||||
|
|
||||||
### Options
|
## Options
|
||||||
|
|
||||||
The consumer allows the following optional overrides:
|
The consumer allows the following optional overrides:
|
||||||
|
|
||||||
|
|
@ -122,32 +119,22 @@ c, err := consumer.New(
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
|
|
||||||
### Logging
|
## Logging
|
||||||
|
|
||||||
[Apex Log](https://medium.com/@tjholowaychuk/apex-log-e8d9627f4a9a#.5x1uo1767) is used for logging Info. Override the logs format with other [Log Handlers](https://github.com/apex/log/tree/master/_examples). For example using the "json" log handler:
|
The package defaults to `ioutil.Discard` so swallow all logs. This can be customized with the preferred logging strategy:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
import(
|
|
||||||
"github.com/apex/log"
|
|
||||||
"github.com/apex/log/handlers/json"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// ...
|
// ...
|
||||||
|
|
||||||
log.SetHandler(json.New(os.Stderr))
|
// logger
|
||||||
log.SetLevel(log.DebugLevel)
|
logger := log.New(os.Stdout, "consumer-example: ", log.LstdFlags)
|
||||||
|
|
||||||
|
// consumer
|
||||||
|
c, err := consumer.New(checkpoint, appName, streamName, consumer.WithLogger(logger))
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Which will producde the following logs:
|
|
||||||
|
|
||||||
```
|
|
||||||
INFO[0000] processing app=test shard=shardId-000000000000 stream=test
|
|
||||||
INFO[0008] checkpoint app=test shard=shardId-000000000000 stream=test
|
|
||||||
INFO[0012] checkpoint app=test shard=shardId-000000000000 stream=test
|
|
||||||
```
|
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]!
|
Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]!
|
||||||
|
|
|
||||||
37
consumer.go
37
consumer.go
|
|
@ -3,9 +3,10 @@ package consumer
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/apex/log"
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
|
|
@ -26,7 +27,7 @@ func WithCheckpoint(checkpoint checkpoint.Checkpoint) Option {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger overrides the default logger
|
// WithLogger overrides the default logger
|
||||||
func WithLogger(logger log.Interface) Option {
|
func WithLogger(logger *log.Logger) Option {
|
||||||
return func(c *Consumer) error {
|
return func(c *Consumer) error {
|
||||||
c.logger = logger
|
c.logger = logger
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -50,7 +51,7 @@ func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) (
|
||||||
|
|
||||||
c := &Consumer{
|
c := &Consumer{
|
||||||
checkpoint: checkpoint,
|
checkpoint: checkpoint,
|
||||||
appName: app,
|
appName: app,
|
||||||
streamName: stream,
|
streamName: stream,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -63,11 +64,7 @@ func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) (
|
||||||
|
|
||||||
// provide default logger
|
// provide default logger
|
||||||
if c.logger == nil {
|
if c.logger == nil {
|
||||||
c.logger = log.Log.WithFields(log.Fields{
|
c.logger = log.New(ioutil.Discard, "kinesis-consumer: ", log.LstdFlags)
|
||||||
"package": "kinesis-consumer",
|
|
||||||
"app": app,
|
|
||||||
"stream": stream,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// provide a default kinesis client
|
// provide a default kinesis client
|
||||||
|
|
@ -80,10 +77,10 @@ func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) (
|
||||||
|
|
||||||
// Consumer wraps the interaction with the Kinesis stream
|
// Consumer wraps the interaction with the Kinesis stream
|
||||||
type Consumer struct {
|
type Consumer struct {
|
||||||
appName string
|
appName string
|
||||||
streamName string
|
streamName string
|
||||||
client *kinesis.Kinesis
|
client *kinesis.Kinesis
|
||||||
logger log.Interface
|
logger *log.Logger
|
||||||
checkpoint checkpoint.Checkpoint
|
checkpoint checkpoint.Checkpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -123,21 +120,19 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*kinesis.Record) bool) erro
|
||||||
// for each record and checkpoints after each page is processed.
|
// for each record and checkpoints after each page is processed.
|
||||||
// Note: returning `false` from the callback func will end the scan.
|
// Note: returning `false` from the callback func will end the scan.
|
||||||
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kinesis.Record) bool) {
|
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*kinesis.Record) bool) {
|
||||||
var logger = c.logger.WithFields(log.Fields{"shard": shardID})
|
|
||||||
|
|
||||||
lastSeqNum, err := c.checkpoint.Get(shardID)
|
lastSeqNum, err := c.checkpoint.Get(shardID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("get checkpoint")
|
c.logger.Printf("get checkpoint error: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
shardIterator, err := c.getShardIterator(shardID, lastSeqNum)
|
shardIterator, err := c.getShardIterator(shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("getShardIterator")
|
c.logger.Printf("get shard iterator error: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("scanning shard")
|
c.logger.Println("scanning", shardID)
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
|
|
@ -154,7 +149,7 @@ loop:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shardIterator, err = c.getShardIterator(shardID, lastSeqNum)
|
shardIterator, err = c.getShardIterator(shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("getShardIterator")
|
c.logger.Printf("get shard iterator error: %v", err)
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
@ -173,16 +168,16 @@ loop:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.WithField("count", len(resp.Records)).Info("checkpoint")
|
c.logger.Println("checkpointing", shardID, len(resp.Records))
|
||||||
if err := c.checkpoint.Set(shardID, lastSeqNum); err != nil {
|
if err := c.checkpoint.Set(shardID, lastSeqNum); err != nil {
|
||||||
c.logger.WithError(err).Error("set checkpoint error")
|
c.logger.Printf("set checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.NextShardIterator == nil || shardIterator == resp.NextShardIterator {
|
if resp.NextShardIterator == nil || shardIterator == resp.NextShardIterator {
|
||||||
shardIterator, err = c.getShardIterator(shardID, lastSeqNum)
|
shardIterator, err = c.getShardIterator(shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("getShardIterator")
|
c.logger.Printf("get shard iterator error: %v", err)
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -195,8 +190,9 @@ loop:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.logger.Println("checkpointing", shardID)
|
||||||
if err := c.checkpoint.Set(shardID, lastSeqNum); err != nil {
|
if err := c.checkpoint.Set(shardID, lastSeqNum); err != nil {
|
||||||
c.logger.WithError(err).Error("set checkpoint error")
|
c.logger.Printf("set checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -215,7 +211,6 @@ func (c *Consumer) getShardIterator(shardID, lastSeqNum string) (*string, error)
|
||||||
|
|
||||||
resp, err := c.client.GetShardIterator(params)
|
resp, err := c.client.GetShardIterator(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.WithError(err).Error("GetShardIterator")
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,32 +4,31 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/apex/log"
|
|
||||||
"github.com/apex/log/handlers/text"
|
|
||||||
consumer "github.com/harlow/kinesis-consumer"
|
consumer "github.com/harlow/kinesis-consumer"
|
||||||
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis"
|
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
log.SetHandler(text.New(os.Stderr))
|
|
||||||
log.SetLevel(log.DebugLevel)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
app = flag.String("app", "", "App name")
|
app = flag.String("app", "", "App name")
|
||||||
stream = flag.String("stream", "", "Stream name")
|
stream = flag.String("stream", "", "Stream name")
|
||||||
)
|
)
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
// new checkpoint
|
// logger
|
||||||
|
logger := log.New(os.Stdout, "consumer-example: ", log.LstdFlags)
|
||||||
|
|
||||||
|
// checkpoint
|
||||||
ck, err := checkpoint.New(*app, *stream)
|
ck, err := checkpoint.New(*app, *stream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("checkpoint error: %v", err)
|
log.Fatalf("checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// new consumer
|
// consumer
|
||||||
c, err := consumer.New(ck, *app, *stream)
|
c, err := consumer.New(ck, *app, *stream, consumer.WithLogger(logger))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("consumer error: %v", err)
|
log.Fatalf("consumer error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue