Add interval flush for DDB checkpoint (#40)

Add interval flush for DDB checkpoint

* Allow checkpointing on a specified interval
* Add shutdown method to checkpoint to force flush

Minor changes:

* Swap order of input params for checkpoint (app, table)

Addresses: https://github.com/harlow/kinesis-consumer/issues/39
This commit is contained in:
Harlow Ward 2017-12-30 20:21:10 -08:00 committed by GitHub
parent 955f74d553
commit 64cdf69249
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 122 additions and 51 deletions

View file

@@ -87,7 +87,7 @@ The DynamoDB checkpoint requires Table Name, App Name, and Stream Name:
import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb" import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
// ddb checkpoint // ddb checkpoint
ck, err := checkpoint.New(tableName, appName) ck, err := checkpoint.New(appName, tableName)
if err != nil { if err != nil {
log.Fatalf("new checkpoint error: %v", err) log.Fatalf("new checkpoint error: %v", err)
} }

View file

@@ -3,6 +3,8 @@ package redis
import ( import (
"fmt" "fmt"
"log" "log"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/awserr"
@@ -11,10 +13,21 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
) )
// Option is used to override defaults when creating a new Checkpoint
type Option func(*Checkpoint)
// WithMaxInterval sets the flush interval
func WithMaxInterval(maxInterval time.Duration) Option {
return func(c *Checkpoint) {
c.maxInterval = maxInterval
}
}
// New returns a checkpoint that uses DynamoDB for underlying storage // New returns a checkpoint that uses DynamoDB for underlying storage
func New(tableName, appName string) (*Checkpoint, error) { func New(appName, tableName string, opts ...Option) (*Checkpoint, error) {
client := dynamodb.New(session.New(aws.NewConfig())) client := dynamodb.New(session.New(aws.NewConfig()))
// ping table to verify it exists
_, err := client.DescribeTable(&dynamodb.DescribeTableInput{ _, err := client.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(tableName), TableName: aws.String(tableName),
}) })
@@ -22,18 +35,39 @@ func New(tableName, appName string) (*Checkpoint, error) {
return nil, err return nil, err
} }
return &Checkpoint{ ck := &Checkpoint{
tableName: tableName, tableName: tableName,
appName: appName, appName: appName,
client: client, client: client,
}, nil maxInterval: time.Duration(1 * time.Minute),
done: make(chan struct{}),
mu: &sync.Mutex{},
checkpoints: map[key]string{},
}
for _, opt := range opts {
opt(ck)
}
go ck.loop()
return ck, nil
} }
// Checkpoint stores and retrieves the last evaluated key from a DDB scan // Checkpoint stores and retrieves the last evaluated key from a DDB scan
type Checkpoint struct { type Checkpoint struct {
tableName string tableName string
appName string appName string
client *dynamodb.DynamoDB client *dynamodb.DynamoDB
maxInterval time.Duration
mu *sync.Mutex // protects the checkpoints
checkpoints map[key]string
done chan struct{}
}
type key struct {
streamName string
shardID string
} }
type item struct { type item struct {
@@ -77,32 +111,70 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
// Upon failover, record processing is resumed from this point. // Upon failover, record processing is resumed from this point.
func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error { func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
c.mu.Lock()
defer c.mu.Unlock()
if sequenceNumber == "" { if sequenceNumber == "" {
return fmt.Errorf("sequence number should not be empty") return fmt.Errorf("sequence number should not be empty")
} }
namespace := fmt.Sprintf("%s-%s", c.appName, streamName) key := key{
streamName: streamName,
item, err := dynamodbattribute.MarshalMap(item{ shardID: shardID,
Namespace: namespace,
ShardID: shardID,
SequenceNumber: sequenceNumber,
})
if err != nil {
log.Printf("marshal map error: %v", err)
return nil
} }
c.checkpoints[key] = sequenceNumber
_, err = c.client.PutItem(&dynamodb.PutItemInput{ return nil
TableName: aws.String(c.tableName), }
Item: item,
}) // Shutdown the checkpoint. Save any in-flight data.
if err != nil { func (c *Checkpoint) Shutdown() error {
if !retriableError(err) { c.done <- struct{}{}
return err return c.save()
}
func (c *Checkpoint) loop() {
tick := time.NewTicker(c.maxInterval)
defer tick.Stop()
defer close(c.done)
for {
select {
case <-tick.C:
c.save()
case <-c.done:
return
} }
return c.Set(streamName, shardID, sequenceNumber)
} }
}
func (c *Checkpoint) save() error {
c.mu.Lock()
defer c.mu.Unlock()
for key, sequenceNumber := range c.checkpoints {
item, err := dynamodbattribute.MarshalMap(item{
Namespace: fmt.Sprintf("%s-%s", c.appName, key.streamName),
ShardID: key.shardID,
SequenceNumber: sequenceNumber,
})
if err != nil {
log.Printf("marshal map error: %v", err)
return nil
}
_, err = c.client.PutItem(&dynamodb.PutItemInput{
TableName: aws.String(c.tableName),
Item: item,
})
if err != nil {
if !retriableError(err) {
return err
}
return c.save()
}
}
return nil return nil
} }

View file

@@ -15,7 +15,7 @@ func NewKinesisClient() *KinesisClient {
return &KinesisClient{svc} return &KinesisClient{svc}
} }
// Client acts as wrapper around Kinesis client // KinesisClient acts as wrapper around Kinesis client
type KinesisClient struct { type KinesisClient struct {
svc *kinesis.Kinesis svc *kinesis.Kinesis
} }

View file

@@ -10,10 +10,9 @@ import (
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"syscall"
consumer "github.com/harlow/kinesis-consumer" consumer "github.com/harlow/kinesis-consumer"
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
) )
// kick off a server for exposing scan metrics // kick off a server for exposing scan metrics
@@ -32,28 +31,12 @@ func main() {
var ( var (
app = flag.String("app", "", "App name") app = flag.String("app", "", "App name")
stream = flag.String("stream", "", "Stream name") stream = flag.String("stream", "", "Stream name")
table = flag.String("table", "", "Checkpoint table name")
) )
flag.Parse() flag.Parse()
// trap SIGINT, wait to trigger shutdown // ddb checkpoint
signals := make(chan os.Signal, 1) ck, err := checkpoint.New(*app, *table)
signal.Notify(signals,
os.Interrupt,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT,
)
// use cancel func to signal shutdown
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-signals
cancel()
}()
// redis checkpoint
ck, err := checkpoint.New(*app)
if err != nil { if err != nil {
log.Fatalf("checkpoint error: %v", err) log.Fatalf("checkpoint error: %v", err)
} }
@@ -74,7 +57,19 @@ func main() {
log.Fatalf("consumer error: %v", err) log.Fatalf("consumer error: %v", err)
} }
// start scan // use cancel func to signal shutdown
ctx, cancel := context.WithCancel(context.Background())
// trap SIGINT, wait to trigger shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go func() {
<-signals
cancel()
}()
// scan stream
err = c.Scan(ctx, func(r *consumer.Record) bool { err = c.Scan(ctx, func(r *consumer.Record) bool {
fmt.Println(string(r.Data)) fmt.Println(string(r.Data))
return true // continue scanning return true // continue scanning
@@ -82,4 +77,8 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("scan error: %v", err) log.Fatalf("scan error: %v", err)
} }
if err := ck.Shutdown(); err != nil {
log.Fatalf("checkpoint shutdown error: %v", err)
}
} }