From 3770136f6476d8cd519e50f6b5ff4214ddf53ef4 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Wed, 22 Nov 2017 17:44:42 -0800 Subject: [PATCH] Allow user to override no-op checkpoint with Option --- README.md | 20 +++++++++----------- checkpoint/checkpoint.go | 8 -------- consumer.go | 32 +++++++++++++++++--------------- examples/consumer/main.go | 3 ++- 4 files changed, 28 insertions(+), 35 deletions(-) delete mode 100644 checkpoint/checkpoint.go diff --git a/README.md b/README.md index 804c168..0a25773 100644 --- a/README.md +++ b/README.md @@ -18,11 +18,13 @@ Get the package source: The consumer leverages a handler func that accepts a Kinesis record. The `Scan` method will consume all shards concurrently and call the callback func as it receives records from the stream. +Important: The default Log, Counter, and Checkpoint are no-op which means no logs, counts, or checkpoints will be emitted when scanning the stream. See the options below to override these defaults. + ```go import( // ... + consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" ) func main() { @@ -32,14 +34,8 @@ func main() { ) flag.Parse() - // new checkpoint - ck, err := checkpoint.New(*app, *stream) - if err != nil { - log.Fatalf("checkpoint error: %v", err) - } - - // new consumer - c, err := consumer.New(ck, *app, *stream) + // consumer + c, err := consumer.New(*app, *stream) if err != nil { log.Fatalf("consumer error: %v", err) } @@ -68,7 +64,9 @@ The uniq identifier for a consumer is `[appName, streamName, shardID]` kinesis-checkpoints -There are currently two storage types for checkpoints: +Note: The default checkpoint is no-op. Which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is re-started. + +To persist scan progress choose one of the following checkpoints: ### Redis Checkpoint @@ -116,7 +114,7 @@ client := kinesis.New(session.New(aws.NewConfig())) // consumer c, err := consumer.New( - consumer, + appName, streamName, consumer.WithClient(client), ) diff --git a/checkpoint/checkpoint.go b/checkpoint/checkpoint.go deleted file mode 100644 index 89af444..0000000 --- a/checkpoint/checkpoint.go +++ /dev/null @@ -1,8 +0,0 @@ -package checkpoint - -// Checkpoint interface used to allow swappable backends for checkpoining -// consumer progress in the stream. -type Checkpoint interface { - Get(shardID string) (string, error) - Set(shardID string, sequenceNumber string) error -} diff --git a/consumer.go b/consumer.go index fce6d30..06c2219 100644 --- a/consumer.go +++ b/consumer.go @@ -10,12 +10,11 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/harlow/kinesis-consumer/checkpoint" ) type Record = kinesis.Record -// Counter is used for exposing basic metrics from the scanner +// Counter interface is used for exposing basic metrics from the scanner type Counter interface { Add(string, int64) } @@ -24,11 +23,22 @@ type noopCounter struct{} func (n noopCounter) Add(string, int64) {} +// Checkpoint interface used track consumer progress in the stream +type Checkpoint interface { + Get(shardID string) (string, error) + Set(shardID string, sequenceNumber string) error +} + +type noopCheckpoint struct{} + +func (n noopCheckpoint) Set(string, string) error { return nil } +func (n noopCheckpoint) Get(string) (string, error) { return "", nil } + // Option is used to override defaults when creating a new Consumer type Option func(*Consumer) error // WithCheckpoint overrides the default checkpoint -func WithCheckpoint(checkpoint checkpoint.Checkpoint) Option { +func WithCheckpoint(checkpoint Checkpoint) Option { return func(c *Consumer) error { c.checkpoint = checkpoint return nil @@ -53,11 +63,7 @@ func WithCounter(counter Counter) Option { // New creates a kinesis consumer with default settings. Use Option to override // any of the optional attributes. -func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) (*Consumer, error) { - if checkpoint == nil { - return nil, fmt.Errorf("must provide checkpoint") - } - +func New(app, stream string, opts ...Option) (*Consumer, error) { if app == "" { return nil, fmt.Errorf("must provide app name") } @@ -67,9 +73,10 @@ func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) ( } c := &Consumer{ - checkpoint: checkpoint, appName: app, streamName: stream, + checkpoint: &noopCheckpoint{}, + counter: &noopCounter{}, } // set options @@ -89,11 +96,6 @@ func New(checkpoint checkpoint.Checkpoint, app, stream string, opts ...Option) ( c.client = kinesis.New(session.New(aws.NewConfig())) } - // provide default no-op counter - if c.counter == nil { - c.counter = &noopCounter{} - } - return c, nil } @@ -103,7 +105,7 @@ type Consumer struct { streamName string client *kinesis.Kinesis logger *log.Logger - checkpoint checkpoint.Checkpoint + checkpoint Checkpoint counter Counter } diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 8f4724b..46c6282 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -45,7 +45,8 @@ func main() { } // consumer - c, err := consumer.New(ck, *app, *stream, + c, err := consumer.New(*app, *stream, + consumer.WithCheckpoint(ck), consumer.WithLogger(logger), consumer.WithCounter(counter), )