Update readme w/ storage

This commit is contained in:
Harlow Ward 2019-07-28 11:17:27 -07:00
parent 8b19674b4a
commit 70e2468665

View file

@ -2,7 +2,7 @@
[![Build Status](https://travis-ci.com/harlow/kinesis-consumer.svg?branch=master)](https://travis-ci.com/harlow/kinesis-consumer) [![GoDoc](https://godoc.org/github.com/harlow/kinesis-consumer?status.svg)](https://godoc.org/github.com/harlow/kinesis-consumer) [![Build Status](https://travis-ci.com/harlow/kinesis-consumer.svg?branch=master)](https://travis-ci.com/harlow/kinesis-consumer) [![GoDoc](https://godoc.org/github.com/harlow/kinesis-consumer?status.svg)](https://godoc.org/github.com/harlow/kinesis-consumer)
__Note:__ This repo is under active development adding [Consumer Groups #42](https://github.com/harlow/kinesis-consumer/issues/42). Master should always be deployable, but expect breaking changes in master over the next few months. __Note:__ This repo is under active development adding [Consumer Groups #42](https://github.com/harlow/kinesis-consumer/issues/42). Master should always be deployable, but expect breaking changes in master over the next few months.
Latest stable release https://github.com/harlow/kinesis-consumer/releases/tag/v0.3.2 Latest stable release https://github.com/harlow/kinesis-consumer/releases/tag/v0.3.2
@ -104,9 +104,13 @@ err := c.Scan(ctx, func(r *consumer.Record) error {
}) })
``` ```
## Checkpoint ## Options
To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard. The boolean value SkipCheckpoint of consumer.ScanError determines if checkpoint will be activated. ScanError is returned by the record processing callback. The consumer allows the following optional overrides.
### Storage
To record the progress (checkpoint) of the consumer in the stream we use a storage layer to persist the last sequence number the consumer has read from a particular shard. The boolean value ErrSkipCheckpoint of consumer.ScanError determines if checkpoint will be activated. ScanError is returned by the record processing callback.
This will allow consumers to re-launch and pick up at the position in the stream where they left off. This will allow consumers to re-launch and pick up at the position in the stream where they left off.
@ -114,33 +118,33 @@ The uniq identifier for a consumer is `[appName, streamName, shardID]`
<img width="722" alt="kinesis-checkpoints" src="https://user-images.githubusercontent.com/739782/33085867-d8336122-ce9a-11e7-8c8a-a8afeb09dff1.png"> <img width="722" alt="kinesis-checkpoints" src="https://user-images.githubusercontent.com/739782/33085867-d8336122-ce9a-11e7-8c8a-a8afeb09dff1.png">
Note: The default checkpoint is no-op. Which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is re-started. Note: The default storage is in-memory (no-op). Which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is re-started.
To persist scan progress choose one of the following checkpoints: To persist scan progress choose one of the following storage layers:
### Redis Checkpoint #### Redis
The Redis checkpoint requries App Name, and Stream Name: The Redis checkpoint requries App Name, and Stream Name:
```go ```go
import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" import storage "github.com/harlow/kinesis-consumer/checkpoint/redis"
// redis checkpoint // redis checkpoint
ck, err := checkpoint.New(appName) db, err := storage.New(appName)
if err != nil { if err != nil {
log.Fatalf("new checkpoint error: %v", err) log.Fatalf("new checkpoint error: %v", err)
} }
``` ```
### DynamoDB Checkpoint #### DynamoDB
The DynamoDB checkpoint requires Table Name, App Name, and Stream Name: The DynamoDB checkpoint requires Table Name, App Name, and Stream Name:
```go ```go
import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" import storage "github.com/harlow/kinesis-consumer/checkpoint/ddb"
// ddb checkpoint // ddb checkpoint
ck, err := checkpoint.New(appName, tableName) db, err := storage.New(appName, tableName)
if err != nil { if err != nil {
log.Fatalf("new checkpoint error: %v", err) log.Fatalf("new checkpoint error: %v", err)
} }
@ -154,7 +158,7 @@ myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()))
// Region: aws.String("us-west-2"), // Region: aws.String("us-west-2"),
// }) // })
ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) db, err := storage.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient))
if err != nil { if err != nil {
log.Fatalf("new checkpoint error: %v", err) log.Fatalf("new checkpoint error: %v", err)
} }
@ -173,15 +177,15 @@ Sort key: shard_id
<img width="727" alt="screen shot 2017-11-22 at 7 59 36 pm" src="https://user-images.githubusercontent.com/739782/33158557-b90e4228-cfbf-11e7-9a99-73b56a446f5f.png"> <img width="727" alt="screen shot 2017-11-22 at 7 59 36 pm" src="https://user-images.githubusercontent.com/739782/33158557-b90e4228-cfbf-11e7-9a99-73b56a446f5f.png">
### Postgres Checkpoint #### Postgres
The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString: The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString:
```go ```go
import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/postgres" import storage "github.com/harlow/kinesis-consumer/checkpoint/postgres"
// postgres checkpoint // postgres checkpoint
ck, err := checkpoint.New(app, table, connStr) db, err := storage.New(app, table, connStr)
if err != nil { if err != nil {
log.Fatalf("new checkpoint error: %v", err) log.Fatalf("new checkpoint error: %v", err)
} }
@ -201,15 +205,15 @@ CREATE TABLE kinesis_consumer (
The table name has to be the same that you specify when creating the checkpoint. The primary key composed by namespace and shard_id is mandatory in order to the checkpoint run without issues and also to ensure data integrity. The table name has to be the same that you specify when creating the checkpoint. The primary key composed by namespace and shard_id is mandatory in order to the checkpoint run without issues and also to ensure data integrity.
### Mysql Checkpoint #### Mysql
The Mysql checkpoint requires Table Name, App Name, Stream Name and ConnectionString (just like the Postgres checkpoint!): The Mysql checkpoint requires Table Name, App Name, Stream Name and ConnectionString (just like the Postgres checkpoint!):
```go ```go
import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/mysql" import storage "github.com/harlow/kinesis-consumer/checkpoint/mysql"
// mysql checkpoint // mysql checkpoint
ck, err := checkpoint.New(app, table, connStr) db, err := storage.New(app, table, connStr)
if err != nil { if err != nil {
log.Fatalf("new checkpoint error: %v", err) log.Fatalf("new checkpoint error: %v", err)
} }
@ -229,10 +233,6 @@ CREATE TABLE kinesis_consumer (
The table name has to be the same that you specify when creating the checkpoint. The primary key composed by namespace and shard_id is mandatory in order to the checkpoint run without issues and also to ensure data integrity. The table name has to be the same that you specify when creating the checkpoint. The primary key composed by namespace and shard_id is mandatory in order to the checkpoint run without issues and also to ensure data integrity.
## Options
The consumer allows the following optional overrides.
### Kinesis Client ### Kinesis Client
Override the Kinesis client if there is any special config needed: Override the Kinesis client if there is any special config needed: