diff --git a/awsbackoff.go b/awsbackoff.go new file mode 100644 index 0000000..31fe87b --- /dev/null +++ b/awsbackoff.go @@ -0,0 +1,104 @@ +package connector + +import ( + "log" + "math" + "net" + "net/url" + "reflect" + "regexp" + "time" + + "github.com/ezoic/go-kinesis" + l4g "github.com/ezoic/log4go" + "github.com/lib/pq" +) + +type isRecoverableErrorFunc func(error) bool + +func kinesisIsRecoverableError(err error) bool { + recoverableErrorCodes := map[string]bool{ + "ProvisionedThroughputExceededException": true, + "InternalFailure": true, + "Throttling": true, + "ServiceUnavailable": true, + //"ExpiredIteratorException": true, + } + r := false + cErr, ok := err.(*kinesis.Error) + if ok && recoverableErrorCodes[cErr.Code] == true { + r = true + } + return r +} + +func urlIsRecoverableError(err error) bool { + r := false + _, ok := err.(*url.Error) + if ok { + r = true + } + return r +} + +func netIsRecoverableError(err error) bool { + recoverableErrors := map[string]bool{ + "connection reset by peer": true, + } + r := false + cErr, ok := err.(*net.OpError) + if ok && recoverableErrors[cErr.Err.Error()] == true { + r = true + } + return r +} + +var redshiftRecoverableErrors = []*regexp.Regexp{ + regexp.MustCompile("The specified S3 prefix '.*?' 
does not exist"), +} + +func redshiftIsRecoverableError(err error) bool { + r := false + if cErr, ok := err.(*pq.Error); ok { + for _, re := range redshiftRecoverableErrors { + if re.MatchString(cErr.Message) { + r = true + break + } + } + } + return r +} + +var isRecoverableErrors = []isRecoverableErrorFunc{ + kinesisIsRecoverableError, netIsRecoverableError, urlIsRecoverableError, redshiftIsRecoverableError, +} + +// this determines whether the error is recoverable +func isRecoverableError(err error) bool { + r := false + + log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err) + + for _, errF := range isRecoverableErrors { + r = errF(err) + if r { + break + } + } + + return r +} + +// handle the aws exponential backoff +func handleAwsWaitTimeExp(attempts int) { + + //http://docs.aws.amazon.com/general/latest/gr/api-retries.html + // wait up to 5 minutes based on the aws exponential backoff algorithm + if attempts > 0 { + waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond + l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String()) + time.Sleep(waitTime) + } + +} diff --git a/pipeline.go b/pipeline.go index 3bbb73d..7ff7199 100644 --- a/pipeline.go +++ b/pipeline.go @@ -2,10 +2,6 @@ package connector import ( "log" - "math" - "net" - "net/url" - "reflect" "time" "github.com/ezoic/go-kinesis" @@ -27,78 +23,6 @@ type Pipeline struct { CheckpointFilteredRecords bool } -type pipelineIsRecoverableErrorFunc func(error) bool - -func pipelineKinesisIsRecoverableError(err error) bool { - recoverableErrorCodes := map[string]bool{ - "ProvisionedThroughputExceededException": true, - "InternalFailure": true, - "Throttling": true, - "ServiceUnavailable": true, - //"ExpiredIteratorException": true, - } - r := false - cErr, ok := err.(*kinesis.Error) - if ok && recoverableErrorCodes[cErr.Code] == true { - r = true - } - return r -} - -func pipelineUrlIsRecoverableError(err error) bool { - r := 
false - _, ok := err.(*url.Error) - if ok { - r = true - } - return r -} - -func pipelineNetIsRecoverableError(err error) bool { - recoverableErrors := map[string]bool{ - "connection reset by peer": true, - } - r := false - cErr, ok := err.(*net.OpError) - if ok && recoverableErrors[cErr.Err.Error()] == true { - r = true - } - return r -} - -var pipelineIsRecoverableErrors = []pipelineIsRecoverableErrorFunc{ - pipelineKinesisIsRecoverableError, pipelineNetIsRecoverableError, pipelineUrlIsRecoverableError, -} - -// this determines whether the error is recoverable -func (p Pipeline) isRecoverableError(err error) bool { - r := false - - log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err) - - for _, errF := range pipelineIsRecoverableErrors { - r = errF(err) - if r { - break - } - } - - return r -} - -// handle the aws exponential backoff -func (p Pipeline) handleAwsWaitTimeExp(attempts int) { - - //http://docs.aws.amazon.com/general/latest/gr/api-retries.html - // wait up to 5 minutes based on the aws exponential backoff algorithm - if attempts > 0 { - waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond - l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String()) - time.Sleep(waitTime) - } - -} - // ProcessShard kicks off the process of a Kinesis Shard. // It is a long running process that will continue to read from the shard. 
func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { @@ -130,14 +54,14 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { } // handle the aws backoff stuff - p.handleAwsWaitTimeExp(consecutiveErrorAttempts) + handleAwsWaitTimeExp(consecutiveErrorAttempts) args = kinesis.NewArgs() args.Add("ShardIterator", shardIterator) recordSet, err := ksis.GetRecords(args) if err != nil { - if p.isRecoverableError(err) { + if isRecoverableError(err) { l4g.Info("recoverable error, %s", err) consecutiveErrorAttempts++ continue diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index 6c54dde..3544076 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -32,7 +32,28 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) { s3Emitter.Emit(b, t) s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) - _, err := e.Db.Exec(e.copyStatement(s3File)) + stmt := e.copyStatement(s3File) + + var err error + for i := 0; i < 10; i++ { + + // handle aws backoff, this may be necessary if, for example, the + // s3 file has not appeared to the database yet + handleAwsWaitTimeExp(i) + + // load into the database + _, err = e.Db.Exec(stmt) + + // if the request succeeded, or it's an unrecoverable error, break out of the loop + // because we are done + if err == nil || isRecoverableError(err) == false { + break + } + + // recoverable error, let's warn + l4g.Warn(err) + + } if err != nil { log.Fatal(err)