From 64cdf69249ad03d90c0e0614db26e3795877bb0c Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sat, 30 Dec 2017 20:21:10 -0800 Subject: [PATCH] Add interval flush for DDB checkpoint (#40) Add interval flush for DDB checkpoint * Allow checkpointing on a specified interval * Add shutdown method to checkpoint to force flush Minor changes: * Swap order of input params for checkpoint (app, table) Addresses: https://github.com/harlow/kinesis-consumer/issues/39 --- README.md | 2 +- checkpoint/ddb/ddb.go | 126 ++++++++++++++++++++++++++++++-------- client.go | 2 +- examples/consumer/main.go | 43 +++++++------ 4 files changed, 122 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index a28bd50..ccf4ae9 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,7 @@ The DynamoDB checkpoint requires Table Name, App Name, and Stream Name: import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" // ddb checkpoint -ck, err := checkpoint.New(tableName, appName) +ck, err := checkpoint.New(appName, tableName) if err != nil { log.Fatalf("new checkpoint error: %v", err) } diff --git a/checkpoint/ddb/ddb.go b/checkpoint/ddb/ddb.go index 1c0daa2..74a5300 100644 --- a/checkpoint/ddb/ddb.go +++ b/checkpoint/ddb/ddb.go @@ -3,6 +3,8 @@ package redis import ( "fmt" "log" + "sync" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -11,10 +13,21 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" ) +// Option is used to override defaults when creating a new Checkpoint +type Option func(*Checkpoint) + +// WithMaxInterval sets the flush interval +func WithMaxInterval(maxInterval time.Duration) Option { + return func(c *Checkpoint) { + c.maxInterval = maxInterval + } +} + // New returns a checkpoint that uses DynamoDB for underlying storage -func New(tableName, appName string) (*Checkpoint, error) { +func New(appName, tableName string, opts ...Option) (*Checkpoint, error) { client := dynamodb.New(session.New(aws.NewConfig())) + // ping table to verify it exists _, err := client.DescribeTable(&dynamodb.DescribeTableInput{ TableName: aws.String(tableName), }) @@ -22,18 +35,39 @@ func New(tableName, appName string) (*Checkpoint, error) { return nil, err } - return &Checkpoint{ - tableName: tableName, - appName: appName, - client: client, - }, nil + ck := &Checkpoint{ + tableName: tableName, + appName: appName, + client: client, + maxInterval: time.Duration(1 * time.Minute), + done: make(chan struct{}), + mu: &sync.Mutex{}, + checkpoints: map[key]string{}, + } + + for _, opt := range opts { + opt(ck) + } + + go ck.loop() + + return ck, nil } // Checkpoint stores and retreives the last evaluated key from a DDB scan type Checkpoint struct { - tableName string - appName string - client *dynamodb.DynamoDB + tableName string + appName string + client *dynamodb.DynamoDB + maxInterval time.Duration + mu *sync.Mutex // protects the checkpoints + checkpoints map[key]string + done chan struct{} +} + +type key struct { + streamName string + shardID string } type item struct { @@ -77,32 +111,70 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) { // Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { + c.mu.Lock() + defer c.mu.Unlock() + if sequenceNumber == "" { return fmt.Errorf("sequence number should not be empty") } - namespace := fmt.Sprintf("%s-%s", c.appName, streamName) - - item, err := dynamodbattribute.MarshalMap(item{ - Namespace: namespace, - ShardID: shardID, - SequenceNumber: sequenceNumber, - }) - if err != nil { - log.Printf("marshal map error: %v", err) - return nil + key := key{ + streamName: streamName, + shardID: shardID, } + c.checkpoints[key] = sequenceNumber - _, err = c.client.PutItem(&dynamodb.PutItemInput{ - TableName: aws.String(c.tableName), - Item: item, - }) - if err != nil { - if !retriableError(err) { - return err + return nil +} + +// Shutdown the checkpoint. Save any in-flight data. +func (c *Checkpoint) Shutdown() error { + c.done <- struct{}{} + return c.save() +} + +func (c *Checkpoint) loop() { + tick := time.NewTicker(c.maxInterval) + defer tick.Stop() + defer close(c.done) + + for { + select { + case <-tick.C: + c.save() + case <-c.done: + return } - return c.Set(streamName, shardID, sequenceNumber) } +} + +func (c *Checkpoint) save() error { + c.mu.Lock() + defer c.mu.Unlock() + + for key, sequenceNumber := range c.checkpoints { + item, err := dynamodbattribute.MarshalMap(item{ + Namespace: fmt.Sprintf("%s-%s", c.appName, key.streamName), + ShardID: key.shardID, + SequenceNumber: sequenceNumber, + }) + if err != nil { + log.Printf("marshal map error: %v", err) + return nil + } + + _, err = c.client.PutItem(&dynamodb.PutItemInput{ + TableName: aws.String(c.tableName), + Item: item, + }) + if err != nil { + if !retriableError(err) { + return err + } + return c.save() + } + } + return nil } diff --git a/client.go b/client.go index 60aca68..8938175 100644 --- a/client.go +++ b/client.go @@ -15,7 +15,7 @@ func NewKinesisClient() *KinesisClient { return &KinesisClient{svc} } -// Client acts as wrapper around Kinesis client +// KinesisClient acts as wrapper around Kinesis client type KinesisClient struct { svc *kinesis.Kinesis } diff --git a/examples/consumer/main.go b/examples/consumer/main.go index b690730..3dc313c 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -10,10 +10,9 @@ import ( "net/http" "os" "os/signal" - "syscall" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" + checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" ) // kick off a server for exposing scan metrics @@ -32,28 +31,12 @@ func main() { var ( app = flag.String("app", "", "App name") stream = flag.String("stream", "", "Stream name") + table = flag.String("table", "", "Checkpoint table name") ) flag.Parse() - // trap SIGINT, wait to trigger shutdown - signals := make(chan os.Signal, 1) - signal.Notify(signals, - os.Interrupt, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT, - ) - - // use cancel func to signal shutdown - ctx, cancel := context.WithCancel(context.Background()) - go func() { - <-signals - cancel() - }() - - // redis checkpoint - ck, err := checkpoint.New(*app) + // ddb checkpoint + ck, err := checkpoint.New(*app, *table) if err != nil { log.Fatalf("checkpoint error: %v", err) } @@ -74,7 +57,19 @@ func main() { log.Fatalf("consumer error: %v", err) } - // start scan + // use cancel func to signal shutdown + ctx, cancel := context.WithCancel(context.Background()) + + // trap SIGINT, wait to trigger shutdown + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + go func() { + <-signals + cancel() + }() + + // scan stream err = c.Scan(ctx, func(r *consumer.Record) bool { fmt.Println(string(r.Data)) return true // continue scanning @@ -82,4 +77,8 @@ func main() { if err != nil { log.Fatalf("scan error: %v", err) } + + if err := ck.Shutdown(); err != nil { + log.Fatalf("checkpoint shutdown error: %v", err) + } }