diff --git a/consumer.go b/consumer.go index 06c2219..fcc5f89 100644 --- a/consumer.go +++ b/consumer.go @@ -3,8 +3,8 @@ package consumer import ( "context" "fmt" - "io/ioutil" "log" + "os" "sync" "github.com/aws/aws-sdk-go/aws" @@ -31,7 +31,7 @@ type Checkpoint interface { type noopCheckpoint struct{} -func (n noopCheckpoint) Set(string, string) error { return nil } +func (n noopCheckpoint) Set(string, string) error { return nil } func (n noopCheckpoint) Get(string) (string, error) { return "", nil } // Option is used to override defaults when creating a new Consumer @@ -76,7 +76,8 @@ func New(app, stream string, opts ...Option) (*Consumer, error) { appName: app, streamName: stream, checkpoint: &noopCheckpoint{}, - counter: &noopCounter{}, + counter: &noopCounter{}, + logger: log.New(os.Stderr, "kinesis-consumer: ", log.LstdFlags), } // set options @@ -86,11 +87,6 @@ func New(app, stream string, opts ...Option) (*Consumer, error) { } } - // provide default logger - if c.logger == nil { - c.logger = log.New(ioutil.Discard, "", log.LstdFlags) - } - // provide a default kinesis client if c.client == nil { c.client = kinesis.New(session.New(aws.NewConfig())) @@ -197,6 +193,8 @@ loop: if err := c.checkpoint.Set(shardID, lastSeqNum); err != nil { c.logger.Printf("set checkpoint error: %v", err) } + + c.logger.Println("checkpoint", shardID, len(resp.Records)) c.counter.Add("checkpoints", 1) } diff --git a/examples/consumer/README.md b/examples/consumer/README.md index 39de3b9..f946797 100644 --- a/examples/consumer/README.md +++ b/examples/consumer/README.md @@ -14,4 +14,4 @@ export AWS_SECRET_KEY= ### Run the consumer - $ go run main.go -a appName -s streamName + $ go run main.go --app appName --stream streamName diff --git a/examples/producer/README.md b/examples/producer/README.md index 3c8e027..87d1606 100644 --- a/examples/producer/README.md +++ b/examples/producer/README.md @@ -17,4 +17,4 @@ export AWS_SECRET_KEY= ### Running the code $ curl https://s3.amazonaws.com/kinesis.test/users.txt > /tmp/users.txt - $ go run main.go -s streamName + $ go run main.go --stream streamName