diff --git a/README.md b/README.md index fdb61ea..7a2a246 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # Golang Kinesis Consumer -__Kinesis consumer applications written in Go__ +Kinesis consumer applications written in Go + +## Note: > With the new release of Kinesis Firehose I'd recommend using the [kinesis to firehose](http://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html) functionality for writing data directly to S3, Redshift, or Elasticsearch. @@ -56,8 +58,10 @@ It also accepts the following optional overrides: * Checkpoint ```go +// new kinesis client svc := kinesis.New(session.New(aws.NewConfig())) +// new consumer with custom client c, err := consumer.New( appName, streamName, @@ -73,7 +77,24 @@ The default checkpoint uses Redis on localhost; to set a custom Redis URL use EN REDIS_URL=redis.example.com:6379 ``` -* [Add DDB as a checkpoint option](https://github.com/harlow/kinesis-consumer/issues/26) +To leverage DynamoDB as the backend for checkpoints we'll need a new table whose primary key consists of the string attributes `consumer_group` and `shard_id`. + +Then override the checkpoint config option: + +```go +// new ddb checkpoint +ck, err := checkpoint.New(*table, *app, *stream) +if err != nil { + log.Fatalf("new checkpoint error: %v", err) +} + +// new consumer with checkpoint +c, err := consumer.New( + appName, + streamName, + consumer.WithCheckpoint(ck), +) +``` ### Logging diff --git a/checkpoint/checkpoint.go b/checkpoint/checkpoint.go index 04b447e..89af444 100644 --- a/checkpoint/checkpoint.go +++ b/checkpoint/checkpoint.go @@ -1,9 +1,8 @@ package checkpoint -// Checkpoint interface for functions that checkpoints need to -// implement in order to track consumer progress. +// Checkpoint interface used to allow swappable backends for checkpointing +// consumer progress in the stream. 
type Checkpoint interface { - CheckpointExists(shardID string) bool - SequenceNumber() string - SetCheckpoint(shardID string, sequenceNumber string) + Get(shardID string) (string, error) + Set(shardID string, sequenceNumber string) error } diff --git a/checkpoint/ddb/ddb.go b/checkpoint/ddb/ddb.go new file mode 100644 index 0000000..d7628d6 --- /dev/null +++ b/checkpoint/ddb/ddb.go @@ -0,0 +1,115 @@ +package redis + +import ( + "fmt" + "log" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" +) + +// New returns a checkpoint that uses DynamoDB for underlying storage +func New(tableName, appName, streamName string) (*Checkpoint, error) { + client := dynamodb.New(session.New(aws.NewConfig())) + + _, err := client.DescribeTable(&dynamodb.DescribeTableInput{ + TableName: aws.String(tableName), + }) + if err != nil { + return nil, err + } + + return &Checkpoint{ + TableName: tableName, + AppName: appName, + StreamName: streamName, + client: client, + }, nil +} + +// Checkpoint stores and retrieves the checkpointed sequence number of each shard in a DynamoDB table +type Checkpoint struct { + AppName string + StreamName string + TableName string + + client *dynamodb.DynamoDB +} + +type item struct { + ConsumerGroup string `json:"consumer_group"` + ShardID string `json:"shard_id"` + SequenceNumber string `json:"sequence_number"` +} + +// Get returns the sequence number last checkpointed for a particular shard. +// Typically used to determine whether we should start processing the shard with +// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). 
+func (c *Checkpoint) Get(shardID string) (string, error) { + params := &dynamodb.GetItemInput{ + TableName: aws.String(c.TableName), + ConsistentRead: aws.Bool(true), + Key: map[string]*dynamodb.AttributeValue{ + "consumer_group": &dynamodb.AttributeValue{ + S: aws.String(c.consumerGroupName()), + }, + "shard_id": &dynamodb.AttributeValue{ + S: aws.String(shardID), + }, + }, + } + + resp, err := c.client.GetItem(params) + if err != nil { + if retriableError(err) { + return c.Get(shardID) + } + return "", err + } + + var i item + dynamodbattribute.UnmarshalMap(resp.Item, &i) + return i.SequenceNumber, nil +} + +// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// Upon failover, record processing is resumed from this point. +func (c *Checkpoint) Set(shardID string, sequenceNumber string) error { + item, err := dynamodbattribute.MarshalMap(item{ + ConsumerGroup: c.consumerGroupName(), + ShardID: shardID, + SequenceNumber: sequenceNumber, + }) + if err != nil { + log.Printf("marshal map error: %v", err) + return nil + } + + _, err = c.client.PutItem(&dynamodb.PutItemInput{ + TableName: aws.String(c.TableName), + Item: item, + }) + if err != nil { + if !retriableError(err) { + return err + } + return c.Set(shardID, sequenceNumber) + } + return nil +} + +func (c *Checkpoint) consumerGroupName() string { + return fmt.Sprintf("%s-%s", c.StreamName, c.AppName) +} + +func retriableError(err error) bool { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == "ProvisionedThroughputExceededException" { + return true + } + } + return false +} diff --git a/checkpoint/redis/redis.go b/checkpoint/redis/redis.go index e88b963..c8a3b36 100644 --- a/checkpoint/redis/redis.go +++ b/checkpoint/redis/redis.go @@ -2,7 +2,6 @@ package redis import ( "fmt" - "log" "os" redis "gopkg.in/redis.v5" @@ -10,8 +9,8 @@ import ( const localhost = "127.0.0.1:6379" -// NewCheckpoint returns a checkpoint that uses Redis for underlying 
storage -func NewCheckpoint(appName, streamName string) (*Checkpoint, error) { +// New returns a checkpoint that uses Redis for underlying storage +func New(appName, streamName string) (*Checkpoint, error) { addr := os.Getenv("REDIS_URL") if addr == "" { addr = localhost @@ -32,44 +31,29 @@ func NewCheckpoint(appName, streamName string) (*Checkpoint, error) { }, nil } -// Checkpoint implements the Checkpont interface. -// Used to enable the Pipeline.ProcessShard to checkpoint it's progress -// while reading records from Kinesis stream. +// Checkpoint stores and retrieves the checkpointed sequence number of each shard in Redis type Checkpoint struct { AppName string StreamName string - client *redis.Client - sequenceNumber string + client *redis.Client } -// CheckpointExists determines if a checkpoint for a particular Shard exists. +// Get returns the sequence number stored in Redis for a particular shard. // Typically used to determine whether we should start processing the shard with // TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists). -func (c *Checkpoint) CheckpointExists(shardID string) bool { - val, _ := c.client.Get(c.key(shardID)).Result() - - if val != "" { - c.sequenceNumber = val - return true - } - - return false +func (c *Checkpoint) Get(shardID string) (string, error) { + return c.client.Get(c.key(shardID)).Result() } -// SequenceNumber returns the current checkpoint stored for the specified shard. -func (c *Checkpoint) SequenceNumber() string { - return c.sequenceNumber -} - -// SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). +// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application). // Upon failover, record processing is resumed from this point. 
-func (c *Checkpoint) SetCheckpoint(shardID string, sequenceNumber string) { +func (c *Checkpoint) Set(shardID string, sequenceNumber string) error { err := c.client.Set(c.key(shardID), sequenceNumber, 0).Err() if err != nil { - log.Printf("redis checkpoint set error: %v", err) + return fmt.Errorf("redis checkpoint error: %v", err) } - c.sequenceNumber = sequenceNumber + return nil } // key generates a unique Redis key for storage of Checkpoint. diff --git a/consumer.go b/consumer.go index 27bf269..1dcae1f 100644 --- a/consumer.go +++ b/consumer.go @@ -78,9 +78,9 @@ func New(appName, streamName string, opts ...Option) (*Consumer, error) { c.svc = kinesis.New(session.New(aws.NewConfig())) } - // provide default checkpoint + // provide default Redis checkpoint if c.checkpoint == nil { - ck, err := redis.NewCheckpoint(appName, streamName) + ck, err := redis.New(appName, streamName) if err != nil { return nil, err } @@ -182,12 +182,15 @@ loop: } logger.WithField("records", len(resp.Records)).Info("checkpoint") - c.checkpoint.SetCheckpoint(shardID, sequenceNumber) + if err := c.checkpoint.Set(shardID, sequenceNumber); err != nil { + c.logger.WithError(err).Error("set checkpoint error") + } } if resp.NextShardIterator == nil || shardIterator == resp.NextShardIterator { shardIterator, err = c.getShardIterator(shardID) if err != nil { + logger.WithError(err).Error("getShardIterator") break loop } } else { @@ -197,7 +200,9 @@ loop: } if sequenceNumber != "" { - c.checkpoint.SetCheckpoint(shardID, sequenceNumber) + if err := c.checkpoint.Set(shardID, sequenceNumber); err != nil { + c.logger.WithError(err).Error("set checkpoint error") + } } } @@ -207,9 +212,14 @@ func (c *Consumer) getShardIterator(shardID string) (*string, error) { StreamName: aws.String(c.streamName), } - if c.checkpoint.CheckpointExists(shardID) { + seqNum, err := c.checkpoint.Get(shardID) + if err != nil { + return nil, err + } + + if seqNum != "" { params.ShardIteratorType = 
aws.String("AFTER_SEQUENCE_NUMBER") - params.StartingSequenceNumber = aws.String(c.checkpoint.SequenceNumber()) + params.StartingSequenceNumber = aws.String(seqNum) } else { params.ShardIteratorType = aws.String("TRIM_HORIZON") } diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 3e3073c..9c54423 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -3,6 +3,7 @@ package main import ( "context" "flag" + "fmt" "os" "github.com/apex/log" @@ -27,8 +28,7 @@ func main() { } c.Scan(context.TODO(), func(r *kinesis.Record) bool { - // fmt.Println(string(r.Data)) - + fmt.Println(string(r.Data)) return true // continue scanning }) }