Use AWS SDK for S3 Emitter

* Handle retries for S3 Emitter
* Update Pipeline to use AWS backoff
* Leverage the awserr package in recoverable error helpers
Harlow Ward 2015-08-16 17:52:10 -07:00
parent 18173842fb
commit 0d5e9b7b02
5 changed files with 91 additions and 75 deletions

View file

@@ -4,27 +4,57 @@ import (
 	"math"
 	"net"
 	"net/url"
-	"reflect"
 	"regexp"
 	"time"
 
+	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/lib/pq"
-	"github.com/sendgridlabs/go-kinesis"
 )
 
 type isRecoverableErrorFunc func(error) bool
 
+var isRecoverableErrors = []isRecoverableErrorFunc{
+	kinesisIsRecoverableError,
+	netIsRecoverableError,
+	redshiftIsRecoverableError,
+	urlIsRecoverableError,
+}
+
+// isRecoverableError determines whether the error is recoverable
+func isRecoverableError(err error) bool {
+	for _, errF := range isRecoverableErrors {
+		if errF(err) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// handleAwsWaitTimeExp waits up to 5 minutes, following the AWS
+// exponential backoff algorithm:
+// http://docs.aws.amazon.com/general/latest/gr/api-retries.html
+func handleAwsWaitTimeExp(attempts int) {
+	if attempts > 0 {
+		waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
+		time.Sleep(waitTime)
+	}
+}
+
 func kinesisIsRecoverableError(err error) bool {
 	recoverableErrorCodes := map[string]bool{
 		"InternalFailure":                        true,
 		"ProvisionedThroughputExceededException": true,
+		"RequestError":                           true,
 		"ServiceUnavailable":                     true,
 		"Throttling":                             true,
 	}
 
-	cErr, ok := err.(*kinesis.Error)
-	if ok && recoverableErrorCodes[cErr.Code] == true {
-		return true
+	if awsErr, ok := err.(awserr.Error); ok {
+		if recoverableErrorCodes[awsErr.Code()] {
+			return true
+		}
 	}
 
 	return false
 }
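
A quick feel for the schedule the new handleAwsWaitTimeExp produces: the wait is min(100 * 2^attempts, 300000) milliseconds, so it doubles from 200ms and reaches the five-minute ceiling at the twelfth consecutive attempt. A standalone sketch, not part of this commit, that prints the progression:

package main

import (
	"fmt"
	"math"
	"time"
)

// backoffWait mirrors the formula in handleAwsWaitTimeExp above:
// min(100 * 2^attempts, 300000) milliseconds.
func backoffWait(attempts int) time.Duration {
	return time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
}

func main() {
	for _, n := range []int{1, 2, 3, 5, 11, 12, 20} {
		fmt.Printf("attempt %2d -> %v\n", n, backoffWait(n))
	}
	// attempt  1 -> 200ms, attempt  5 -> 3.2s,
	// attempt 11 -> 3m24.8s, attempt 12 and up -> 5m0s (capped)
}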
@@ -47,11 +77,11 @@ func netIsRecoverableError(err error) bool {
 	return false
 }
 
+var redshiftRecoverableErrors = []*regexp.Regexp{
+	regexp.MustCompile("The specified S3 prefix '.*?' does not exist"),
+}
+
 func redshiftIsRecoverableError(err error) bool {
-	redshiftRecoverableErrors := []*regexp.Regexp{
-		regexp.MustCompile("The specified S3 prefix '.*?' does not exist"),
-	}
-
 	if cErr, ok := err.(pq.Error); ok {
 		for _, re := range redshiftRecoverableErrors {
 			if re.MatchString(cErr.Message) {
@@ -61,32 +91,3 @@ func redshiftIsRecoverableError(err error) bool {
 	}
 
 	return false
 }
-
-var isRecoverableErrors = []isRecoverableErrorFunc{
-	kinesisIsRecoverableError,
-	netIsRecoverableError,
-	redshiftIsRecoverableError,
-	urlIsRecoverableError,
-}
-
-// this determines whether the error is recoverable
-func isRecoverableError(err error) bool {
-	logger.Log("info", "isRecoverableError", "type", reflect.TypeOf(err).String(), "msg", err.Error())
-
-	for _, errF := range isRecoverableErrors {
-		if errF(err) {
-			return true
-		}
-	}
-
-	return false
-}
-
-// handle the aws exponential backoff
-// wait up to 5 minutes based on the aws exponential backoff algorithm
-// http://docs.aws.amazon.com/general/latest/gr/api-retries.html
-func handleAwsWaitTimeExp(attempts int) {
-	if attempts > 0 {
-		waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
-		logger.Log("info", "handleAwsWaitTimeExp", "attempts", attempts, "waitTime", waitTime.String())
-		time.Sleep(waitTime)
-	}
-}
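
The recoverable-error helpers now classify failures by awserr.Error code instead of go-kinesis's *kinesis.Error. A minimal sketch, not part of this commit, of that assertion pattern; awserr.New builds the same error type the SDK returns, so no live AWS call is needed:

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws/awserr"
)

// describe shows the assertion kinesisIsRecoverableError relies on: any
// operation error from aws-sdk-go exposes a stable Code() for matching.
func describe(err error) {
	if awsErr, ok := err.(awserr.Error); ok {
		fmt.Println("code:", awsErr.Code(), "message:", awsErr.Message())
		return
	}
	fmt.Println("not an AWS error:", err)
}

func main() {
	describe(awserr.New("Throttling", "rate exceeded", nil))
}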

View file

@@ -4,7 +4,6 @@ import (
 	"bufio"
 	"fmt"
 	"os"
-	"sync"
 
 	"github.com/harlow/kinesis-connectors"
 	"github.com/joho/godotenv"
@@ -30,7 +29,6 @@ func main() {
 	args := kinesis.NewArgs()
 	args.Add("StreamName", "userStream")
 	ctr := 0
-	var wg sync.WaitGroup
 
 	for scanner.Scan() {
 		l := scanner.Text()
@@ -40,16 +38,10 @@ func main() {
 		args := kinesis.NewArgs()
 		args.Add("StreamName", "userStream")
 		args.AddRecord([]byte(l), key)
-		wg.Add(1)
-
-		go func() {
-			ksis.PutRecords(args)
-			fmt.Print(".")
-			wg.Done()
-		}()
+		ksis.PutRecords(args)
+		fmt.Print(".")
 	}
 
-	wg.Wait()
 	fmt.Println(".")
 	fmt.Println("Finished populating userStream")
 }

View file

@@ -1,6 +1,7 @@
 package connector
 
 import (
+	"os"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -45,30 +46,43 @@ func (p Pipeline) ProcessShard(shardID string) {
 	if err != nil {
 		if awsErr, ok := err.(awserr.Error); ok {
-			logger.Log("error", "GetShardIterator", "code", awsErr.Code(), "msg", awsErr.Message(), "origError", awsErr.OrigErr())
-			return
+			logger.Log("fatal", "getShardIterator", "code", awsErr.Code(), "msg", awsErr.Message(), "origError", awsErr.OrigErr())
+			os.Exit(1)
 		}
 	}
 
+	errorCount := 0
 	shardIterator := resp.ShardIterator
 
 	for {
+		// exit program if error threshold is reached
+		if errorCount > 50 {
+			logger.Log("fatal", "getRecords", "msg", "Too many consecutive error attempts")
+			os.Exit(1)
+		}
+
+		// get records from stream
 		args := &kinesis.GetRecordsInput{ShardIterator: shardIterator}
 		resp, err := svc.GetRecords(args)
 
+		// handle recoverable errors, else exit program
 		if err != nil {
-			if awsErr, ok := err.(awserr.Error); ok {
-				if awsErr.Code() == "ProvisionedThroughputExceededException" {
-					logger.Log("info", "GetRecords", "shardId", shardID, "msg", "rateLimit")
-					time.Sleep(5 * time.Second)
-					continue
-				} else {
-					logger.Log("error", "GetRecords", "shardId", shardID, "code", awsErr.Code(), "msg", awsErr.Message())
-					break
-				}
-			}
+			errorCount++
+
+			if isRecoverableError(err) {
+				logger.Log("warn", "getRecords", "errorCount", errorCount, "msg", err.Error())
+				handleAwsWaitTimeExp(errorCount)
+				continue
+			} else {
+				logger.Log("fatal", "getRecords", "msg", err.Error())
+				os.Exit(1)
+			}
+		} else {
+			errorCount = 0
 		}
 
+		// process records
 		if len(resp.Records) > 0 {
 			for _, r := range resp.Records {
 				transformedRecord := p.Transformer.ToRecord(r.Data)
@@ -82,12 +96,13 @@ func (p Pipeline) ProcessShard(shardID string) {
 			if p.Buffer.ShouldFlush() {
 				p.Emitter.Emit(p.Buffer, p.Transformer)
+				logger.Log("info", "emit", "shardID", shardID, "recordsEmitted", len(p.Buffer.Records()))
 				p.Checkpoint.SetCheckpoint(shardID, p.checkpointSequenceNumber)
 				p.Buffer.Flush()
 			}
 		} else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator {
-			logger.Log("error", "NextShardIterator", "msg", err.Error())
-			break
+			logger.Log("fatal", "nextShardIterator", "msg", err.Error())
+			os.Exit(1)
 		} else {
 			time.Sleep(1 * time.Second)
 		}
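
Stripped of the Kinesis plumbing, the retry shape ProcessShard now follows is easier to see in isolation. A sketch only, reusing the package's logger, isRecoverableError, and handleAwsWaitTimeExp; fetchWithRetry and fetch are illustrative names standing in for the GetRecords loop:

// fetchWithRetry distills the consecutive-error pattern from the loop above.
func fetchWithRetry(fetch func() error) {
	errorCount := 0

	for {
		// bail out after too many consecutive failures
		if errorCount > 50 {
			logger.Log("fatal", "fetchWithRetry", "msg", "Too many consecutive error attempts")
			os.Exit(1)
		}

		if err := fetch(); err != nil {
			errorCount++

			// recoverable: back off exponentially, then try again
			if isRecoverableError(err) {
				handleAwsWaitTimeExp(errorCount)
				continue
			}

			// unrecoverable: exit immediately
			logger.Log("fatal", "fetchWithRetry", "msg", err.Error())
			os.Exit(1)
		}

		// a success resets the consecutive-error counter
		errorCount = 0

		// ...process the fetched batch, then loop for the next one
	}
}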

View file

@@ -37,7 +37,7 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
 		// db command succeeded, break from loop
 		if err == nil {
-			logger.Log("info", "RedshiftBasicEmitter", "shard", shardID, "file", s3File)
+			logger.Log("info", "RedshiftBasicEmitter", "file", s3File)
 			break
 		}

View file

@@ -3,11 +3,12 @@ package connector
 import (
 	"bytes"
 	"fmt"
-	"os"
 	"time"
 
-	"github.com/crowdmob/goamz/aws"
-	"github.com/crowdmob/goamz/s3"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"gopkg.in/matryer/try.v1"
 )
 
 // S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3.
@@ -23,25 +24,32 @@ type S3Emitter struct {
 // Emit is invoked when the buffer is full. This method emits the set of filtered records.
 func (e S3Emitter) Emit(b Buffer, t Transformer) {
-	auth, _ := aws.EnvAuth()
-	s3Con := s3.New(auth, aws.USEast)
-	bucket := s3Con.Bucket(e.S3Bucket)
-	s3File := e.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
 	var buffer bytes.Buffer
+	svc := s3.New(&aws.Config{Region: "us-east-1"})
+	key := e.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
 
 	for _, r := range b.Records() {
 		var s = t.FromRecord(r)
 		buffer.Write(s)
 	}
 
-	err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
+	params := &s3.PutObjectInput{
+		Body:        bytes.NewReader(buffer.Bytes()),
+		Bucket:      aws.String(e.S3Bucket),
+		ContentType: aws.String("text/plain"),
+		Key:         aws.String(key),
+	}
+
+	err := try.Do(func(attempt int) (bool, error) {
+		var err error
+		_, err = svc.PutObject(params)
+		return attempt < 5, err
+	})
 
 	if err != nil {
-		logger.Log("error", "S3Put", "msg", err.Error())
-		os.Exit(1)
-	} else {
-		logger.Log("info", "S3Put", "recordsEmitted", len(b.Records()))
+		if awsErr, ok := err.(awserr.Error); ok {
+			logger.Log("error", "emit", "code", awsErr.Code())
+		}
 	}
 }
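
The try.Do contract used above is easy to misread: the callback receives a 1-based attempt number and returns (retryAgain, err), and try.Do stops as soon as err is nil or retryAgain is false, so attempt < 5 caps the S3 put at five tries. A standalone sketch, not part of this commit, assuming the package default try.MaxRetries sits above that gate:

package main

import (
	"errors"
	"fmt"

	"gopkg.in/matryer/try.v1"
)

func main() {
	calls := 0

	err := try.Do(func(attempt int) (bool, error) {
		calls++
		// pretend the operation always fails; retry while attempt < 5
		return attempt < 5, errors.New("put failed")
	})

	// the last error surfaces once the retry budget is spent
	fmt.Println(calls, err) // expect: 5 put failed
}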