From 8c660f79fb08b3543cd93bfe4a6e26052425bc8f Mon Sep 17 00:00:00 2001
From: dan
Date: Wed, 6 May 2015 09:14:00 -0700
Subject: [PATCH] Add Retries to Redshift Basic Emitter

* Move AWS exponential backoff to its own file
---
 awsbackoff.go             | 98 +++++++++++++++++++++++++++++++++++++++
 pipeline.go               | 27 +++--------
 redshift_basic_emitter.go | 15 +++++-
 3 files changed, 117 insertions(+), 23 deletions(-)
 create mode 100644 awsbackoff.go

diff --git a/awsbackoff.go b/awsbackoff.go
new file mode 100644
index 0000000..010b0bc
--- /dev/null
+++ b/awsbackoff.go
@@ -0,0 +1,98 @@
+package connector
+
+import (
+	"math"
+	"net"
+	"net/url"
+	"reflect"
+	"regexp"
+	"time"
+
+	"github.com/lib/pq"
+	"github.com/sendgridlabs/go-kinesis"
+)
+
+type isRecoverableErrorFunc func(error) bool
+
+func kinesisIsRecoverableError(err error) bool {
+	recoverableErrorCodes := map[string]bool{
+		"ProvisionedThroughputExceededException": true,
+		"InternalFailure": true,
+		"Throttling": true,
+		"ServiceUnavailable": true,
+	}
+	r := false
+	cErr, ok := err.(*kinesis.Error)
+	if ok && recoverableErrorCodes[cErr.Code] {
+		r = true
+	}
+	return r
+}
+
+func urlIsRecoverableError(err error) bool {
+	r := false
+	_, ok := err.(*url.Error)
+	if ok {
+		r = true
+	}
+	return r
+}
+
+func netIsRecoverableError(err error) bool {
+	recoverableErrors := map[string]bool{
+		"connection reset by peer": true,
+	}
+	cErr, ok := err.(*net.OpError)
+	if ok && recoverableErrors[cErr.Err.Error()] {
+		return true
+	}
+	return false
+}
+
+var redshiftRecoverableErrors = []*regexp.Regexp{
+	regexp.MustCompile("The specified S3 prefix '.*?' does not exist"),
+}
+
+func redshiftIsRecoverableError(err error) bool {
+	r := false
+	if cErr, ok := err.(*pq.Error); ok {
+		for _, re := range redshiftRecoverableErrors {
+			if re.MatchString(cErr.Message) {
+				r = true
+				break
+			}
+		}
+	}
+	return r
+}
+
+var isRecoverableErrors = []isRecoverableErrorFunc{
+	kinesisIsRecoverableError, netIsRecoverableError, urlIsRecoverableError, redshiftIsRecoverableError,
+}
+
+// isRecoverableError determines whether the error is recoverable by retrying.
+func isRecoverableError(err error) bool {
+	r := false
+
+	logger.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err)
+
+	for _, errF := range isRecoverableErrors {
+		r = errF(err)
+		if r {
+			break
+		}
+	}
+
+	return r
+}
+
+// handleAwsWaitTimeExp sleeps according to the AWS exponential backoff algorithm,
+// waiting at most 5 minutes; attempt 0 does not sleep.
+// http://docs.aws.amazon.com/general/latest/gr/api-retries.html
+func handleAwsWaitTimeExp(attempts int) {
+	if attempts > 0 {
+		waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
+		logger.Printf("handleAwsWaitTimeExp: %s\n", waitTime.String())
+		time.Sleep(waitTime)
+	}
+}
diff --git a/pipeline.go b/pipeline.go
index 04ba3f9..0cd9476 100644
--- a/pipeline.go
+++ b/pipeline.go
@@ -1,7 +1,6 @@
 package connector
 
 import (
-	"math"
 	"time"
 
 	"github.com/sendgridlabs/go-kinesis"
@@ -21,24 +20,6 @@ type Pipeline struct {
 	Transformer Transformer
 }
 
-// determine whether the error is recoverable
-func (p Pipeline) isRecoverableError(err error) bool {
-	cErr, ok := err.(*kinesis.Error)
-	if ok && cErr.Code == "ProvisionedThroughputExceeded" {
-		return true
-	}
-	return false
-}
-
-// handle the aws exponential backoff
-// http://docs.aws.amazon.com/general/latest/gr/api-retries.html
-func (p Pipeline) handleAwsWaitTimeExp(attempts int) {
-	// wait up to 5 minutes based on the aws exponential backoff algorithm
-	logger.Printf("waitingnow")
-	waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
-	time.Sleep(waitTime)
-}
-
 // ProcessShard kicks off the process of a Kinesis Shard.
 // It is a long running process that will continue to read from the shard.
 func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
@@ -63,15 +44,19 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 	consecutiveErrorAttempts := 0
 
 	for {
+		if consecutiveErrorAttempts > 50 {
+			logger.Fatalf("Too many consecutive error attempts")
+		}
+
 		args = kinesis.NewArgs()
 		args.Add("ShardIterator", shardIterator)
 		recordSet, err := ksis.GetRecords(args)
 
 		if err != nil {
-			if p.isRecoverableError(err) {
+			if isRecoverableError(err) {
 				logger.Printf("GetRecords RECOVERABLE_ERROR: %v\n", err)
 				consecutiveErrorAttempts++
-				p.handleAwsWaitTimeExp(consecutiveErrorAttempts)
+				handleAwsWaitTimeExp(consecutiveErrorAttempts)
 				continue
 			} else {
 				logger.Fatalf("GetRecords ERROR: %v\n", err)
diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go
index 09caa70..f0a9bed 100644
--- a/redshift_basic_emitter.go
+++ b/redshift_basic_emitter.go
@@ -33,9 +33,20 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
 		logger.Fatalf("sql.Open ERROR: %v\n", err)
 	}
 
-	_, err = db.Exec(e.copyStatement(s3File))
+	for i := 0; i < 10; i++ {
+		// handle aws backoff; this may be necessary if, for example, the
+		// S3 file is not yet visible to the database
+		handleAwsWaitTimeExp(i)
+
+		// load S3File into the database; if the request succeeded, or it's
+		// an unrecoverable error, stop retrying because we are done
+		_, err = db.Exec(e.copyStatement(s3File))
+		if err == nil || !isRecoverableError(err) {
+			break
+		}
+	}
+	if err != nil {
 
-	if err != nil {
 		logger.Fatalf("db.Exec ERROR: %v\n", err)
 	}
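
Reviewer note, not part of the patch: below is a minimal, self-contained sketch of the retry-with-backoff pattern this change applies around db.Exec. The doCopy helper, errTransient value, and main wrapper are illustrative placeholders only, not code from this repository; they simply assume a caller that retries while the error is recoverable and gives up otherwise.

package main

import (
	"errors"
	"fmt"
	"math"
	"time"
)

// errTransient stands in for a recoverable error such as an S3 prefix that
// is not yet visible to the database.
var errTransient = errors.New("transient failure")

// isRecoverable plays the role of isRecoverableError in awsbackoff.go.
func isRecoverable(err error) bool {
	return err == errTransient
}

// backoff mirrors handleAwsWaitTimeExp: no wait on attempt 0, then an
// exponentially growing wait capped at 5 minutes (300000 ms).
func backoff(attempts int) {
	if attempts > 0 {
		wait := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
		time.Sleep(wait)
	}
}

// doCopy stands in for db.Exec(copyStatement); it fails twice, then succeeds.
func doCopy(calls *int) error {
	*calls++
	if *calls < 3 {
		return errTransient
	}
	return nil
}

func main() {
	var err error
	calls := 0
	for i := 0; i < 10; i++ {
		backoff(i)
		// retry only while the error is recoverable
		if err = doCopy(&calls); err == nil || !isRecoverable(err) {
			break
		}
	}
	if err != nil {
		fmt.Println("giving up:", err)
		return
	}
	fmt.Printf("succeeded after %d attempts\n", calls)
}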