Introduce Storage interface
This commit is contained in:
parent
d05d6c2d5e
commit
03afda196f
9 changed files with 59 additions and 45 deletions
|
|
@ -1,13 +0,0 @@
|
||||||
package consumer
|
|
||||||
|
|
||||||
// Checkpoint interface used to track consumer progress in the stream
|
|
||||||
type Checkpoint interface {
|
|
||||||
Get(streamName, shardID string) (string, error)
|
|
||||||
Set(streamName, shardID, sequenceNumber string) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// noopCheckpoint implements the Checkpoint interface by discarding all checkpoints
|
|
||||||
type noopCheckpoint struct{}
|
|
||||||
|
|
||||||
func (n noopCheckpoint) Set(string, string, string) error { return nil }
|
|
||||||
func (n noopCheckpoint) Get(string, string) (string, error) { return "", nil }
|
|
||||||
|
|
@ -87,7 +87,7 @@ type item struct {
|
||||||
// Get determines if a checkpoint for a particular Shard exists.
|
// GetCheckpoint determines if a checkpoint for a particular Shard exists.
|
||||||
// Typically used to determine whether we should start processing the shard with
|
// Typically used to determine whether we should start processing the shard with
|
||||||
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
||||||
func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) {
|
||||||
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)
|
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)
|
||||||
|
|
||||||
params := &dynamodb.GetItemInput{
|
params := &dynamodb.GetItemInput{
|
||||||
|
|
@ -106,7 +106,7 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
||||||
resp, err := c.client.GetItem(params)
|
resp, err := c.client.GetItem(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if c.retryer.ShouldRetry(err) {
|
if c.retryer.ShouldRetry(err) {
|
||||||
return c.Get(streamName, shardID)
|
return c.GetCheckpoint(streamName, shardID)
|
||||||
}
|
}
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
@ -116,9 +116,9 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
||||||
return i.SequenceNumber, nil
|
return i.SequenceNumber, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
||||||
// Upon failover, record processing is resumed from this point.
|
// Upon failover, record processing is resumed from this point.
|
||||||
func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
|
func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -77,10 +77,10 @@ func (c *Checkpoint) GetMaxInterval() time.Duration {
|
||||||
return c.maxInterval
|
return c.maxInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get determines if a checkpoint for a particular Shard exists.
|
// GetCheckpoint determines if a checkpoint for a particular Shard exists.
|
||||||
// Typically used to determine whether we should start processing the shard with
|
// Typically used to determine whether we should start processing the shard with
|
||||||
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
||||||
func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) {
|
||||||
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)
|
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)
|
||||||
|
|
||||||
var sequenceNumber string
|
var sequenceNumber string
|
||||||
|
|
@ -97,9 +97,9 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
||||||
return sequenceNumber, nil
|
return sequenceNumber, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
||||||
// Upon failover, record processing is resumed from this point.
|
// Upon failover, record processing is resumed from this point.
|
||||||
func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
|
func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
// this is the postgres package so it makes sense to be here
|
// this is the postgres package so it makes sense to be here
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
|
|
@ -77,10 +78,10 @@ func (c *Checkpoint) GetMaxInterval() time.Duration {
|
||||||
return c.maxInterval
|
return c.maxInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get determines if a checkpoint for a particular Shard exists.
|
// GetCheckpoint determines if a checkpoint for a particular Shard exists.
|
||||||
// Typically used to determine whether we should start processing the shard with
|
// Typically used to determine whether we should start processing the shard with
|
||||||
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
|
||||||
func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) {
|
||||||
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)
|
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)
|
||||||
|
|
||||||
var sequenceNumber string
|
var sequenceNumber string
|
||||||
|
|
@ -97,9 +98,9 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
||||||
return sequenceNumber, nil
|
return sequenceNumber, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
||||||
// Upon failover, record processing is resumed from this point.
|
// Upon failover, record processing is resumed from this point.
|
||||||
func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
|
func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
|
@ -149,8 +150,8 @@ func (c *Checkpoint) save() error {
|
||||||
upsertCheckpoint := fmt.Sprintf(`INSERT INTO %s (namespace, shard_id, sequence_number)
|
upsertCheckpoint := fmt.Sprintf(`INSERT INTO %s (namespace, shard_id, sequence_number)
|
||||||
VALUES($1, $2, $3)
|
VALUES($1, $2, $3)
|
||||||
ON CONFLICT (namespace, shard_id)
|
ON CONFLICT (namespace, shard_id)
|
||||||
DO
|
DO
|
||||||
UPDATE
|
UPDATE
|
||||||
SET sequence_number= $3;`, c.tableName)
|
SET sequence_number= $3;`, c.tableName)
|
||||||
|
|
||||||
for key, sequenceNumber := range c.checkpoints {
|
for key, sequenceNumber := range c.checkpoints {
|
||||||
|
|
|
||||||
|
|
@ -36,15 +36,15 @@ type Checkpoint struct {
|
||||||
client *redis.Client
|
client *redis.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get fetches the checkpoint for a particular Shard.
|
// GetCheckpoint fetches the checkpoint for a particular Shard.
|
||||||
func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
|
func (c *Checkpoint) GetCheckpoint(streamName, shardID string) (string, error) {
|
||||||
val, _ := c.client.Get(c.key(streamName, shardID)).Result()
|
val, _ := c.client.Get(c.key(streamName, shardID)).Result()
|
||||||
return val, nil
|
return val, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
|
||||||
// Upon failover, record processing is resumed from this point.
|
// Upon failover, record processing is resumed from this point.
|
||||||
func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
|
func (c *Checkpoint) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
|
||||||
if sequenceNumber == "" {
|
if sequenceNumber == "" {
|
||||||
return fmt.Errorf("sequence number should not be empty")
|
return fmt.Errorf("sequence number should not be empty")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
17
consumer.go
17
consumer.go
|
|
@ -23,12 +23,12 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
|
||||||
return nil, fmt.Errorf("must provide stream name")
|
return nil, fmt.Errorf("must provide stream name")
|
||||||
}
|
}
|
||||||
|
|
||||||
// new consumer with no-op checkpoint, counter, and logger
|
// new consumer with noop storage, counter, and logger
|
||||||
c := &Consumer{
|
c := &Consumer{
|
||||||
streamName: streamName,
|
streamName: streamName,
|
||||||
initialShardIteratorType: kinesis.ShardIteratorTypeLatest,
|
initialShardIteratorType: kinesis.ShardIteratorTypeLatest,
|
||||||
|
storage: &noopStorage{},
|
||||||
counter: &noopCounter{},
|
counter: &noopCounter{},
|
||||||
checkpoint: &noopCheckpoint{},
|
|
||||||
logger: &noopLogger{
|
logger: &noopLogger{
|
||||||
logger: log.New(ioutil.Discard, "", log.LstdFlags),
|
logger: log.New(ioutil.Discard, "", log.LstdFlags),
|
||||||
},
|
},
|
||||||
|
|
@ -62,8 +62,12 @@ type Consumer struct {
|
||||||
initialShardIteratorType string
|
initialShardIteratorType string
|
||||||
client kinesisiface.KinesisAPI
|
client kinesisiface.KinesisAPI
|
||||||
logger Logger
|
logger Logger
|
||||||
|
<<<<<<< HEAD
|
||||||
group Group
|
group Group
|
||||||
checkpoint Checkpoint
|
checkpoint Checkpoint
|
||||||
|
=======
|
||||||
|
storage Storage
|
||||||
|
>>>>>>> 0162c90... Introduce Storage interface
|
||||||
counter Counter
|
counter Counter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -120,7 +124,11 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
|
||||||
// for each record and checkpoints the progress of scan.
|
// for each record and checkpoints the progress of scan.
|
||||||
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error {
|
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error {
|
||||||
// get last seq number from checkpoint
|
// get last seq number from checkpoint
|
||||||
|
<<<<<<< HEAD
|
||||||
lastSeqNum, err := c.group.GetCheckpoint(c.streamName, shardID)
|
lastSeqNum, err := c.group.GetCheckpoint(c.streamName, shardID)
|
||||||
|
=======
|
||||||
|
lastSeqNum, err := c.storage.GetCheckpoint(c.streamName, shardID)
|
||||||
|
>>>>>>> 0162c90... Introduce Storage interface
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get checkpoint error: %v", err)
|
return fmt.Errorf("get checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -165,8 +173,13 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
if err != ErrSkipCheckpoint {
|
if err != ErrSkipCheckpoint {
|
||||||
if err := c.group.SetCheckpoint(c.streamName, shardID, *r.SequenceNumber); err != nil {
|
if err := c.group.SetCheckpoint(c.streamName, shardID, *r.SequenceNumber); err != nil {
|
||||||
|
=======
|
||||||
|
if err != SkipCheckpoint {
|
||||||
|
if err := c.storage.SetCheckpoint(c.streamName, shardID, *r.SequenceNumber); err != nil {
|
||||||
|
>>>>>>> 0162c90... Introduce Storage interface
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
consumer "github.com/harlow/kinesis-consumer"
|
consumer "github.com/harlow/kinesis-consumer"
|
||||||
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
|
storage "github.com/harlow/kinesis-consumer/checkpoint/ddb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// kick off a server for exposing scan metrics
|
// kick off a server for exposing scan metrics
|
||||||
|
|
@ -69,8 +69,8 @@ func main() {
|
||||||
myKsis := kinesis.New(sess)
|
myKsis := kinesis.New(sess)
|
||||||
myDdbClient := dynamodb.New(sess)
|
myDdbClient := dynamodb.New(sess)
|
||||||
|
|
||||||
// ddb checkpoint
|
// ddb persistence
|
||||||
ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDdbClient), checkpoint.WithRetryer(&MyRetryer{}))
|
ddb, err := storage.New(*app, *table, storage.WithDynamoClient(myDdbClient), storage.WithRetryer(&MyRetryer{}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Log("checkpoint error: %v", err)
|
log.Log("checkpoint error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -81,7 +81,7 @@ func main() {
|
||||||
// consumer
|
// consumer
|
||||||
c, err := consumer.New(
|
c, err := consumer.New(
|
||||||
*stream,
|
*stream,
|
||||||
consumer.WithCheckpoint(ck),
|
consumer.WithStorage(ddb),
|
||||||
consumer.WithLogger(log),
|
consumer.WithLogger(log),
|
||||||
consumer.WithCounter(counter),
|
consumer.WithCounter(counter),
|
||||||
consumer.WithClient(myKsis),
|
consumer.WithClient(myKsis),
|
||||||
|
|
@ -111,17 +111,17 @@ func main() {
|
||||||
log.Log("scan error: %v", err)
|
log.Log("scan error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ck.Shutdown(); err != nil {
|
if err := ddb.Shutdown(); err != nil {
|
||||||
log.Log("checkpoint shutdown error: %v", err)
|
log.Log("storage shutdown error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MyRetryer used for checkpointing
|
// MyRetryer used for storage
|
||||||
type MyRetryer struct {
|
type MyRetryer struct {
|
||||||
checkpoint.Retryer
|
storage.Retryer
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShouldRetry implements custom logic for when a checkpoint should retry
|
// ShouldRetry implements custom logic for deciding which errors should be retried
|
||||||
func (r *MyRetryer) ShouldRetry(err error) bool {
|
func (r *MyRetryer) ShouldRetry(err error) bool {
|
||||||
if awsErr, ok := err.(awserr.Error); ok {
|
if awsErr, ok := err.(awserr.Error); ok {
|
||||||
switch awsErr.Code() {
|
switch awsErr.Code() {
|
||||||
|
|
|
||||||
|
|
@ -5,10 +5,10 @@ import "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
||||||
// Option is used to override defaults when creating a new Consumer
|
// Option is used to override defaults when creating a new Consumer
|
||||||
type Option func(*Consumer)
|
type Option func(*Consumer)
|
||||||
|
|
||||||
// WithCheckpoint overrides the default checkpoint
|
// WithStorage overrides the default storage
|
||||||
func WithCheckpoint(checkpoint Checkpoint) Option {
|
func WithStorage(storage Storage) Option {
|
||||||
return func(c *Consumer) {
|
return func(c *Consumer) {
|
||||||
c.checkpoint = checkpoint
|
c.storage = storage
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
13
storage.go
Normal file
13
storage.go
Normal file
|
|
@ -0,0 +1,13 @@
|
||||||
|
package consumer
|
||||||
|
|
||||||
|
// Storage interface used to persist scan progress
|
||||||
|
type Storage interface {
|
||||||
|
GetCheckpoint(streamName, shardID string) (string, error)
|
||||||
|
SetCheckpoint(streamName, shardID, sequenceNumber string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// noopStorage implements the Storage interface by discarding all checkpoints
|
||||||
|
type noopStorage struct{}
|
||||||
|
|
||||||
|
func (n noopStorage) GetCheckpoint(string, string) (string, error) { return "", nil }
|
||||||
|
func (n noopStorage) SetCheckpoint(string, string, string) error { return nil }
|
||||||
Loading…
Reference in a new issue