Add DDB as consumer checkpoint option (#37)

* Simplify the checkpoint interface
* Add DDB backend for checkpoint persistence

Implements: https://github.com/harlow/kinesis-consumer/issues/26
This commit is contained in:
Harlow Ward 2017-11-20 09:37:30 -08:00 committed by GitHub
parent 130c78456c
commit 6ee965ec0a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 171 additions and 42 deletions

View file

@ -1,6 +1,8 @@
# Golang Kinesis Consumer # Golang Kinesis Consumer
__Kinesis consumer applications written in Go__ Kinesis consumer applications written in Go
## Note:
> With the new release of Kinesis Firehose I'd recommend using the [kinesis to firehose](http://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html) functionality for writing data directly to S3, Redshift, or Elasticsearch. > With the new release of Kinesis Firehose I'd recommend using the [kinesis to firehose](http://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html) functionality for writing data directly to S3, Redshift, or Elasticsearch.
@ -56,8 +58,10 @@ It also accepts the following optional overrides:
* Checkpoint * Checkpoint
```go ```go
// new kinesis client
svc := kinesis.New(session.New(aws.NewConfig())) svc := kinesis.New(session.New(aws.NewConfig()))
// new consumer with custom client
c, err := consumer.New( c, err := consumer.New(
appName, appName,
streamName, streamName,
@ -73,7 +77,24 @@ The default checkpoint uses Redis on localhost; to set a custom Redis URL use EN
REDIS_URL=redis.example.com:6379 REDIS_URL=redis.example.com:6379
``` ```
* [Add DDB as a checkpoint option](https://github.com/harlow/kinesis-consumer/issues/26) To leverage DynamoDB as the backend for checkpoint we'll need a new table:
Then override the checkpoint config option:
```go
// new ddb checkpoint
ck, err := checkpoint.New(*table, *app, *stream)
if err != nil {
log.Fatalf("new checkpoint error: %v", err)
}
// new consumer with checkpoint
c, err := consumer.New(
appName,
streamName,
consumer.WithCheckpoint(ck),
)
```
### Logging ### Logging

View file

@ -1,9 +1,8 @@
package checkpoint package checkpoint
// Checkpoint interface for functions that checkpoints need to // Checkpoint interface used to allow swappable backends for checkpoining
// implement in order to track consumer progress. // consumer progress in the stream.
type Checkpoint interface { type Checkpoint interface {
CheckpointExists(shardID string) bool Get(shardID string) (string, error)
SequenceNumber() string Set(shardID string, sequenceNumber string) error
SetCheckpoint(shardID string, sequenceNumber string)
} }

115
checkpoint/ddb/ddb.go Normal file
View file

@ -0,0 +1,115 @@
package redis
import (
"fmt"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)
// New returns a checkpoint that uses DynamoDB for underlying storage
func New(tableName, appName, streamName string) (*Checkpoint, error) {
client := dynamodb.New(session.New(aws.NewConfig()))
_, err := client.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
})
if err != nil {
return nil, err
}
return &Checkpoint{
TableName: tableName,
AppName: appName,
StreamName: streamName,
client: client,
}, nil
}
// Checkpoint stores and retreives the last evaluated key from a DDB scan
type Checkpoint struct {
AppName string
StreamName string
TableName string
client *dynamodb.DynamoDB
}
type item struct {
ConsumerGroup string `json:"consumer_group"`
ShardID string `json:"shard_id"`
SequenceNumber string `json:"sequence_number"`
}
// Get determines if a checkpoint for a particular Shard exists.
// Typically used to determine whether we should start processing the shard with
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
func (c *Checkpoint) Get(shardID string) (string, error) {
params := &dynamodb.GetItemInput{
TableName: aws.String(c.TableName),
ConsistentRead: aws.Bool(true),
Key: map[string]*dynamodb.AttributeValue{
"consumer_group": &dynamodb.AttributeValue{
S: aws.String(c.consumerGroupName()),
},
"shard_id": &dynamodb.AttributeValue{
S: aws.String(shardID),
},
},
}
resp, err := c.client.GetItem(params)
if err != nil {
if retriableError(err) {
return c.Get(shardID)
}
return "", err
}
var i item
dynamodbattribute.UnmarshalMap(resp.Item, &i)
return i.SequenceNumber, nil
}
// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
// Upon failover, record processing is resumed from this point.
func (c *Checkpoint) Set(shardID string, sequenceNumber string) error {
item, err := dynamodbattribute.MarshalMap(item{
ConsumerGroup: c.consumerGroupName(),
ShardID: shardID,
SequenceNumber: sequenceNumber,
})
if err != nil {
log.Printf("marshal map error: %v", err)
return nil
}
_, err = c.client.PutItem(&dynamodb.PutItemInput{
TableName: aws.String(c.TableName),
Item: item,
})
if err != nil {
if !retriableError(err) {
return err
}
return c.Set(shardID, sequenceNumber)
}
return nil
}
func (c *Checkpoint) consumerGroupName() string {
return fmt.Sprintf("%s-%s", c.StreamName, c.AppName)
}
func retriableError(err error) bool {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ProvisionedThroughputExceededException" {
return true
}
}
return false
}

View file

@ -2,7 +2,6 @@ package redis
import ( import (
"fmt" "fmt"
"log"
"os" "os"
redis "gopkg.in/redis.v5" redis "gopkg.in/redis.v5"
@ -10,8 +9,8 @@ import (
const localhost = "127.0.0.1:6379" const localhost = "127.0.0.1:6379"
// NewCheckpoint returns a checkpoint that uses Redis for underlying storage // New returns a checkpoint that uses Redis for underlying storage
func NewCheckpoint(appName, streamName string) (*Checkpoint, error) { func New(appName, streamName string) (*Checkpoint, error) {
addr := os.Getenv("REDIS_URL") addr := os.Getenv("REDIS_URL")
if addr == "" { if addr == "" {
addr = localhost addr = localhost
@ -32,44 +31,29 @@ func NewCheckpoint(appName, streamName string) (*Checkpoint, error) {
}, nil }, nil
} }
// Checkpoint implements the Checkpont interface. // Checkpoint stores and retreives the last evaluated key from a DDB scan
// Used to enable the Pipeline.ProcessShard to checkpoint it's progress
// while reading records from Kinesis stream.
type Checkpoint struct { type Checkpoint struct {
AppName string AppName string
StreamName string StreamName string
client *redis.Client client *redis.Client
sequenceNumber string
} }
// CheckpointExists determines if a checkpoint for a particular Shard exists. // Get determines if a checkpoint for a particular Shard exists.
// Typically used to determine whether we should start processing the shard with // Typically used to determine whether we should start processing the shard with
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). // TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
func (c *Checkpoint) CheckpointExists(shardID string) bool { func (c *Checkpoint) Get(shardID string) (string, error) {
val, _ := c.client.Get(c.key(shardID)).Result() return c.client.Get(c.key(shardID)).Result()
if val != "" {
c.sequenceNumber = val
return true
} }
return false // Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
}
// SequenceNumber returns the current checkpoint stored for the specified shard.
func (c *Checkpoint) SequenceNumber() string {
return c.sequenceNumber
}
// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
// Upon failover, record processing is resumed from this point. // Upon failover, record processing is resumed from this point.
func (c *Checkpoint) SetCheckpoint(shardID string, sequenceNumber string) { func (c *Checkpoint) Set(shardID string, sequenceNumber string) error {
err := c.client.Set(c.key(shardID), sequenceNumber, 0).Err() err := c.client.Set(c.key(shardID), sequenceNumber, 0).Err()
if err != nil { if err != nil {
log.Printf("redis checkpoint set error: %v", err) return fmt.Errorf("redis checkpoint error: %v", err)
} }
c.sequenceNumber = sequenceNumber return nil
} }
// key generates a unique Redis key for storage of Checkpoint. // key generates a unique Redis key for storage of Checkpoint.

View file

@ -78,9 +78,9 @@ func New(appName, streamName string, opts ...Option) (*Consumer, error) {
c.svc = kinesis.New(session.New(aws.NewConfig())) c.svc = kinesis.New(session.New(aws.NewConfig()))
} }
// provide default checkpoint // provide default Redis checkpoint
if c.checkpoint == nil { if c.checkpoint == nil {
ck, err := redis.NewCheckpoint(appName, streamName) ck, err := redis.New(appName, streamName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -182,12 +182,15 @@ loop:
} }
logger.WithField("records", len(resp.Records)).Info("checkpoint") logger.WithField("records", len(resp.Records)).Info("checkpoint")
c.checkpoint.SetCheckpoint(shardID, sequenceNumber) if err := c.checkpoint.Set(shardID, sequenceNumber); err != nil {
c.logger.WithError(err).Error("set checkpoint error")
}
} }
if resp.NextShardIterator == nil || shardIterator == resp.NextShardIterator { if resp.NextShardIterator == nil || shardIterator == resp.NextShardIterator {
shardIterator, err = c.getShardIterator(shardID) shardIterator, err = c.getShardIterator(shardID)
if err != nil { if err != nil {
logger.WithError(err).Error("getShardIterator")
break loop break loop
} }
} else { } else {
@ -197,7 +200,9 @@ loop:
} }
if sequenceNumber != "" { if sequenceNumber != "" {
c.checkpoint.SetCheckpoint(shardID, sequenceNumber) if err := c.checkpoint.Set(shardID, sequenceNumber); err != nil {
c.logger.WithError(err).Error("set checkpoint error")
}
} }
} }
@ -207,9 +212,14 @@ func (c *Consumer) getShardIterator(shardID string) (*string, error) {
StreamName: aws.String(c.streamName), StreamName: aws.String(c.streamName),
} }
if c.checkpoint.CheckpointExists(shardID) { seqNum, err := c.checkpoint.Get(shardID)
if err != nil {
return nil, err
}
if seqNum != "" {
params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER") params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
params.StartingSequenceNumber = aws.String(c.checkpoint.SequenceNumber()) params.StartingSequenceNumber = aws.String(seqNum)
} else { } else {
params.ShardIteratorType = aws.String("TRIM_HORIZON") params.ShardIteratorType = aws.String("TRIM_HORIZON")
} }

View file

@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"flag" "flag"
"fmt"
"os" "os"
"github.com/apex/log" "github.com/apex/log"
@ -27,8 +28,7 @@ func main() {
} }
c.Scan(context.TODO(), func(r *kinesis.Record) bool { c.Scan(context.TODO(), func(r *kinesis.Record) bool {
// fmt.Println(string(r.Data)) fmt.Println(string(r.Data))
return true // continue scanning return true // continue scanning
}) })
} }