From b7be26418a4d8d9627c49ca0865a604a167d4d94 Mon Sep 17 00:00:00 2001 From: Emanuel Ramos <40024962+imaramos@users.noreply.github.com> Date: Mon, 18 Jun 2018 03:27:10 +0100 Subject: [PATCH] Add postgres checkpoint implementation (#55) --- .gitignore | 4 + Gopkg.lock | 11 +- README.md | 28 ++++ checkpoint/postgres/postgres.go | 154 ++++++++++++++++++++ examples/consumer/{ => cp-dynamo}/README.md | 0 examples/consumer/{ => cp-dynamo}/main.go | 0 examples/consumer/cp-postgres/README.md | 17 +++ examples/consumer/cp-postgres/main.go | 80 ++++++++++ 8 files changed, 293 insertions(+), 1 deletion(-) create mode 100644 checkpoint/postgres/postgres.go rename examples/consumer/{ => cp-dynamo}/README.md (100%) rename examples/consumer/{ => cp-dynamo}/main.go (100%) create mode 100644 examples/consumer/cp-postgres/README.md create mode 100644 examples/consumer/cp-postgres/main.go diff --git a/.gitignore b/.gitignore index 3a49a48..fc5ded3 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,7 @@ vendor/** # Benchmark files prof.cpu prof.mem + +# VSCode files +/.vscode +/**/debug diff --git a/Gopkg.lock b/Gopkg.lock index 523c43d..3ddad40 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -60,6 +60,15 @@ packages = ["."] revision = "0b12d6b5" +[[projects]] + branch = "master" + name = "github.com/lib/pq" + packages = [ + ".", + "oid" + ] + revision = "90697d60dd844d5ef6ff15135d0203f65d2f53b8" + [[projects]] name = "github.com/pkg/errors" packages = ["."] @@ -82,6 +91,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "1b40486b645b81bc2215d0153631e9e002e534ba86713ba55500ce62c07cbad8" + inputs-digest = "82ad275b394f4727b9d9ad816f534e512b1b007806a4b785d8a742e5836eee48" solver-name = "gps-cdcl" solver-version = 1 diff --git a/README.md b/README.md index 8d35048..ad43f69 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,34 @@ Sort key: shard_id screen shot 2017-11-22 at 7 59 36 pm +### Postgres Checkpoint + +The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString: + +```go +import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/postgres" + +// postgres checkpoint +ck, err := checkpoint.New(app, table, connStr) +if err != nil { + log.Fatalf("new checkpoint error: %v", err) +} + +``` + +To leverage the Postgres checkpoint we'll also need to create a table: + +```sql +CREATE TABLE kinesis_consumer ( + namespace text NOT NULL, + shard_id text NOT NULL, + sequence_number numeric NOT NULL, + CONSTRAINT kinesis_consumer_pk PRIMARY KEY (namespace, shard_id) +); +``` + +The table name has to be the same that you specify when creating the checkpoint. The primary key composed by namespace and shard_id is mandatory in order to the checkpoint run without issues and also to ensure data integrity. + ## Options The consumer allows the following optional overrides. diff --git a/checkpoint/postgres/postgres.go b/checkpoint/postgres/postgres.go new file mode 100644 index 0000000..b45d673 --- /dev/null +++ b/checkpoint/postgres/postgres.go @@ -0,0 +1,154 @@ +package postgres + +import ( + "database/sql" + "errors" + "fmt" + "sync" + "time" + // this is the postgres package so it makes sense to be here + _ "github.com/lib/pq" +) + +var getCheckpointQuery = `SELECT sequence_number + FROM %s + WHERE namespace=$1 AND shard_id=$2` + +var upsertCheckpoint = `INSERT INTO %s (namespace, shard_id, sequence_number) + VALUES($1, $2, $3) + ON CONFLICT (namespace, shard_id) + DO + UPDATE + SET sequence_number= $3` + +type key struct { + streamName string + shardID string +} + +// Option is used to override defaults when creating a new Checkpoint +type Option func(*Checkpoint) + +// Checkpoint stores and retreives the last evaluated key from a DDB scan +type Checkpoint struct { + appName string + conn *sql.DB + mu *sync.Mutex // protects the checkpoints + done chan struct{} + checkpoints map[key]string + maxInterval time.Duration +} + +// New returns a checkpoint that uses PostgresDB for underlying storage +// Using connectionStr turn it more flexible to use specific db configs +func New(appName, tableName, connectionStr string, opts ...Option) (*Checkpoint, error) { + + if tableName == "" { + return nil, errors.New("Table name not defined") + } + + conn, err := sql.Open("postgres", connectionStr) + + if err != nil { + return nil, err + } + + getCheckpointQuery = fmt.Sprintf(getCheckpointQuery, tableName) + upsertCheckpoint = fmt.Sprintf(upsertCheckpoint, tableName) + + ck := &Checkpoint{ + conn: conn, + appName: appName, + done: make(chan struct{}), + maxInterval: time.Duration(1 * time.Minute), + mu: new(sync.Mutex), + checkpoints: map[key]string{}, + } + + for _, opt := range opts { + opt(ck) + } + + go ck.loop() + + return ck, nil +} + +// Get determines if a checkpoint for a particular Shard exists. +// Typically used to determine whether we should start processing the shard with +// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). +func (c *Checkpoint) Get(streamName, shardID string) (string, error) { + namespace := fmt.Sprintf("%s-%s", c.appName, streamName) + + var sequenceNumber string + + err := c.conn.QueryRow(getCheckpointQuery, namespace, shardID).Scan(&sequenceNumber) + + if err != nil { + if err == sql.ErrNoRows { + return "", nil + } + + return "", err + } + + return sequenceNumber, nil +} + +// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// Upon failover, record processing is resumed from this point. +func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { + c.mu.Lock() + defer c.mu.Unlock() + + if sequenceNumber == "" { + return fmt.Errorf("sequence number should not be empty") + } + + key := key{ + streamName: streamName, + shardID: shardID, + } + + c.checkpoints[key] = sequenceNumber + + return nil +} + +// Shutdown the checkpoint. Save any in-flight data. +func (c *Checkpoint) Shutdown() error { + defer c.conn.Close() + + c.done <- struct{}{} + + return c.save() +} + +func (c *Checkpoint) loop() { + tick := time.NewTicker(c.maxInterval) + defer tick.Stop() + defer close(c.done) + + for { + select { + case <-tick.C: + c.save() + case <-c.done: + return + } + } +} + +func (c *Checkpoint) save() error { + c.mu.Lock() + defer c.mu.Unlock() + + for key, sequenceNumber := range c.checkpoints { + + if _, err := c.conn.Exec(upsertCheckpoint, fmt.Sprintf("%s-%s", c.appName, key.streamName), key.shardID, sequenceNumber); err != nil { + return err + } + } + + return nil +} diff --git a/examples/consumer/README.md b/examples/consumer/cp-dynamo/README.md similarity index 100% rename from examples/consumer/README.md rename to examples/consumer/cp-dynamo/README.md diff --git a/examples/consumer/main.go b/examples/consumer/cp-dynamo/main.go similarity index 100% rename from examples/consumer/main.go rename to examples/consumer/cp-dynamo/main.go diff --git a/examples/consumer/cp-postgres/README.md b/examples/consumer/cp-postgres/README.md new file mode 100644 index 0000000..f49e5d0 --- /dev/null +++ b/examples/consumer/cp-postgres/README.md @@ -0,0 +1,17 @@ +# Consumer with postgres checkpoint + +Read records from the Kinesis stream using postgres as checkpoint + +## Environment Variables + +Export the required environment vars for connecting to the Kinesis stream: + +```shell +export AWS_ACCESS_KEY= +export AWS_REGION= +export AWS_SECRET_KEY= +``` + +## Run the consumer + + go run main.go --app appName --stream streamName --table tableName --connection connectionString diff --git a/examples/consumer/cp-postgres/main.go b/examples/consumer/cp-postgres/main.go new file mode 100644 index 0000000..3a6aec7 --- /dev/null +++ b/examples/consumer/cp-postgres/main.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "errors" + "expvar" + "flag" + "fmt" + "log" + "os" + "os/signal" + + consumer "github.com/harlow/kinesis-consumer" + checkpoint "github.com/harlow/kinesis-consumer/checkpoint/postgres" +) + +func main() { + var ( + app = flag.String("app", "", "App name") + stream = flag.String("stream", "", "Stream name") + table = flag.String("table", "", "Table name") + connStr = flag.String("connection", "", "Connection Str") + ) + flag.Parse() + + // postgres checkpoint + ck, err := checkpoint.New(*app, *table, *connStr) + if err != nil { + log.Fatalf("checkpoint error: %v", err) + } + + var ( + counter = expvar.NewMap("counters") + ) + + newKclient := consumer.NewKinesisClient() + + // consumer + c, err := consumer.New( + *stream, + consumer.WithCheckpoint(ck), + consumer.WithCounter(counter), + consumer.WithClient(newKclient), + ) + if err != nil { + log.Fatalf("consumer error: %v", err) + } + + // use cancel \func to signal shutdown + ctx, cancel := context.WithCancel(context.Background()) + + // trap SIGINT, wait to trigger shutdown + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + go func() { + <-signals + cancel() + }() + + // scan stream + err = c.Scan(ctx, func(r *consumer.Record) consumer.ScanError { + fmt.Println(string(r.Data)) + err := errors.New("some error happened") + // continue scanning + return consumer.ScanError{ + Error: err, + StopScan: false, + SkipCheckpoint: false, + } + }) + + if err != nil { + log.Fatalf("scan error: %v", err) + } + + if err := ck.Shutdown(); err != nil { + log.Fatalf("checkpoint shutdown error: %v", err) + } +}