give example of setting storage
This commit is contained in:
parent
70e2468665
commit
c12e801990
1 changed files with 10 additions and 1 deletions
11
README.md
11
README.md
|
|
@ -110,7 +110,7 @@ The consumer allows the following optional overrides.
|
|||
|
||||
### Storage
|
||||
|
||||
To record the progress (checkpoint) of the consumer in the stream we use a storage layer to persist the last sequence number the consumer has read from a particular shard. The boolean value ErrSkipCheckpoint of consumer.ScanError determines if checkpoint will be activated. ScanError is returned by the record processing callback.
|
||||
To record the progress of the consumer in the stream (checkpoint) we use a storage layer to persist the last sequence number the consumer has read from a particular shard. The boolean value ErrSkipCheckpoint of consumer.ScanError determines if checkpoint will be activated. ScanError is returned by the record processing callback.
|
||||
|
||||
This will allow consumers to re-launch and pick up at the position in the stream where they left off.
|
||||
|
||||
|
|
@ -120,6 +120,15 @@ The uniq identifier for a consumer is `[appName, streamName, shardID]`
|
|||
|
||||
Note: The default storage is in-memory (no-op). Which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is re-started.
|
||||
|
||||
The consumer accpets a `WithStorage` option to set the storage layer:
|
||||
|
||||
```go
|
||||
c, err := consumer.New(*stream, consumer.WithStorage(db))
|
||||
if err != nil {
|
||||
log.Log("consumer error: %v", err)
|
||||
}
|
||||
```
|
||||
|
||||
To persist scan progress choose one of the following storage layers:
|
||||
|
||||
#### Redis
|
||||
|
|
|
|||
Loading…
Reference in a new issue