diff --git a/README.md b/README.md index d19830e..1c25662 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Build Status](https://travis-ci.com/harlow/kinesis-consumer.svg?branch=master)](https://travis-ci.com/harlow/kinesis-consumer) [![GoDoc](https://godoc.org/github.com/harlow/kinesis-consumer?status.svg)](https://godoc.org/github.com/harlow/kinesis-consumer) -__Note:__ This repo is under active development adding [Consumer Groups #42](https://github.com/harlow/kinesis-consumer/issues/42). Master should always be deployable, but expect breaking changes in master over the next few months. +__Note:__ This repo is under active development adding [Consumer Groups #42](https://github.com/harlow/kinesis-consumer/issues/42). Master should always be deployable, but expect breaking changes in master over the next few months. Latest stable release https://github.com/harlow/kinesis-consumer/releases/tag/v0.3.2 @@ -104,9 +104,13 @@ err := c.Scan(ctx, func(r *consumer.Record) error { }) ``` -## Checkpoint +## Options -To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard. The boolean value SkipCheckpoint of consumer.ScanError determines if checkpoint will be activated. ScanError is returned by the record processing callback. +The consumer allows the following optional overrides. + +### Storage + +To record the progress of the consumer in the stream (checkpoint) we use a storage layer to persist the last sequence number the consumer has read from a particular shard. The boolean value ErrSkipCheckpoint of consumer.ScanError determines if checkpoint will be activated. ScanError is returned by the record processing callback. This will allow consumers to re-launch and pick up at the position in the stream where they left off. @@ -114,33 +118,42 @@ The uniq identifier for a consumer is `[appName, streamName, shardID]` kinesis-checkpoints -Note: The default checkpoint is no-op. Which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is re-started. +Note: The default storage is in-memory (no-op). Which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is re-started. -To persist scan progress choose one of the following checkpoints: +The consumer accpets a `WithStorage` option to set the storage layer: -### Redis Checkpoint +```go +c, err := consumer.New(*stream, consumer.WithStorage(db)) +if err != nil { + log.Log("consumer error: %v", err) +} +``` + +To persist scan progress choose one of the following storage layers: + +#### Redis The Redis checkpoint requries App Name, and Stream Name: ```go -import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" +import storage "github.com/harlow/kinesis-consumer/store/redis" // redis checkpoint -ck, err := checkpoint.New(appName) +db, err := storage.New(appName) if err != nil { log.Fatalf("new checkpoint error: %v", err) } ``` -### DynamoDB Checkpoint +#### DynamoDB The DynamoDB checkpoint requires Table Name, App Name, and Stream Name: ```go -import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" +import storage "github.com/harlow/kinesis-consumer/store/ddb" // ddb checkpoint -ck, err := checkpoint.New(appName, tableName) +db, err := storage.New(appName, tableName) if err != nil { log.Fatalf("new checkpoint error: %v", err) } @@ -154,7 +167,7 @@ myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig())) // Region: aws.String("us-west-2"), // }) -ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) +db, err := storage.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) if err != nil { log.Fatalf("new checkpoint error: %v", err) } @@ -173,15 +186,15 @@ Sort key: shard_id screen shot 2017-11-22 at 7 59 36 pm -### Postgres Checkpoint +#### Postgres The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString: ```go -import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/postgres" +import storage "github.com/harlow/kinesis-consumer/store/postgres" // postgres checkpoint -ck, err := checkpoint.New(app, table, connStr) +db, err := storage.New(app, table, connStr) if err != nil { log.Fatalf("new checkpoint error: %v", err) } @@ -201,15 +214,15 @@ CREATE TABLE kinesis_consumer ( The table name has to be the same that you specify when creating the checkpoint. The primary key composed by namespace and shard_id is mandatory in order to the checkpoint run without issues and also to ensure data integrity. -### Mysql Checkpoint +#### Mysql The Mysql checkpoint requires Table Name, App Name, Stream Name and ConnectionString (just like the Postgres checkpoint!): ```go -import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/mysql" +import storage "github.com/harlow/kinesis-consumer/store/mysql" // mysql checkpoint -ck, err := checkpoint.New(app, table, connStr) +db, err := storage.New(app, table, connStr) if err != nil { log.Fatalf("new checkpoint error: %v", err) } @@ -229,10 +242,6 @@ CREATE TABLE kinesis_consumer ( The table name has to be the same that you specify when creating the checkpoint. The primary key composed by namespace and shard_id is mandatory in order to the checkpoint run without issues and also to ensure data integrity. -## Options - -The consumer allows the following optional overrides. - ### Kinesis Client Override the Kinesis client if there is any special config needed: diff --git a/allgroup.go b/allgroup.go index f046f1d..9aee0bd 100644 --- a/allgroup.go +++ b/allgroup.go @@ -11,13 +11,13 @@ import ( // NewAllGroup returns an intitialized AllGroup for consuming // all shards on a stream -func NewAllGroup(ksis kinesisiface.KinesisAPI, ck Checkpoint, streamName string, logger Logger) *AllGroup { +func NewAllGroup(ksis kinesisiface.KinesisAPI, store Store, streamName string, logger Logger) *AllGroup { return &AllGroup{ ksis: ksis, shards: make(map[string]*kinesis.Shard), streamName: streamName, logger: logger, - checkpoint: ck, + Store: store, } } @@ -27,7 +27,7 @@ type AllGroup struct { ksis kinesisiface.KinesisAPI streamName string logger Logger - checkpoint Checkpoint + Store shardMu sync.Mutex shards map[string]*kinesis.Shard @@ -59,16 +59,6 @@ func (g *AllGroup) Start(ctx context.Context, shardc chan *kinesis.Shard) { } } -// GetCheckpoint returns the current checkpoint for provided stream -func (g *AllGroup) GetCheckpoint(streamName, shardID string) (string, error) { - return g.checkpoint.Get(streamName, shardID) -} - -// SetCheckpoint sets the current checkpoint for provided stream -func (g *AllGroup) SetCheckpoint(streamName, shardID, sequenceNumber string) error { - return g.checkpoint.Set(streamName, shardID, sequenceNumber) -} - // findNewShards pulls the list of shards from the Kinesis API // and uses a local cache to determine if we are already processing // a particular shard. diff --git a/checkpoint.go b/checkpoint.go deleted file mode 100644 index 383d4c1..0000000 --- a/checkpoint.go +++ /dev/null @@ -1,13 +0,0 @@ -package consumer - -// Checkpoint interface used track consumer progress in the stream -type Checkpoint interface { - Get(streamName, shardID string) (string, error) - Set(streamName, shardID, sequenceNumber string) error -} - -// noopCheckpoint implements the checkpoint interface with discard -type noopCheckpoint struct{} - -func (n noopCheckpoint) Set(string, string, string) error { return nil } -func (n noopCheckpoint) Get(string, string) (string, error) { return "", nil } diff --git a/consumer.go b/consumer.go index d651020..86f311c 100644 --- a/consumer.go +++ b/consumer.go @@ -23,12 +23,12 @@ func New(streamName string, opts ...Option) (*Consumer, error) { return nil, fmt.Errorf("must provide stream name") } - // new consumer with no-op checkpoint, counter, and logger + // new consumer with noop storage, counter, and logger c := &Consumer{ streamName: streamName, initialShardIteratorType: kinesis.ShardIteratorTypeLatest, + store: &noopStore{}, counter: &noopCounter{}, - checkpoint: &noopCheckpoint{}, logger: &noopLogger{ logger: log.New(ioutil.Discard, "", log.LstdFlags), }, @@ -39,7 +39,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { opt(c) } - // default client if none provided + // default client if c.client == nil { newSession, err := session.NewSession(aws.NewConfig()) if err != nil { @@ -48,9 +48,9 @@ func New(streamName string, opts ...Option) (*Consumer, error) { c.client = kinesis.New(newSession) } - // default group if none provided + // default group consumes all shards if c.group == nil { - c.group = NewAllGroup(c.client, c.checkpoint, c.streamName, c.logger) + c.group = NewAllGroup(c.client, c.store, streamName, c.logger) } return c, nil @@ -61,10 +61,10 @@ type Consumer struct { streamName string initialShardIteratorType string client kinesisiface.KinesisAPI - logger Logger - group Group - checkpoint Checkpoint counter Counter + group Group + logger Logger + store Store } // ScanFunc is the type of the function called for each message read diff --git a/consumer_test.go b/consumer_test.go index caaa04b..3151cec 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -57,7 +57,7 @@ func TestScan(t *testing.T) { c, err := New("myStreamName", WithClient(client), WithCounter(ctr), - WithCheckpoint(cp), + WithStorage(cp), ) if err != nil { t.Fatalf("new consumer error: %v", err) @@ -90,7 +90,7 @@ func TestScan(t *testing.T) { t.Errorf("counter error expected %d, got %d", 2, val) } - val, err := cp.Get("myStreamName", "myShard") + val, err := cp.GetCheckpoint("myStreamName", "myShard") if err != nil && val != "lastSeqNum" { t.Errorf("checkout error expected %s, got %s", "lastSeqNum", val) } @@ -119,7 +119,7 @@ func TestScanShard(t *testing.T) { c, err := New("myStreamName", WithClient(client), WithCounter(ctr), - WithCheckpoint(cp), + WithStorage(cp), ) if err != nil { t.Fatalf("new consumer error: %v", err) @@ -156,7 +156,7 @@ func TestScanShard(t *testing.T) { } // sets checkpoint - val, err := cp.Get("myStreamName", "myShard") + val, err := cp.GetCheckpoint("myStreamName", "myShard") if err != nil && val != "lastSeqNum" { t.Fatalf("checkout error expected %s, got %s", "lastSeqNum", val) } @@ -219,7 +219,7 @@ func TestScanShard_SkipCheckpoint(t *testing.T) { var cp = &fakeCheckpoint{cache: map[string]string{}} - c, err := New("myStreamName", WithClient(client), WithCheckpoint(cp)) + c, err := New("myStreamName", WithClient(client), WithStorage(cp)) if err != nil { t.Fatalf("new consumer error: %v", err) } @@ -229,7 +229,7 @@ func TestScanShard_SkipCheckpoint(t *testing.T) { var fn = func(r *Record) error { if aws.StringValue(r.SequenceNumber) == "lastSeqNum" { cancel() - return SkipCheckpoint + return ErrSkipCheckpoint } return nil @@ -240,7 +240,7 @@ func TestScanShard_SkipCheckpoint(t *testing.T) { t.Fatalf("scan shard error: %v", err) } - val, err := cp.Get("myStreamName", "myShard") + val, err := cp.GetCheckpoint("myStreamName", "myShard") if err != nil && val != "firstSeqNum" { t.Fatalf("checkout error expected %s, got %s", "firstSeqNum", val) } @@ -301,7 +301,7 @@ type fakeCheckpoint struct { mu sync.Mutex } -func (fc *fakeCheckpoint) Set(streamName, shardID, sequenceNumber string) error { +func (fc *fakeCheckpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { fc.mu.Lock() defer fc.mu.Unlock() @@ -310,7 +310,7 @@ func (fc *fakeCheckpoint) Set(streamName, shardID, sequenceNumber string) error return nil } -func (fc *fakeCheckpoint) Get(streamName, shardID string) (string, error) { +func (fc *fakeCheckpoint) GetCheckpoint(streamName, shardID string) (string, error) { fc.mu.Lock() defer fc.mu.Unlock() diff --git a/examples/consumer/cp-dynamo/README.md b/examples/consumer/cp-dynamo/README.md index 0337371..ea7e634 100644 --- a/examples/consumer/cp-dynamo/README.md +++ b/examples/consumer/cp-dynamo/README.md @@ -7,9 +7,8 @@ Read records from the Kinesis stream Export the required environment vars for connecting to the Kinesis stream: ``` -export AWS_ACCESS_KEY= +export AWS_PROFILE= export AWS_REGION= -export AWS_SECRET_KEY= ``` ### Run the consumer diff --git a/examples/consumer/cp-dynamo/main.go b/examples/consumer/cp-dynamo/main.go index af885b3..d74c816 100644 --- a/examples/consumer/cp-dynamo/main.go +++ b/examples/consumer/cp-dynamo/main.go @@ -19,7 +19,7 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/kinesis" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" + storage "github.com/harlow/kinesis-consumer/store/ddb" ) // kick off a server for exposing scan metrics @@ -69,8 +69,8 @@ func main() { myKsis := kinesis.New(sess) myDdbClient := dynamodb.New(sess) - // ddb checkpoint - ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDdbClient), checkpoint.WithRetryer(&MyRetryer{})) + // ddb persitance + ddb, err := storage.New(*app, *table, storage.WithDynamoClient(myDdbClient), storage.WithRetryer(&MyRetryer{})) if err != nil { log.Log("checkpoint error: %v", err) } @@ -81,7 +81,7 @@ func main() { // consumer c, err := consumer.New( *stream, - consumer.WithCheckpoint(ck), + consumer.WithStorage(ddb), consumer.WithLogger(log), consumer.WithCounter(counter), consumer.WithClient(myKsis), @@ -111,17 +111,17 @@ func main() { log.Log("scan error: %v", err) } - if err := ck.Shutdown(); err != nil { - log.Log("checkpoint shutdown error: %v", err) + if err := ddb.Shutdown(); err != nil { + log.Log("storage shutdown error: %v", err) } } -// MyRetryer used for checkpointing +// MyRetryer used for storage type MyRetryer struct { - checkpoint.Retryer + storage.Retryer } -// ShouldRetry implements custom logic for when a checkpont should retry +// ShouldRetry implements custom logic for when errors should retry func (r *MyRetryer) ShouldRetry(err error) bool { if awsErr, ok := err.(awserr.Error); ok { switch awsErr.Code() { diff --git a/examples/consumer/cp-mysql/README.md b/examples/consumer/cp-mysql/README.md index dd3b317..5a7690c 100644 --- a/examples/consumer/cp-mysql/README.md +++ b/examples/consumer/cp-mysql/README.md @@ -7,9 +7,8 @@ Read records from the Kinesis stream using mysql as checkpoint Export the required environment vars for connecting to the Kinesis stream: ```shell -export AWS_ACCESS_KEY= +export AWS_PROFILE= export AWS_REGION= -export AWS_SECRET_KEY= ``` ## Run the consumer @@ -18,4 +17,4 @@ export AWS_SECRET_KEY= Connection string should look something like - user:password@/dbname \ No newline at end of file + user:password@/dbname diff --git a/examples/consumer/cp-mysql/main.go b/examples/consumer/cp-mysql/main.go index 386ea94..349f190 100644 --- a/examples/consumer/cp-mysql/main.go +++ b/examples/consumer/cp-mysql/main.go @@ -10,7 +10,7 @@ import ( "os/signal" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/checkpoint/mysql" + checkpoint "github.com/harlow/kinesis-consumer/store/mysql" ) func main() { @@ -33,7 +33,7 @@ func main() { // consumer c, err := consumer.New( *stream, - consumer.WithCheckpoint(ck), + consumer.WithStorage(ck), consumer.WithCounter(counter), ) if err != nil { diff --git a/examples/consumer/cp-postgres/README.md b/examples/consumer/cp-postgres/README.md index f49e5d0..889fee8 100644 --- a/examples/consumer/cp-postgres/README.md +++ b/examples/consumer/cp-postgres/README.md @@ -7,9 +7,8 @@ Read records from the Kinesis stream using postgres as checkpoint Export the required environment vars for connecting to the Kinesis stream: ```shell -export AWS_ACCESS_KEY= +export AWS_PROFILE= export AWS_REGION= -export AWS_SECRET_KEY= ``` ## Run the consumer diff --git a/examples/consumer/cp-postgres/main.go b/examples/consumer/cp-postgres/main.go index daf5720..ed83cc5 100644 --- a/examples/consumer/cp-postgres/main.go +++ b/examples/consumer/cp-postgres/main.go @@ -10,7 +10,7 @@ import ( "os/signal" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/checkpoint/postgres" + checkpoint "github.com/harlow/kinesis-consumer/store/postgres" ) func main() { @@ -33,7 +33,7 @@ func main() { // consumer c, err := consumer.New( *stream, - consumer.WithCheckpoint(ck), + consumer.WithStorage(ck), consumer.WithCounter(counter), ) if err != nil { diff --git a/examples/consumer/cp-redis/main.go b/examples/consumer/cp-redis/main.go index 8b3450e..7200850 100644 --- a/examples/consumer/cp-redis/main.go +++ b/examples/consumer/cp-redis/main.go @@ -9,7 +9,7 @@ import ( "os/signal" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" + checkpoint "github.com/harlow/kinesis-consumer/store/redis" ) // A myLogger provides a minimalistic logger satisfying the Logger interface. @@ -43,7 +43,7 @@ func main() { // consumer c, err := consumer.New( *stream, - consumer.WithCheckpoint(ck), + consumer.WithStorage(ck), consumer.WithLogger(logger), ) if err != nil { diff --git a/go.mod b/go.mod index e97e80f..befba7b 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-sql-driver/mysql v1.4.1 github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 github.com/pkg/errors v0.8.0 + github.com/stretchr/testify v1.3.0 // indirect google.golang.org/appengine v1.6.1 // indirect gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 gopkg.in/redis.v5 v5.2.9 diff --git a/go.sum b/go.sum index 7a1f283..3b797bc 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/apex/log v1.0.0 h1:5UWeZC54mWVtOGSCjtuvDPgY/o0QxmjQgvYZ27pLVGQ= github.com/apex/log v1.0.0/go.mod h1:yA770aXIDQrhVOIGurT/pVdfCpSq1GQV/auzMN5fzvY= github.com/aws/aws-sdk-go v1.15.0 h1:uxi9gcf4jxEX7r8oWYMEkYB4kziKet+1cHPmq52LjC4= github.com/aws/aws-sdk-go v1.15.0/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ini/ini v1.38.1 h1:hbtfM8emWUVo9GnXSloXYyFbXxZ+tG6sbepSStoe1FY= github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= @@ -14,6 +16,11 @@ github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 h1:it29sI2IM490luSc3RAhp5Wu github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= diff --git a/options.go b/options.go index c17ba0b..4740dd4 100644 --- a/options.go +++ b/options.go @@ -5,10 +5,10 @@ import "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" // Option is used to override defaults when creating a new Consumer type Option func(*Consumer) -// WithCheckpoint overrides the default checkpoint -func WithCheckpoint(checkpoint Checkpoint) Option { +// WithStorage overrides the default storage +func WithStorage(store Store) Option { return func(c *Consumer) { - c.checkpoint = checkpoint + c.store = store } } diff --git a/store.go b/store.go new file mode 100644 index 0000000..4b4a9d4 --- /dev/null +++ b/store.go @@ -0,0 +1,13 @@ +package consumer + +// Store interface used to persist scan progress +type Store interface { + GetCheckpoint(streamName, shardID string) (string, error) + SetCheckpoint(streamName, shardID, sequenceNumber string) error +} + +// noopStore implements the storage interface with discard +type noopStore struct{} + +func (n noopStore) GetCheckpoint(string, string) (string, error) { return "", nil } +func (n noopStore) SetCheckpoint(string, string, string) error { return nil } diff --git a/checkpoint/ddb/ddb.go b/store/ddb/ddb.go similarity index 92% rename from checkpoint/ddb/ddb.go rename to store/ddb/ddb.go index 170ced4..0b5c9b5 100644 --- a/checkpoint/ddb/ddb.go +++ b/store/ddb/ddb.go @@ -87,7 +87,7 @@ type item struct { // Get determines if a checkpoint for a particular Shard exists. // Typically used to determine whether we should start processing the shard with // TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). -func (c *Checkpoint) Get(streamName, shardID string) (string, error) { +func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { namespace := fmt.Sprintf("%s-%s", c.appName, streamName) params := &dynamodb.GetItemInput{ @@ -106,7 +106,7 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) { resp, err := c.client.GetItem(params) if err != nil { if c.retryer.ShouldRetry(err) { - return c.Get(streamName, shardID) + return c.GetCheckpoint(streamName, shardID) } return "", err } @@ -116,9 +116,9 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) { return i.SequenceNumber, nil } -// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. -func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { +func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/checkpoint/ddb/ddb_test.go b/store/ddb/ddb_test.go similarity index 100% rename from checkpoint/ddb/ddb_test.go rename to store/ddb/ddb_test.go diff --git a/checkpoint/ddb/retryer.go b/store/ddb/retryer.go similarity index 100% rename from checkpoint/ddb/retryer.go rename to store/ddb/retryer.go diff --git a/checkpoint/ddb/retryer_test.go b/store/ddb/retryer_test.go similarity index 100% rename from checkpoint/ddb/retryer_test.go rename to store/ddb/retryer_test.go diff --git a/checkpoint/mysql/mysql.go b/store/mysql/mysql.go similarity index 90% rename from checkpoint/mysql/mysql.go rename to store/mysql/mysql.go index f4a27d2..15daf98 100644 --- a/checkpoint/mysql/mysql.go +++ b/store/mysql/mysql.go @@ -77,10 +77,10 @@ func (c *Checkpoint) GetMaxInterval() time.Duration { return c.maxInterval } -// Get determines if a checkpoint for a particular Shard exists. +// GetCheckpoint determines if a checkpoint for a particular Shard exists. // Typically used to determine whether we should start processing the shard with // TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). -func (c *Checkpoint) Get(streamName, shardID string) (string, error) { +func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { namespace := fmt.Sprintf("%s-%s", c.appName, streamName) var sequenceNumber string @@ -97,9 +97,9 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) { return sequenceNumber, nil } -// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. -func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { +func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/checkpoint/mysql/mysql_databaseutils_test.go b/store/mysql/mysql_databaseutils_test.go similarity index 100% rename from checkpoint/mysql/mysql_databaseutils_test.go rename to store/mysql/mysql_databaseutils_test.go diff --git a/checkpoint/mysql/mysql_test.go b/store/mysql/mysql_test.go similarity index 94% rename from checkpoint/mysql/mysql_test.go rename to store/mysql/mysql_test.go index 137726c..afc3cc0 100644 --- a/checkpoint/mysql/mysql_test.go +++ b/store/mysql/mysql_test.go @@ -72,7 +72,7 @@ func TestNew_WithMaxIntervalOption(t *testing.T) { ck.Shutdown() } -func TestCheckpoint_Get(t *testing.T) { +func TestCheckpoint_GetCheckpoint(t *testing.T) { appName := "streamConsumer" tableName := "checkpoint" connString := "user:password@/dbname" @@ -98,7 +98,7 @@ func TestCheckpoint_Get(t *testing.T) { tableName) mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnRows(expectedRows) - gotSequenceNumber, err := ck.Get(streamName, shardID) + gotSequenceNumber, err := ck.GetCheckpoint(streamName, shardID) if gotSequenceNumber != expectedSequenceNumber { t.Errorf("expected sequence number equals %v, but got %v", expectedSequenceNumber, gotSequenceNumber) @@ -134,7 +134,7 @@ func TestCheckpoint_Get_NoRows(t *testing.T) { tableName) mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnError(sql.ErrNoRows) - gotSequenceNumber, err := ck.Get(streamName, shardID) + gotSequenceNumber, err := ck.GetCheckpoint(streamName, shardID) if gotSequenceNumber != "" { t.Errorf("expected sequence number equals empty, but got %v", gotSequenceNumber) @@ -170,7 +170,7 @@ func TestCheckpoint_Get_QueryError(t *testing.T) { tableName) mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnError(errors.New("an error")) - gotSequenceNumber, err := ck.Get(streamName, shardID) + gotSequenceNumber, err := ck.GetCheckpoint(streamName, shardID) if gotSequenceNumber != "" { t.Errorf("expected sequence number equals empty, but got %v", gotSequenceNumber) @@ -184,7 +184,7 @@ func TestCheckpoint_Get_QueryError(t *testing.T) { ck.Shutdown() } -func TestCheckpoint_Set(t *testing.T) { +func TestCheckpoint_SetCheckpoint(t *testing.T) { appName := "streamConsumer" tableName := "checkpoint" connString := "user:password@/dbname" @@ -197,7 +197,7 @@ func TestCheckpoint_Set(t *testing.T) { t.Fatalf("error occurred during the checkpoint creation. cause: %v", err) } - err = ck.Set(streamName, shardID, expectedSequenceNumber) + err = ck.SetCheckpoint(streamName, shardID, expectedSequenceNumber) if err != nil { t.Errorf("expected error equals nil, but got %v", err) @@ -218,7 +218,7 @@ func TestCheckpoint_Set_SequenceNumberEmpty(t *testing.T) { t.Fatalf("error occurred during the checkpoint creation. cause: %v", err) } - err = ck.Set(streamName, shardID, expectedSequenceNumber) + err = ck.SetCheckpoint(streamName, shardID, expectedSequenceNumber) if err == nil { t.Errorf("expected error equals not nil, but got %v", err) @@ -249,7 +249,7 @@ func TestCheckpoint_Shutdown(t *testing.T) { result := sqlmock.NewResult(0, 1) mock.ExpectExec(expectedSQLRegexString).WithArgs(namespace, shardID, expectedSequenceNumber).WillReturnResult(result) - err = ck.Set(streamName, shardID, expectedSequenceNumber) + err = ck.SetCheckpoint(streamName, shardID, expectedSequenceNumber) if err != nil { t.Fatalf("unable to set checkpoint for data initialization. cause: %v", err) @@ -287,7 +287,7 @@ func TestCheckpoint_Shutdown_SaveError(t *testing.T) { expectedSQLRegexString := fmt.Sprintf(`REPLACE INTO %s \(namespace, shard_id, sequence_number\) VALUES \(\?, \?, \?\)`, tableName) mock.ExpectExec(expectedSQLRegexString).WithArgs(namespace, shardID, expectedSequenceNumber).WillReturnError(errors.New("an error")) - err = ck.Set(streamName, shardID, expectedSequenceNumber) + err = ck.SetCheckpoint(streamName, shardID, expectedSequenceNumber) if err != nil { t.Fatalf("unable to set checkpoint for data initialization. cause: %v", err) diff --git a/checkpoint/postgres/postgres.go b/store/postgres/postgres.go similarity index 90% rename from checkpoint/postgres/postgres.go rename to store/postgres/postgres.go index b5a5bda..55bbc20 100644 --- a/checkpoint/postgres/postgres.go +++ b/store/postgres/postgres.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" "time" + // this is the postgres package so it makes sense to be here _ "github.com/lib/pq" ) @@ -77,10 +78,10 @@ func (c *Checkpoint) GetMaxInterval() time.Duration { return c.maxInterval } -// Get determines if a checkpoint for a particular Shard exists. +// GetCheckpoint determines if a checkpoint for a particular Shard exists. // Typically used to determine whether we should start processing the shard with // TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). -func (c *Checkpoint) Get(streamName, shardID string) (string, error) { +func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { namespace := fmt.Sprintf("%s-%s", c.appName, streamName) var sequenceNumber string @@ -97,9 +98,9 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) { return sequenceNumber, nil } -// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. -func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { +func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { c.mu.Lock() defer c.mu.Unlock() @@ -149,8 +150,8 @@ func (c *Checkpoint) save() error { upsertCheckpoint := fmt.Sprintf(`INSERT INTO %s (namespace, shard_id, sequence_number) VALUES($1, $2, $3) ON CONFLICT (namespace, shard_id) - DO - UPDATE + DO + UPDATE SET sequence_number= $3;`, c.tableName) for key, sequenceNumber := range c.checkpoints { diff --git a/checkpoint/postgres/postgres_databaseutils_test.go b/store/postgres/postgres_databaseutils_test.go similarity index 100% rename from checkpoint/postgres/postgres_databaseutils_test.go rename to store/postgres/postgres_databaseutils_test.go diff --git a/checkpoint/postgres/postgres_test.go b/store/postgres/postgres_test.go similarity index 94% rename from checkpoint/postgres/postgres_test.go rename to store/postgres/postgres_test.go index c6b3bab..640da72 100644 --- a/checkpoint/postgres/postgres_test.go +++ b/store/postgres/postgres_test.go @@ -72,7 +72,7 @@ func TestNew_WithMaxIntervalOption(t *testing.T) { ck.Shutdown() } -func TestCheckpoint_Get(t *testing.T) { +func TestCheckpoint_GetCheckpoint(t *testing.T) { appName := "streamConsumer" tableName := "checkpoint" connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;" @@ -98,7 +98,7 @@ func TestCheckpoint_Get(t *testing.T) { tableName) mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnRows(expectedRows) - gotSequenceNumber, err := ck.Get(streamName, shardID) + gotSequenceNumber, err := ck.GetCheckpoint(streamName, shardID) if gotSequenceNumber != expectedSequenceNumber { t.Errorf("expected sequence number equals %v, but got %v", expectedSequenceNumber, gotSequenceNumber) @@ -134,7 +134,7 @@ func TestCheckpoint_Get_NoRows(t *testing.T) { tableName) mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnError(sql.ErrNoRows) - gotSequenceNumber, err := ck.Get(streamName, shardID) + gotSequenceNumber, err := ck.GetCheckpoint(streamName, shardID) if gotSequenceNumber != "" { t.Errorf("expected sequence number equals empty, but got %v", gotSequenceNumber) @@ -170,7 +170,7 @@ func TestCheckpoint_Get_QueryError(t *testing.T) { tableName) mock.ExpectQuery(expectedSQLRegexString).WithArgs(namespace, shardID).WillReturnError(errors.New("an error")) - gotSequenceNumber, err := ck.Get(streamName, shardID) + gotSequenceNumber, err := ck.GetCheckpoint(streamName, shardID) if gotSequenceNumber != "" { t.Errorf("expected sequence number equals empty, but got %v", gotSequenceNumber) @@ -184,7 +184,7 @@ func TestCheckpoint_Get_QueryError(t *testing.T) { ck.Shutdown() } -func TestCheckpoint_Set(t *testing.T) { +func TestCheckpoint_SetCheckpoint(t *testing.T) { appName := "streamConsumer" tableName := "checkpoint" connString := "UserID=root;Password=myPassword;Host=localhost;Port=5432;Database=myDataBase;" @@ -197,7 +197,7 @@ func TestCheckpoint_Set(t *testing.T) { t.Fatalf("error occurred during the checkpoint creation. cause: %v", err) } - err = ck.Set(streamName, shardID, expectedSequenceNumber) + err = ck.SetCheckpoint(streamName, shardID, expectedSequenceNumber) if err != nil { t.Errorf("expected error equals nil, but got %v", err) @@ -218,7 +218,7 @@ func TestCheckpoint_Set_SequenceNumberEmpty(t *testing.T) { t.Fatalf("error occurred during the checkpoint creation. cause: %v", err) } - err = ck.Set(streamName, shardID, expectedSequenceNumber) + err = ck.SetCheckpoint(streamName, shardID, expectedSequenceNumber) if err == nil { t.Errorf("expected error equals not nil, but got %v", err) @@ -249,7 +249,7 @@ func TestCheckpoint_Shutdown(t *testing.T) { result := sqlmock.NewResult(0, 1) mock.ExpectExec(expectedSQLRegexString).WithArgs(namespace, shardID, expectedSequenceNumber).WillReturnResult(result) - err = ck.Set(streamName, shardID, expectedSequenceNumber) + err = ck.SetCheckpoint(streamName, shardID, expectedSequenceNumber) if err != nil { t.Fatalf("unable to set checkpoint for data initialization. cause: %v", err) @@ -287,7 +287,7 @@ func TestCheckpoint_Shutdown_SaveError(t *testing.T) { expectedSQLRegexString := fmt.Sprintf(`INSERT INTO %s \(namespace, shard_id, sequence_number\) VALUES\(\$1, \$2, \$3\) ON CONFLICT \(namespace, shard_id\) DO UPDATE SET sequence_number= \$3;`, tableName) mock.ExpectExec(expectedSQLRegexString).WithArgs(namespace, shardID, expectedSequenceNumber).WillReturnError(errors.New("an error")) - err = ck.Set(streamName, shardID, expectedSequenceNumber) + err = ck.SetCheckpoint(streamName, shardID, expectedSequenceNumber) if err != nil { t.Fatalf("unable to set checkpoint for data initialization. cause: %v", err) diff --git a/checkpoint/redis/redis.go b/store/redis/redis.go similarity index 77% rename from checkpoint/redis/redis.go rename to store/redis/redis.go index e3f7e51..d172f13 100644 --- a/checkpoint/redis/redis.go +++ b/store/redis/redis.go @@ -36,15 +36,15 @@ type Checkpoint struct { client *redis.Client } -// Get fetches the checkpoint for a particular Shard. -func (c *Checkpoint) Get(streamName, shardID string) (string, error) { +// GetCheckpoint fetches the checkpoint for a particular Shard. +func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { val, _ := c.client.Get(c.key(streamName, shardID)).Result() return val, nil } -// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. -func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { +func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { if sequenceNumber == "" { return fmt.Errorf("sequence number should not be empty") } diff --git a/checkpoint/redis/redis_test.go b/store/redis/redis_test.go similarity index 83% rename from checkpoint/redis/redis_test.go rename to store/redis/redis_test.go index 5c0d02d..0d23a70 100644 --- a/checkpoint/redis/redis_test.go +++ b/store/redis/redis_test.go @@ -12,10 +12,10 @@ func Test_CheckpointLifecycle(t *testing.T) { } // set - c.Set("streamName", "shardID", "testSeqNum") + c.SetCheckpoint("streamName", "shardID", "testSeqNum") // get - val, err := c.Get("streamName", "shardID") + val, err := c.GetCheckpoint("streamName", "shardID") if err != nil { t.Fatalf("get checkpoint error: %v", err) } @@ -30,7 +30,7 @@ func Test_SetEmptySeqNum(t *testing.T) { t.Fatalf("new checkpoint error: %v", err) } - err = c.Set("streamName", "shardID", "") + err = c.SetCheckpoint("streamName", "shardID", "") if err == nil { t.Fatalf("should not allow empty sequence number") }