Add Retries to Redshift Basic Emitter
* Move AWS exponential backoff to its own file
parent f4de27dc13
commit 8c660f79fb
3 changed files with 117 additions and 23 deletions
awsbackoff.go (new file, 98 lines)
@@ -0,0 +1,98 @@
package connector

import (
	"math"
	"net"
	"net/url"
	"reflect"
	"regexp"
	"time"

	"github.com/lib/pq"
	"github.com/sendgridlabs/go-kinesis"
)

// isRecoverableErrorFunc reports whether an error is transient and worth retrying.
type isRecoverableErrorFunc func(error) bool

// kinesisIsRecoverableError returns true for Kinesis error codes that are
// known to be transient.
func kinesisIsRecoverableError(err error) bool {
	recoverableErrorCodes := map[string]bool{
		"ProvisionedThroughputExceededException": true,
		"InternalFailure":                        true,
		"Throttling":                             true,
		"ServiceUnavailable":                     true,
	}
	cErr, ok := err.(*kinesis.Error)
	return ok && recoverableErrorCodes[cErr.Code]
}

// urlIsRecoverableError treats any *url.Error (a failed HTTP request) as recoverable.
func urlIsRecoverableError(err error) bool {
	_, ok := err.(*url.Error)
	return ok
}

// netIsRecoverableError returns true for network errors known to be transient.
func netIsRecoverableError(err error) bool {
	recoverableErrors := map[string]bool{
		"connection reset by peer": true,
	}
	cErr, ok := err.(*net.OpError)
	return ok && recoverableErrors[cErr.Err.Error()]
}
var redshiftRecoverableErrors = []*regexp.Regexp{
	regexp.MustCompile("The specified S3 prefix '.*?' does not exist"),
}

// redshiftIsRecoverableError returns true for Redshift (lib/pq) errors whose
// message matches a known-transient pattern, such as an S3 file that is not
// yet visible to the COPY command.
func redshiftIsRecoverableError(err error) bool {
	// lib/pq returns its errors as *pq.Error, so assert on the pointer type.
	if cErr, ok := err.(*pq.Error); ok {
		for _, re := range redshiftRecoverableErrors {
			if re.MatchString(cErr.Message) {
				return true
			}
		}
	}
	return false
}

var isRecoverableErrors = []isRecoverableErrorFunc{
	kinesisIsRecoverableError, netIsRecoverableError, urlIsRecoverableError, redshiftIsRecoverableError,
}

// isRecoverableError determines whether the error is recoverable by checking
// it against each registered checker in turn.
func isRecoverableError(err error) bool {
	logger.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err)

	for _, errF := range isRecoverableErrors {
		if errF(err) {
			return true
		}
	}
	return false
}
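Because the checkers are plain entries in the isRecoverableErrors slice, covering a new class of transient failure is just a matter of appending another isRecoverableErrorFunc. A hypothetical sketch (not part of this commit), assuming it lives in the same connector package:

// Hypothetical example: also treat temporary DNS failures as recoverable.
func dnsIsRecoverableError(err error) bool {
	cErr, ok := err.(*net.DNSError)
	return ok && cErr.Temporary()
}

func init() {
	isRecoverableErrors = append(isRecoverableErrors, dnsIsRecoverableError)
}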
// handleAwsWaitTimeExp sleeps according to the AWS exponential backoff
// algorithm, waiting up to 5 minutes per attempt.
// http://docs.aws.amazon.com/general/latest/gr/api-retries.html
func handleAwsWaitTimeExp(attempts int) {
	if attempts > 0 {
		waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
		logger.Printf("handleAwsWaitTimeExp: %s\n", waitTime.String())
		time.Sleep(waitTime)
	}
}
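For reference, a minimal standalone sketch (not part of the commit) that prints the schedule this formula produces, using the same 100 ms base and 300000 ms cap as handleAwsWaitTimeExp:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// min(100 * 2^attempts, 300000) milliseconds, as in handleAwsWaitTimeExp.
	for attempts := 1; attempts <= 12; attempts++ {
		ms := math.Min(100*math.Pow(2, float64(attempts)), 300000)
		fmt.Printf("attempt %2d -> wait %v\n", attempts, time.Duration(ms)*time.Millisecond)
	}
}

Attempt 1 waits 200ms and each retry doubles the wait; the 5-minute cap is first hit at attempt 12, where 100·2^12 ms ≈ 410s exceeds 300s.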
pipeline.go (27 lines changed)
@@ -1,7 +1,6 @@
 package connector

 import (
-	"math"
 	"time"

 	"github.com/sendgridlabs/go-kinesis"
@@ -21,24 +20,6 @@ type Pipeline struct {
 	Transformer Transformer
 }

-// determine whether the error is recoverable
-func (p Pipeline) isRecoverableError(err error) bool {
-	cErr, ok := err.(*kinesis.Error)
-	if ok && cErr.Code == "ProvisionedThroughputExceeded" {
-		return true
-	}
-	return false
-}
-
-// handle the aws exponential backoff
-// http://docs.aws.amazon.com/general/latest/gr/api-retries.html
-func (p Pipeline) handleAwsWaitTimeExp(attempts int) {
-	// wait up to 5 minutes based on the aws exponential backoff algorithm
-	logger.Printf("waitingnow")
-	waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
-	time.Sleep(waitTime)
-}
-
 // ProcessShard kicks off the process of a Kinesis Shard.
 // It is a long running process that will continue to read from the shard.
 func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
@@ -63,15 +44,19 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 	consecutiveErrorAttempts := 0

 	for {
+		if consecutiveErrorAttempts > 50 {
+			logger.Fatalf("Too many consecutive error attempts")
+		}
+
 		args = kinesis.NewArgs()
 		args.Add("ShardIterator", shardIterator)
 		recordSet, err := ksis.GetRecords(args)

 		if err != nil {
-			if p.isRecoverableError(err) {
+			if isRecoverableError(err) {
 				logger.Printf("GetRecords RECOVERABLE_ERROR: %v\n", err)
 				consecutiveErrorAttempts++
-				p.handleAwsWaitTimeExp(consecutiveErrorAttempts)
+				handleAwsWaitTimeExp(consecutiveErrorAttempts)
 				continue
 			} else {
 				logger.Fatalf("GetRecords ERROR: %v\n", err)
@@ -33,9 +33,20 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
logger.Fatalf("sql.Open ERROR: %v\n", err)
|
||||
}
|
||||
|
||||
_, err = db.Exec(e.copyStatement(s3File))
|
||||
for i := 0; i < 10; i++ {
|
||||
// handle aws backoff, this may be necessary if, for example, the
|
||||
// s3 file has not appeared to the database yet
|
||||
handleAwsWaitTimeExp(i)
|
||||
|
||||
// load S3File into database
|
||||
_, err = db.Exec(e.copyStatement(s3File))
|
||||
|
||||
// if the request succeeded, or its an unrecoverable error, break out of
|
||||
// the loop because we are done
|
||||
if err == nil || isRecoverableError(err) == false {
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Fatalf("db.Exec ERROR: %v\n", err)
|
||||
}
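The Emit loop above follows the same retry shape as the shard loop in pipeline.go. A minimal sketch of that pattern factored into a reusable helper, assuming the connector package from this commit (retryWithBackoff is hypothetical, not part of the commit):

// retryWithBackoff (hypothetical) runs op up to maxAttempts times, sleeping
// with AWS exponential backoff between attempts, and stops early on success
// or on an unrecoverable error.
func retryWithBackoff(maxAttempts int, op func() error) error {
	var err error
	for i := 0; i < maxAttempts; i++ {
		handleAwsWaitTimeExp(i) // no-op on the first pass (i == 0)
		if err = op(); err == nil || !isRecoverableError(err) {
			break
		}
	}
	return err
}

Emit could then call retryWithBackoff(10, func() error { _, execErr := db.Exec(e.copyStatement(s3File)); return execErr }).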