rename checkpoint dir to store

This commit is contained in:
Harlow Ward 2019-07-28 21:13:48 -07:00
parent c12e801990
commit 80009ef620
27 changed files with 41 additions and 46 deletions

View file

@ -136,7 +136,7 @@ To persist scan progress choose one of the following storage layers:
The Redis checkpoint requries App Name, and Stream Name: The Redis checkpoint requries App Name, and Stream Name:
```go ```go
import storage "github.com/harlow/kinesis-consumer/checkpoint/redis" import storage "github.com/harlow/kinesis-consumer/store/redis"
// redis checkpoint // redis checkpoint
db, err := storage.New(appName) db, err := storage.New(appName)
@ -150,7 +150,7 @@ if err != nil {
The DynamoDB checkpoint requires Table Name, App Name, and Stream Name: The DynamoDB checkpoint requires Table Name, App Name, and Stream Name:
```go ```go
import storage "github.com/harlow/kinesis-consumer/checkpoint/ddb" import storage "github.com/harlow/kinesis-consumer/store/ddb"
// ddb checkpoint // ddb checkpoint
db, err := storage.New(appName, tableName) db, err := storage.New(appName, tableName)
@ -191,7 +191,7 @@ Sort key: shard_id
The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString: The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString:
```go ```go
import storage "github.com/harlow/kinesis-consumer/checkpoint/postgres" import storage "github.com/harlow/kinesis-consumer/store/postgres"
// postgres checkpoint // postgres checkpoint
db, err := storage.New(app, table, connStr) db, err := storage.New(app, table, connStr)
@ -219,7 +219,7 @@ The table name has to be the same that you specify when creating the checkpoint.
The Mysql checkpoint requires Table Name, App Name, Stream Name and ConnectionString (just like the Postgres checkpoint!): The Mysql checkpoint requires Table Name, App Name, Stream Name and ConnectionString (just like the Postgres checkpoint!):
```go ```go
import storage "github.com/harlow/kinesis-consumer/checkpoint/mysql" import storage "github.com/harlow/kinesis-consumer/store/mysql"
// mysql checkpoint // mysql checkpoint
db, err := storage.New(app, table, connStr) db, err := storage.New(app, table, connStr)

View file

@ -11,13 +11,13 @@ import (
// NewAllGroup returns an intitialized AllGroup for consuming // NewAllGroup returns an intitialized AllGroup for consuming
// all shards on a stream // all shards on a stream
func NewAllGroup(ksis kinesisiface.KinesisAPI, db Storage, streamName string, logger Logger) *AllGroup { func NewAllGroup(ksis kinesisiface.KinesisAPI, store Store, streamName string, logger Logger) *AllGroup {
return &AllGroup{ return &AllGroup{
ksis: ksis, ksis: ksis,
shards: make(map[string]*kinesis.Shard), shards: make(map[string]*kinesis.Shard),
streamName: streamName, streamName: streamName,
logger: logger, logger: logger,
storage: db, Store: store,
} }
} }
@ -27,7 +27,7 @@ type AllGroup struct {
ksis kinesisiface.KinesisAPI ksis kinesisiface.KinesisAPI
streamName string streamName string
logger Logger logger Logger
storage Storage Store
shardMu sync.Mutex shardMu sync.Mutex
shards map[string]*kinesis.Shard shards map[string]*kinesis.Shard
@ -59,16 +59,6 @@ func (g *AllGroup) Start(ctx context.Context, shardc chan *kinesis.Shard) {
} }
} }
// GetCheckpoint returns the current checkpoint for provided stream
func (g *AllGroup) GetCheckpoint(streamName, shardID string) (string, error) {
return g.storage.GetCheckpoint(streamName, shardID)
}
// SetCheckpoint sets the current checkpoint for provided stream
func (g *AllGroup) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
return g.storage.SetCheckpoint(streamName, shardID, sequenceNumber)
}
// findNewShards pulls the list of shards from the Kinesis API // findNewShards pulls the list of shards from the Kinesis API
// and uses a local cache to determine if we are already processing // and uses a local cache to determine if we are already processing
// a particular shard. // a particular shard.

View file

@ -27,7 +27,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
c := &Consumer{ c := &Consumer{
streamName: streamName, streamName: streamName,
initialShardIteratorType: kinesis.ShardIteratorTypeLatest, initialShardIteratorType: kinesis.ShardIteratorTypeLatest,
storage: &noopStorage{}, store: &noopStore{},
counter: &noopCounter{}, counter: &noopCounter{},
logger: &noopLogger{ logger: &noopLogger{
logger: log.New(ioutil.Discard, "", log.LstdFlags), logger: log.New(ioutil.Discard, "", log.LstdFlags),
@ -50,7 +50,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
// default group consumes all shards // default group consumes all shards
if c.group == nil { if c.group == nil {
c.group = NewAllGroup(c.client, c.storage, streamName, c.logger) c.group = NewAllGroup(c.client, c.store, streamName, c.logger)
} }
return c, nil return c, nil
@ -64,7 +64,7 @@ type Consumer struct {
counter Counter counter Counter
group Group group Group
logger Logger logger Logger
storage Storage store Store
} }
// ScanFunc is the type of the function called for each message read // ScanFunc is the type of the function called for each message read

View file

@ -7,9 +7,8 @@ Read records from the Kinesis stream
Export the required environment vars for connecting to the Kinesis stream: Export the required environment vars for connecting to the Kinesis stream:
``` ```
export AWS_ACCESS_KEY= export AWS_PROFILE=
export AWS_REGION= export AWS_REGION=
export AWS_SECRET_KEY=
``` ```
### Run the consumer ### Run the consumer

View file

@ -19,7 +19,7 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
consumer "github.com/harlow/kinesis-consumer" consumer "github.com/harlow/kinesis-consumer"
storage "github.com/harlow/kinesis-consumer/checkpoint/ddb" storage "github.com/harlow/kinesis-consumer/store/ddb"
) )
// kick off a server for exposing scan metrics // kick off a server for exposing scan metrics

View file

@ -7,9 +7,8 @@ Read records from the Kinesis stream using mysql as checkpoint
Export the required environment vars for connecting to the Kinesis stream: Export the required environment vars for connecting to the Kinesis stream:
```shell ```shell
export AWS_ACCESS_KEY= export AWS_PROFILE=
export AWS_REGION= export AWS_REGION=
export AWS_SECRET_KEY=
``` ```
## Run the consumer ## Run the consumer
@ -18,4 +17,4 @@ export AWS_SECRET_KEY=
Connection string should look something like Connection string should look something like
user:password@/dbname user:password@/dbname

View file

@ -10,7 +10,7 @@ import (
"os/signal" "os/signal"
consumer "github.com/harlow/kinesis-consumer" consumer "github.com/harlow/kinesis-consumer"
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/mysql" checkpoint "github.com/harlow/kinesis-consumer/store/mysql"
) )
func main() { func main() {

View file

@ -7,9 +7,8 @@ Read records from the Kinesis stream using postgres as checkpoint
Export the required environment vars for connecting to the Kinesis stream: Export the required environment vars for connecting to the Kinesis stream:
```shell ```shell
export AWS_ACCESS_KEY= export AWS_PROFILE=
export AWS_REGION= export AWS_REGION=
export AWS_SECRET_KEY=
``` ```
## Run the consumer ## Run the consumer

View file

@ -10,7 +10,7 @@ import (
"os/signal" "os/signal"
consumer "github.com/harlow/kinesis-consumer" consumer "github.com/harlow/kinesis-consumer"
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/postgres" checkpoint "github.com/harlow/kinesis-consumer/store/postgres"
) )
func main() { func main() {

View file

@ -9,7 +9,7 @@ import (
"os/signal" "os/signal"
consumer "github.com/harlow/kinesis-consumer" consumer "github.com/harlow/kinesis-consumer"
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" checkpoint "github.com/harlow/kinesis-consumer/store/redis"
) )
// A myLogger provides a minimalistic logger satisfying the Logger interface. // A myLogger provides a minimalistic logger satisfying the Logger interface.

1
go.mod
View file

@ -7,6 +7,7 @@ require (
github.com/go-sql-driver/mysql v1.4.1 github.com/go-sql-driver/mysql v1.4.1
github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 github.com/lib/pq v0.0.0-20180523175426-90697d60dd84
github.com/pkg/errors v0.8.0 github.com/pkg/errors v0.8.0
github.com/stretchr/testify v1.3.0 // indirect
google.golang.org/appengine v1.6.1 // indirect google.golang.org/appengine v1.6.1 // indirect
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0
gopkg.in/redis.v5 v5.2.9 gopkg.in/redis.v5 v5.2.9

7
go.sum
View file

@ -2,6 +2,8 @@ github.com/apex/log v1.0.0 h1:5UWeZC54mWVtOGSCjtuvDPgY/o0QxmjQgvYZ27pLVGQ=
github.com/apex/log v1.0.0/go.mod h1:yA770aXIDQrhVOIGurT/pVdfCpSq1GQV/auzMN5fzvY= github.com/apex/log v1.0.0/go.mod h1:yA770aXIDQrhVOIGurT/pVdfCpSq1GQV/auzMN5fzvY=
github.com/aws/aws-sdk-go v1.15.0 h1:uxi9gcf4jxEX7r8oWYMEkYB4kziKet+1cHPmq52LjC4= github.com/aws/aws-sdk-go v1.15.0 h1:uxi9gcf4jxEX7r8oWYMEkYB4kziKet+1cHPmq52LjC4=
github.com/aws/aws-sdk-go v1.15.0/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/aws/aws-sdk-go v1.15.0/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-ini/ini v1.38.1 h1:hbtfM8emWUVo9GnXSloXYyFbXxZ+tG6sbepSStoe1FY= github.com/go-ini/ini v1.38.1 h1:hbtfM8emWUVo9GnXSloXYyFbXxZ+tG6sbepSStoe1FY=
github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
@ -14,6 +16,11 @@ github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 h1:it29sI2IM490luSc3RAhp5Wu
github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=

View file

@ -6,9 +6,9 @@ import "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
type Option func(*Consumer) type Option func(*Consumer)
// WithStorage overrides the default storage // WithStorage overrides the default storage
func WithStorage(storage Storage) Option { func WithStorage(store Store) Option {
return func(c *Consumer) { return func(c *Consumer) {
c.storage = storage c.store = store
} }
} }

View file

@ -1,13 +0,0 @@
package consumer
// Storage interface used to persist scan progress
type Storage interface {
GetCheckpoint(streamName, shardID string) (string, error)
SetCheckpoint(streamName, shardID, sequenceNumber string) error
}
// noopStorage implements the storage interface with discard
type noopStorage struct{}
func (n noopStorage) GetCheckpoint(string, string) (string, error) { return "", nil }
func (n noopStorage) SetCheckpoint(string, string, string) error { return nil }

13
store.go Normal file
View file

@ -0,0 +1,13 @@
package consumer
// Store interface used to persist scan progress
type Store interface {
GetCheckpoint(streamName, shardID string) (string, error)
SetCheckpoint(streamName, shardID, sequenceNumber string) error
}
// noopStore implements the storage interface with discard
type noopStore struct{}
func (n noopStore) GetCheckpoint(string, string) (string, error) { return "", nil }
func (n noopStore) SetCheckpoint(string, string, string) error { return nil }