From 0d5e9b7b0208a4048810168746a9b621f978f047 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 16 Aug 2015 17:52:10 -0700 Subject: [PATCH] Use AWS SDK for S3 Emitter * Handle retries for S3 Emitter * Update Pipeline to use AWS backoff * Leverage aws error kit in recoverable error helpers --- awsbackoff.go | 77 ++++++++++++++++++------------------ examples/seed-stream/main.go | 12 +----- pipeline.go | 41 +++++++++++++------ redshift_basic_emitter.go | 2 +- s3_emitter.go | 34 ++++++++++------ 5 files changed, 91 insertions(+), 75 deletions(-) diff --git a/awsbackoff.go b/awsbackoff.go index 72efd4e..79e544b 100644 --- a/awsbackoff.go +++ b/awsbackoff.go @@ -4,27 +4,57 @@ import ( "math" "net" "net/url" - "reflect" "regexp" "time" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/lib/pq" - "github.com/sendgridlabs/go-kinesis" ) type isRecoverableErrorFunc func(error) bool +var isRecoverableErrors = []isRecoverableErrorFunc{ + kinesisIsRecoverableError, + netIsRecoverableError, + redshiftIsRecoverableError, + urlIsRecoverableError, +} + +// isRecoverableError determines whether the error is recoverable +func isRecoverableError(err error) bool { + for _, errF := range isRecoverableErrors { + if errF(err) { + return true + } + } + return false +} + +// handle the aws exponential backoff +// wait up to 5 minutes based on the aws exponential backoff algorithm +// http://docs.aws.amazon.com/general/latest/gr/api-retries.html +func handleAwsWaitTimeExp(attempts int) { + if attempts > 0 { + waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond + time.Sleep(waitTime) + } +} + func kinesisIsRecoverableError(err error) bool { recoverableErrorCodes := map[string]bool{ "InternalFailure": true, "ProvisionedThroughputExceededException": true, + "RequestError": true, "ServiceUnavailable": true, "Throttling": true, } - cErr, ok := err.(*kinesis.Error) - if ok && recoverableErrorCodes[cErr.Code] == true { - return true + + if err, ok 
:= err.(awserr.Error); ok { + if ok && recoverableErrorCodes[err.Code()] == true { + return true + } } + return false } @@ -47,11 +77,11 @@ func netIsRecoverableError(err error) bool { return false } -var redshiftRecoverableErrors = []*regexp.Regexp{ - regexp.MustCompile("The specified S3 prefix '.*?' does not exist"), -} - func redshiftIsRecoverableError(err error) bool { + redshiftRecoverableErrors := []*regexp.Regexp{ + regexp.MustCompile("The specified S3 prefix '.*?' does not exist"), + } + if cErr, ok := err.(pq.Error); ok { for _, re := range redshiftRecoverableErrors { if re.MatchString(cErr.Message) { @@ -61,32 +91,3 @@ func redshiftIsRecoverableError(err error) bool { } return false } - -var isRecoverableErrors = []isRecoverableErrorFunc{ - kinesisIsRecoverableError, - netIsRecoverableError, - redshiftIsRecoverableError, - urlIsRecoverableError, -} - -// this determines whether the error is recoverable -func isRecoverableError(err error) bool { - logger.Log("info", "isRecoverableError", "type", reflect.TypeOf(err).String(), "msg", err.Error()) - for _, errF := range isRecoverableErrors { - if errF(err) { - return true - } - } - return false -} - -// handle the aws exponential backoff -// wait up to 5 minutes based on the aws exponential backoff algorithm -// http://docs.aws.amazon.com/general/latest/gr/api-retries.html -func handleAwsWaitTimeExp(attempts int) { - if attempts > 0 { - waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond - logger.Log("info", "handleAwsWaitTimeExp", "attempts", attempts, "waitTime", waitTime.String()) - time.Sleep(waitTime) - } -} diff --git a/examples/seed-stream/main.go b/examples/seed-stream/main.go index 1c29888..03022c1 100644 --- a/examples/seed-stream/main.go +++ b/examples/seed-stream/main.go @@ -4,7 +4,6 @@ import ( "bufio" "fmt" "os" - "sync" "github.com/harlow/kinesis-connectors" "github.com/joho/godotenv" @@ -30,7 +29,6 @@ func main() { args := kinesis.NewArgs() 
args.Add("StreamName", "userStream")
 	ctr := 0
-	var wg sync.WaitGroup
 
 	for scanner.Scan() {
 		l := scanner.Text()
@@ -40,16 +38,10 @@ func main() {
 		args := kinesis.NewArgs()
 		args.Add("StreamName", "userStream")
 		args.AddRecord([]byte(l), key)
-		wg.Add(1)
-
-		go func() {
-			ksis.PutRecords(args)
-			fmt.Print(".")
-			wg.Done()
-		}()
+		ksis.PutRecords(args)
+		fmt.Print(".")
 	}
 
-	wg.Wait()
 	fmt.Println(".")
 	fmt.Println("Finished populating userStream")
 }
diff --git a/pipeline.go b/pipeline.go
index 6970564..3f6a65f 100644
--- a/pipeline.go
+++ b/pipeline.go
@@ -1,6 +1,7 @@
 package connector
 
 import (
+	"os"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -45,30 +46,42 @@ func (p Pipeline) ProcessShard(shardID string) {
 
 	if err != nil {
 		if awsErr, ok := err.(awserr.Error); ok {
-			logger.Log("error", "GetShardIterator", "code", awsErr.Code(), "msg", awsErr.Message(), "origError", awsErr.OrigErr())
-			return
+			logger.Log("fatal", "getShardIterator", "code", awsErr.Code(), "msg", awsErr.Message(), "origError", awsErr.OrigErr())
+			os.Exit(1)
 		}
 	}
 
+	errorCount := 0
 	shardIterator := resp.ShardIterator
 
 	for {
+		// exit program if error threshold is reached
+		if errorCount > 50 {
+			logger.Log("fatal", "getRecords", "msg", "Too many consecutive error attempts")
+			os.Exit(1)
+		}
+
+		// get records from stream
 		args := &kinesis.GetRecordsInput{ShardIterator: shardIterator}
 		resp, err := svc.GetRecords(args)
 
+		// handle recoverable errors, else exit program
 		if err != nil {
-			if awsErr, ok := err.(awserr.Error); ok {
-				if awsErr.Code() == "ProvisionedThroughputExceededException" {
-					logger.Log("info", "GetRecords", "shardId", shardID, "msg", "rateLimit")
-					time.Sleep(5 * time.Second)
-					continue
-				} else {
-					logger.Log("error", "GetRecords", "shardId", shardID, "code", awsErr.Code(), "msg", awsErr.Message())
-					break
-				}
+			errorCount++
+
+			if isRecoverableError(err) {
+				logger.Log("warn", "getRecords", "errorCount", errorCount, "msg", err.Error())
+				handleAwsWaitTimeExp(errorCount)
+				continue
+			} else {
+				logger.Log("fatal", "getRecords", "msg", err.Error())
+				os.Exit(1)
 			}
+		} else {
+			errorCount = 0
 		}
 
+		// process records
 		if len(resp.Records) > 0 {
 			for _, r := range resp.Records {
 				transformedRecord := p.Transformer.ToRecord(r.Data)
@@ -82,12 +96,13 @@
 
 			if p.Buffer.ShouldFlush() {
 				p.Emitter.Emit(p.Buffer, p.Transformer)
+				logger.Log("info", "emit", "shardID", shardID, "recordsEmitted", len(p.Buffer.Records()))
 				p.Checkpoint.SetCheckpoint(shardID, p.checkpointSequenceNumber)
 				p.Buffer.Flush()
 			}
 		} else if resp.NextShardIterator == aws.String("") || shardIterator == resp.NextShardIterator {
-			logger.Log("error", "NextShardIterator", "msg", err.Error())
-			break
+			logger.Log("fatal", "nextShardIterator", "msg", "shard iterator stopped advancing")
+			os.Exit(1)
 		} else {
 			time.Sleep(1 * time.Second)
 		}
diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go
index 28be827..2746abf 100644
--- a/redshift_basic_emitter.go
+++ b/redshift_basic_emitter.go
@@ -37,7 +37,7 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
 
 		// db command succeeded, break from loop
 		if err == nil {
-			logger.Log("info", "RedshiftBasicEmitter", "shard", shardID, "file", s3File)
+			logger.Log("info", "RedshiftBasicEmitter", "file", s3File)
 			break
 		}
 
diff --git a/s3_emitter.go b/s3_emitter.go
index 823a134..858c8c7 100644
--- a/s3_emitter.go
+++ b/s3_emitter.go
@@ -3,11 +3,12 @@ package connector
 
 import (
 	"bytes"
 	"fmt"
-	"os"
 	"time"
 
-	"github.com/crowdmob/goamz/aws"
-	"github.com/crowdmob/goamz/s3"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"gopkg.in/matryer/try.v1"
 )
 
 // S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3.
@@ -23,25 +24,33 @@ type S3Emitter struct {
 
 // Emit is invoked when the buffer is full. This method emits the set of filtered records.
 func (e S3Emitter) Emit(b Buffer, t Transformer) {
-	auth, _ := aws.EnvAuth()
-	s3Con := s3.New(auth, aws.USEast)
-	bucket := s3Con.Bucket(e.S3Bucket)
-	s3File := e.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
-	var buffer bytes.Buffer
+	svc := s3.New(&aws.Config{Region: "us-east-1"})
+	key := e.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
+	var buffer bytes.Buffer
 
 	for _, r := range b.Records() {
 		var s = t.FromRecord(r)
 		buffer.Write(s)
 	}
 
-	err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
+	params := &s3.PutObjectInput{
+		Body:        bytes.NewReader(buffer.Bytes()),
+		Bucket:      aws.String(e.S3Bucket),
+		ContentType: aws.String("text/plain"),
+		Key:         aws.String(key),
+	}
+
+	err := try.Do(func(attempt int) (bool, error) {
+		var err error
+		_, err = svc.PutObject(params)
+		return attempt < 5, err
+	})
 
 	if err != nil {
-		logger.Log("error", "S3Put", "msg", err.Error())
-		os.Exit(1)
-	} else {
-		logger.Log("info", "S3Put", "recordsEmitted", len(b.Records()))
+		if awsErr, ok := err.(awserr.Error); ok {
+			logger.Log("error", "emit", "code", awsErr.Code())
+		}
 	}
 }