From 03afda196fb950a08c56b0015a8f6022fd46d95c Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Fri, 24 May 2019 19:43:36 -0700 Subject: [PATCH] Introduce Storage interface --- checkpoint.go | 13 ------------- checkpoint/ddb/ddb.go | 8 ++++---- checkpoint/mysql/mysql.go | 8 ++++---- checkpoint/postgres/postgres.go | 13 +++++++------ checkpoint/redis/redis.go | 8 ++++---- consumer.go | 17 +++++++++++++++-- examples/consumer/cp-dynamo/main.go | 18 +++++++++--------- options.go | 6 +++--- storage.go | 13 +++++++++++++ 9 files changed, 59 insertions(+), 45 deletions(-) delete mode 100644 checkpoint.go create mode 100644 storage.go diff --git a/checkpoint.go b/checkpoint.go deleted file mode 100644 index 383d4c1..0000000 --- a/checkpoint.go +++ /dev/null @@ -1,13 +0,0 @@ -package consumer - -// Checkpoint interface used track consumer progress in the stream -type Checkpoint interface { - Get(streamName, shardID string) (string, error) - Set(streamName, shardID, sequenceNumber string) error -} - -// noopCheckpoint implements the checkpoint interface with discard -type noopCheckpoint struct{} - -func (n noopCheckpoint) Set(string, string, string) error { return nil } -func (n noopCheckpoint) Get(string, string) (string, error) { return "", nil } diff --git a/checkpoint/ddb/ddb.go b/checkpoint/ddb/ddb.go index 170ced4..0b5c9b5 100644 --- a/checkpoint/ddb/ddb.go +++ b/checkpoint/ddb/ddb.go @@ -87,7 +87,7 @@ type item struct { // Get determines if a checkpoint for a particular Shard exists. // Typically used to determine whether we should start processing the shard with // TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). -func (c *Checkpoint) Get(streamName, shardID string) (string, error) { +func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { namespace := fmt.Sprintf("%s-%s", c.appName, streamName) params := &dynamodb.GetItemInput{ @@ -106,7 +106,7 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) { resp, err := c.client.GetItem(params) if err != nil { if c.retryer.ShouldRetry(err) { - return c.Get(streamName, shardID) + return c.GetCheckpoint(streamName, shardID) } return "", err } @@ -116,9 +116,9 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) { return i.SequenceNumber, nil } -// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. -func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { +func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/checkpoint/mysql/mysql.go b/checkpoint/mysql/mysql.go index f4a27d2..15daf98 100644 --- a/checkpoint/mysql/mysql.go +++ b/checkpoint/mysql/mysql.go @@ -77,10 +77,10 @@ func (c *Checkpoint) GetMaxInterval() time.Duration { return c.maxInterval } -// Get determines if a checkpoint for a particular Shard exists. +// GetCheckpoint determines if a checkpoint for a particular Shard exists. // Typically used to determine whether we should start processing the shard with // TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). -func (c *Checkpoint) Get(streamName, shardID string) (string, error) { +func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { namespace := fmt.Sprintf("%s-%s", c.appName, streamName) var sequenceNumber string @@ -97,9 +97,9 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) { return sequenceNumber, nil } -// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. -func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { +func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/checkpoint/postgres/postgres.go b/checkpoint/postgres/postgres.go index b5a5bda..55bbc20 100644 --- a/checkpoint/postgres/postgres.go +++ b/checkpoint/postgres/postgres.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" "time" + // this is the postgres package so it makes sense to be here _ "github.com/lib/pq" ) @@ -77,10 +78,10 @@ func (c *Checkpoint) GetMaxInterval() time.Duration { return c.maxInterval } -// Get determines if a checkpoint for a particular Shard exists. +// GetCheckpoint determines if a checkpoint for a particular Shard exists. // Typically used to determine whether we should start processing the shard with // TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). -func (c *Checkpoint) Get(streamName, shardID string) (string, error) { +func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { namespace := fmt.Sprintf("%s-%s", c.appName, streamName) var sequenceNumber string @@ -97,9 +98,9 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) { return sequenceNumber, nil } -// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. -func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { +func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { c.mu.Lock() defer c.mu.Unlock() @@ -149,8 +150,8 @@ func (c *Checkpoint) save() error { upsertCheckpoint := fmt.Sprintf(`INSERT INTO %s (namespace, shard_id, sequence_number) VALUES($1, $2, $3) ON CONFLICT (namespace, shard_id) - DO - UPDATE + DO + UPDATE SET sequence_number= $3;`, c.tableName) for key, sequenceNumber := range c.checkpoints { diff --git a/checkpoint/redis/redis.go b/checkpoint/redis/redis.go index e3f7e51..d172f13 100644 --- a/checkpoint/redis/redis.go +++ b/checkpoint/redis/redis.go @@ -36,15 +36,15 @@ type Checkpoint struct { client *redis.Client } -// Get fetches the checkpoint for a particular Shard. -func (c *Checkpoint) Get(streamName, shardID string) (string, error) { +// GetCheckpoint fetches the checkpoint for a particular Shard. +func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) { val, _ := c.client.Get(c.key(streamName, shardID)).Result() return val, nil } -// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. -func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { +func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error { if sequenceNumber == "" { return fmt.Errorf("sequence number should not be empty") } diff --git a/consumer.go b/consumer.go index d651020..9ce6aa6 100644 --- a/consumer.go +++ b/consumer.go @@ -23,12 +23,12 @@ func New(streamName string, opts ...Option) (*Consumer, error) { return nil, fmt.Errorf("must provide stream name") } - // new consumer with no-op checkpoint, counter, and logger + // new consumer with noop storage, counter, and logger c := &Consumer{ streamName: streamName, initialShardIteratorType: kinesis.ShardIteratorTypeLatest, + storage: &noopStorage{}, counter: &noopCounter{}, - checkpoint: &noopCheckpoint{}, logger: &noopLogger{ logger: log.New(ioutil.Discard, "", log.LstdFlags), }, @@ -62,8 +62,12 @@ type Consumer struct { initialShardIteratorType string client kinesisiface.KinesisAPI logger Logger +<<<<<<< HEAD group Group checkpoint Checkpoint +======= + storage Storage +>>>>>>> 0162c90... Introduce Storage interface counter Counter } @@ -120,7 +124,11 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error { // for each record and checkpoints the progress of scan. func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error { // get last seq number from checkpoint +<<<<<<< HEAD lastSeqNum, err := c.group.GetCheckpoint(c.streamName, shardID) +======= + lastSeqNum, err := c.storage.GetCheckpoint(c.streamName, shardID) +>>>>>>> 0162c90... Introduce Storage interface if err != nil { return fmt.Errorf("get checkpoint error: %v", err) } @@ -165,8 +173,13 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e return err } +<<<<<<< HEAD if err != ErrSkipCheckpoint { if err := c.group.SetCheckpoint(c.streamName, shardID, *r.SequenceNumber); err != nil { +======= + if err != SkipCheckpoint { + if err := c.storage.SetCheckpoint(c.streamName, shardID, *r.SequenceNumber); err != nil { +>>>>>>> 0162c90... Introduce Storage interface return err } } diff --git a/examples/consumer/cp-dynamo/main.go b/examples/consumer/cp-dynamo/main.go index af885b3..afd73ce 100644 --- a/examples/consumer/cp-dynamo/main.go +++ b/examples/consumer/cp-dynamo/main.go @@ -19,7 +19,7 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/kinesis" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" + storage "github.com/harlow/kinesis-consumer/checkpoint/ddb" ) // kick off a server for exposing scan metrics @@ -69,8 +69,8 @@ func main() { myKsis := kinesis.New(sess) myDdbClient := dynamodb.New(sess) - // ddb checkpoint - ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDdbClient), checkpoint.WithRetryer(&MyRetryer{})) + // ddb persitance + ddb, err := storage.New(*app, *table, storage.WithDynamoClient(myDdbClient), storage.WithRetryer(&MyRetryer{})) if err != nil { log.Log("checkpoint error: %v", err) } @@ -81,7 +81,7 @@ func main() { // consumer c, err := consumer.New( *stream, - consumer.WithCheckpoint(ck), + consumer.WithStorage(ddb), consumer.WithLogger(log), consumer.WithCounter(counter), consumer.WithClient(myKsis), @@ -111,17 +111,17 @@ func main() { log.Log("scan error: %v", err) } - if err := ck.Shutdown(); err != nil { - log.Log("checkpoint shutdown error: %v", err) + if err := ddb.Shutdown(); err != nil { + log.Log("storage shutdown error: %v", err) } } -// MyRetryer used for checkpointing +// MyRetryer used for storage type MyRetryer struct { - checkpoint.Retryer + storage.Retryer } -// ShouldRetry implements custom logic for when a checkpont should retry +// ShouldRetry implements custom logic for when errors should retry func (r *MyRetryer) ShouldRetry(err error) bool { if awsErr, ok := err.(awserr.Error); ok { switch awsErr.Code() { diff --git a/options.go b/options.go index c17ba0b..f8f4440 100644 --- a/options.go +++ b/options.go @@ -5,10 +5,10 @@ import "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" // Option is used to override defaults when creating a new Consumer type Option func(*Consumer) -// WithCheckpoint overrides the default checkpoint -func WithCheckpoint(checkpoint Checkpoint) Option { +// WithStorage overrides the default storage +func WithStorage(storage Storage) Option { return func(c *Consumer) { - c.checkpoint = checkpoint + c.storage = storage } } diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..8a926ad --- /dev/null +++ b/storage.go @@ -0,0 +1,13 @@ +package consumer + +// Storage interface used to persist scan progress +type Storage interface { + GetCheckpoint(streamName, shardID string) (string, error) + SetCheckpoint(streamName, shardID, sequenceNumber string) error +} + +// noopStorage implements the storage interface with discard +type noopStorage struct{} + +func (n noopStorage) GetCheckpoint(string, string) (string, error) { return "", nil } +func (n noopStorage) SetCheckpoint(string, string, string) error { return nil }