add recoverability in the pipeline, especially for throughput errors

This commit is contained in:
dan 2015-04-03 15:33:34 -07:00
parent c5c7dd49c9
commit 797b575ad1
3 changed files with 43 additions and 5 deletions

View file

@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/sendgridlabs/go-kinesis" "github.com/ezoic/go-kinesis"
) )
// CreateStream creates a new Kinesis stream (uses existing stream if exists) and // CreateStream creates a new Kinesis stream (uses existing stream if exists) and

View file

@ -2,9 +2,12 @@ package connector
import ( import (
"log" "log"
"math"
"os"
"time" "time"
"github.com/sendgridlabs/go-kinesis" "github.com/ezoic/go-kinesis"
l4g "github.com/ezoic/sol/log4go"
) )
// Pipeline is used as a record processor to configure a pipline. // Pipeline is used as a record processor to configure a pipline.
@ -21,6 +24,28 @@ type Pipeline struct {
Transformer Transformer Transformer Transformer
} }
// isRecoverableError reports whether err is a transient Kinesis error that the
// read loop should retry with backoff, rather than treating as fatal.
// Only provisioned-throughput-exceeded errors are considered recoverable.
func (p Pipeline) isRecoverableError(err error) bool {
	r := false
	cErr, ok := err.(*kinesis.Error)
	// NOTE(review): the AWS error code for Kinesis GetRecords throttling is
	// usually "ProvisionedThroughputExceededException"; confirm what code the
	// ezoic/go-kinesis client actually populates, or this will never match.
	if ok && cErr.Code == "ProvisionedThroughputExceeded" {
		r = true
	}
	// BUG FIX: previously returned the literal true, which classified every
	// error as recoverable and made the fatal-exit path in ProcessShard
	// unreachable. Return the computed result instead.
	return r
}
// handleAwsWaitTimeExp sleeps for the AWS-recommended exponential backoff
// interval for the given consecutive-error attempt count:
// min(100ms * 2^attempts, 5 minutes).
// See http://docs.aws.amazon.com/general/latest/gr/api-retries.html
// NOTE(review): attempts == 0 still sleeps 100ms, so the caller's loop is
// throttled even when no errors occurred — confirm that is intentional.
func (p Pipeline) handleAwsWaitTimeExp(attempts int) {
	const maxBackoffMs = 300000 // cap at 5 minutes
	backoffMs := 100 * math.Pow(2, float64(attempts))
	if backoffMs > maxBackoffMs {
		backoffMs = maxBackoffMs
	}
	time.Sleep(time.Duration(backoffMs) * time.Millisecond)
}
// ProcessShard kicks off the process of a Kinesis Shard. // ProcessShard kicks off the process of a Kinesis Shard.
// It is a long running process that will continue to read from the shard. // It is a long running process that will continue to read from the shard.
func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
@ -43,13 +68,26 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
shardIterator := shardInfo.ShardIterator shardIterator := shardInfo.ShardIterator
consecutiveErrorAttempts := 0
for { for {
// handle the aws backoff stuff
p.handleAwsWaitTimeExp(consecutiveErrorAttempts)
args = kinesis.NewArgs() args = kinesis.NewArgs()
args.Add("ShardIterator", shardIterator) args.Add("ShardIterator", shardIterator)
recordSet, err := ksis.GetRecords(args) recordSet, err := ksis.GetRecords(args)
if err != nil { if err != nil {
log.Fatalf("GetRecords ERROR: %v\n", err) if p.isRecoverableError(err) {
consecutiveErrorAttempts++
} else {
l4g.Critical("GetRecords ERROR: %v\n", err)
os.Exit(1)
}
} else {
consecutiveErrorAttempts = 0
} }
if len(recordSet.Records) > 0 { if len(recordSet.Records) > 0 {
@ -57,7 +95,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
data := v.GetData() data := v.GetData()
if err != nil { if err != nil {
log.Printf("GetData ERROR: %v\n", err) l4g.Error("GetData ERROR: %v\n", err)
continue continue
} }

View file

@ -3,7 +3,7 @@ package connector
import ( import (
"log" "log"
"github.com/sendgridlabs/go-kinesis" "github.com/ezoic/go-kinesis"
) )
// An implementation of Emitter that puts event data on S3 file, and then puts the // An implementation of Emitter that puts event data on S3 file, and then puts the