From 70e24686651031583d3be3c9f21ab1f62d1188b3 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 28 Jul 2019 11:17:27 -0700 Subject: [PATCH] Update readme w/ storage --- README.md | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index d19830e..b9b7331 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Build Status](https://travis-ci.com/harlow/kinesis-consumer.svg?branch=master)](https://travis-ci.com/harlow/kinesis-consumer) [![GoDoc](https://godoc.org/github.com/harlow/kinesis-consumer?status.svg)](https://godoc.org/github.com/harlow/kinesis-consumer) -__Note:__ This repo is under active development adding [Consumer Groups #42](https://github.com/harlow/kinesis-consumer/issues/42). Master should always be deployable, but expect breaking changes in master over the next few months. +__Note:__ This repo is under active development adding [Consumer Groups #42](https://github.com/harlow/kinesis-consumer/issues/42). Master should always be deployable, but expect breaking changes in master over the next few months. Latest stable release https://github.com/harlow/kinesis-consumer/releases/tag/v0.3.2 @@ -104,9 +104,13 @@ err := c.Scan(ctx, func(r *consumer.Record) error { }) ``` -## Checkpoint +## Options -To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard. The boolean value SkipCheckpoint of consumer.ScanError determines if checkpoint will be activated. ScanError is returned by the record processing callback. +The consumer allows the following optional overrides. + +### Storage + +To record the progress (checkpoint) of the consumer in the stream we use a storage layer to persist the last sequence number the consumer has read from a particular shard. The boolean value ErrSkipCheckpoint of consumer.ScanError determines if checkpoint will be activated. ScanError is returned by the record processing callback. This will allow consumers to re-launch and pick up at the position in the stream where they left off. @@ -114,33 +118,33 @@ The uniq identifier for a consumer is `[appName, streamName, shardID]` kinesis-checkpoints -Note: The default checkpoint is no-op. Which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is re-started. +Note: The default storage is in-memory (no-op). Which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is re-started. -To persist scan progress choose one of the following checkpoints: +To persist scan progress choose one of the following storage layers: -### Redis Checkpoint +#### Redis The Redis checkpoint requries App Name, and Stream Name: ```go -import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" +import storage "github.com/harlow/kinesis-consumer/checkpoint/redis" // redis checkpoint -ck, err := checkpoint.New(appName) +db, err := storage.New(appName) if err != nil { log.Fatalf("new checkpoint error: %v", err) } ``` -### DynamoDB Checkpoint +#### DynamoDB The DynamoDB checkpoint requires Table Name, App Name, and Stream Name: ```go -import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" +import storage "github.com/harlow/kinesis-consumer/checkpoint/ddb" // ddb checkpoint -ck, err := checkpoint.New(appName, tableName) +db, err := storage.New(appName, tableName) if err != nil { log.Fatalf("new checkpoint error: %v", err) } @@ -154,7 +158,7 @@ myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig())) // Region: aws.String("us-west-2"), // }) -ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) +db, err := storage.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) if err != nil { log.Fatalf("new checkpoint error: %v", err) } @@ -173,15 +177,15 @@ Sort key: shard_id screen shot 2017-11-22 at 7 59 36 pm -### Postgres Checkpoint +#### Postgres The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString: ```go -import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/postgres" +import storage "github.com/harlow/kinesis-consumer/checkpoint/postgres" // postgres checkpoint -ck, err := checkpoint.New(app, table, connStr) +db, err := storage.New(app, table, connStr) if err != nil { log.Fatalf("new checkpoint error: %v", err) } @@ -201,15 +205,15 @@ CREATE TABLE kinesis_consumer ( The table name has to be the same that you specify when creating the checkpoint. The primary key composed by namespace and shard_id is mandatory in order to the checkpoint run without issues and also to ensure data integrity. -### Mysql Checkpoint +#### Mysql The Mysql checkpoint requires Table Name, App Name, Stream Name and ConnectionString (just like the Postgres checkpoint!): ```go -import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/mysql" +import storage "github.com/harlow/kinesis-consumer/checkpoint/mysql" // mysql checkpoint -ck, err := checkpoint.New(app, table, connStr) +db, err := storage.New(app, table, connStr) if err != nil { log.Fatalf("new checkpoint error: %v", err) } @@ -229,10 +233,6 @@ CREATE TABLE kinesis_consumer ( The table name has to be the same that you specify when creating the checkpoint. The primary key composed by namespace and shard_id is mandatory in order to the checkpoint run without issues and also to ensure data integrity. -## Options - -The consumer allows the following optional overrides. - ### Kinesis Client Override the Kinesis client if there is any special config needed: