move aws backoff to its own file, and use it for redshift basic emitter
This commit is contained in:
parent 2a285c52d5
commit e42dd7f73d
3 changed files with 128 additions and 79 deletions
104  awsbackoff.go  Normal file
@@ -0,0 +1,104 @@
package connector

import (
    "log"
    "math"
    "net"
    "net/url"
    "reflect"
    "regexp"
    "time"

    "github.com/ezoic/go-kinesis"
    l4g "github.com/ezoic/log4go"
    "github.com/lib/pq"
)

type isRecoverableErrorFunc func(error) bool

func kinesisIsRecoverableError(err error) bool {
    recoverableErrorCodes := map[string]bool{
        "ProvisionedThroughputExceededException": true,
        "InternalFailure":                        true,
        "Throttling":                             true,
        "ServiceUnavailable":                     true,
        //"ExpiredIteratorException": true,
    }
    r := false
    cErr, ok := err.(*kinesis.Error)
    if ok && recoverableErrorCodes[cErr.Code] == true {
        r = true
    }
    return r
}

func urlIsRecoverableError(err error) bool {
    r := false
    _, ok := err.(*url.Error)
    if ok {
        r = true
    }
    return r
}

func netIsRecoverableError(err error) bool {
    recoverableErrors := map[string]bool{
        "connection reset by peer": true,
    }
    r := false
    cErr, ok := err.(*net.OpError)
    if ok && recoverableErrors[cErr.Err.Error()] == true {
        r = true
    }
    return r
}

var redshiftRecoverableErrors = []*regexp.Regexp{
    regexp.MustCompile("The specified S3 prefix '.*?' does not exist"),
}

func redshiftIsRecoverableError(err error) bool {
    r := false
    if cErr, ok := err.(pq.Error); ok {
        for _, re := range redshiftRecoverableErrors {
            if re.MatchString(cErr.Message) {
                r = true
                break
            }
        }
    }
    return r
}

var isRecoverableErrors = []isRecoverableErrorFunc{
    kinesisIsRecoverableError, netIsRecoverableError, urlIsRecoverableError, redshiftIsRecoverableError,
}

// this determines whether the error is recoverable
func isRecoverableError(err error) bool {
    r := false

    log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err)

    for _, errF := range isRecoverableErrors {
        r = errF(err)
        if r {
            break
        }
    }

    return r
}

// handle the aws exponential backoff
func handleAwsWaitTimeExp(attempts int) {

    //http://docs.aws.amazon.com/general/latest/gr/api-retries.html
    // wait up to 5 minutes based on the aws exponential backoff algorithm
    if attempts > 0 {
        waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
        l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String())
        time.Sleep(waitTime)
    }

}
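Illustrative sketch, not part of this commit: how the two package-level helpers above are meant to compose at a call site. The doWithAwsBackoff name and its maxAttempts parameter are hypothetical, introduced only for this example.

// Illustrative only; not part of this commit. doWithAwsBackoff is a
// hypothetical wrapper showing how isRecoverableError and
// handleAwsWaitTimeExp are intended to be used together.
package connector

func doWithAwsBackoff(maxAttempts int, op func() error) error {
    var err error
    for i := 0; i < maxAttempts; i++ {
        // no sleep on attempt 0, exponential wait afterwards (capped at 5 minutes)
        handleAwsWaitTimeExp(i)

        err = op()
        if err == nil || !isRecoverableError(err) {
            // done: either success or an error not worth retrying
            break
        }
    }
    return err
}

With a helper like this, the retry loop added to the Redshift emitter below could be written as doWithAwsBackoff(10, func() error { _, execErr := e.Db.Exec(stmt); return execErr }).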
80  pipeline.go
@@ -2,10 +2,6 @@ package connector
 import (
     "log"
-    "math"
-    "net"
-    "net/url"
-    "reflect"
     "time"

     "github.com/ezoic/go-kinesis"
@@ -27,78 +23,6 @@ type Pipeline struct {
     CheckpointFilteredRecords bool
 }
-
-type pipelineIsRecoverableErrorFunc func(error) bool
-
-func pipelineKinesisIsRecoverableError(err error) bool {
-    recoverableErrorCodes := map[string]bool{
-        "ProvisionedThroughputExceededException": true,
-        "InternalFailure":                        true,
-        "Throttling":                             true,
-        "ServiceUnavailable":                     true,
-        //"ExpiredIteratorException": true,
-    }
-    r := false
-    cErr, ok := err.(*kinesis.Error)
-    if ok && recoverableErrorCodes[cErr.Code] == true {
-        r = true
-    }
-    return r
-}
-
-func pipelineUrlIsRecoverableError(err error) bool {
-    r := false
-    _, ok := err.(*url.Error)
-    if ok {
-        r = true
-    }
-    return r
-}
-
-func pipelineNetIsRecoverableError(err error) bool {
-    recoverableErrors := map[string]bool{
-        "connection reset by peer": true,
-    }
-    r := false
-    cErr, ok := err.(*net.OpError)
-    if ok && recoverableErrors[cErr.Err.Error()] == true {
-        r = true
-    }
-    return r
-}
-
-var pipelineIsRecoverableErrors = []pipelineIsRecoverableErrorFunc{
-    pipelineKinesisIsRecoverableError, pipelineNetIsRecoverableError, pipelineUrlIsRecoverableError,
-}
-
-// this determines whether the error is recoverable
-func (p Pipeline) isRecoverableError(err error) bool {
-    r := false
-
-    log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err)
-
-    for _, errF := range pipelineIsRecoverableErrors {
-        r = errF(err)
-        if r {
-            break
-        }
-    }
-
-    return r
-}
-
-// handle the aws exponential backoff
-func (p Pipeline) handleAwsWaitTimeExp(attempts int) {
-
-    //http://docs.aws.amazon.com/general/latest/gr/api-retries.html
-    // wait up to 5 minutes based on the aws exponential backoff algorithm
-    if attempts > 0 {
-        waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
-        l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String())
-        time.Sleep(waitTime)
-    }
-
-}
-
 // ProcessShard kicks off the process of a Kinesis Shard.
 // It is a long running process that will continue to read from the shard.
 func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
@@ -130,14 +54,14 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
     }

     // handle the aws backoff stuff
-    p.handleAwsWaitTimeExp(consecutiveErrorAttempts)
+    handleAwsWaitTimeExp(consecutiveErrorAttempts)

     args = kinesis.NewArgs()
     args.Add("ShardIterator", shardIterator)
     recordSet, err := ksis.GetRecords(args)

     if err != nil {
-        if p.isRecoverableError(err) {
+        if isRecoverableError(err) {
             l4g.Info("recoverable error, %s", err)
             consecutiveErrorAttempts++
             continue
@@ -32,7 +32,28 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) {
     s3Emitter.Emit(b, t)
     s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())

-    _, err := e.Db.Exec(e.copyStatement(s3File))
+    stmt := e.copyStatement(s3File)
+
+    var err error
+    for i := 0; i < 10; i++ {
+
+        // handle aws backoff, this may be necessary if, for example, the
+        // s3 file has not appeared to the database yet
+        handleAwsWaitTimeExp(i)
+
+        // load into the database
+        _, err = e.Db.Exec(stmt)
+
+        // if the request succeeded, or it's an unrecoverable error, break out of the loop
+        // because we are done
+        if err == nil || isRecoverableError(err) == false {
+            break
+        }
+
+        // recoverable error, let's warn
+        l4g.Warn(err)
+    }
+
     if err != nil {
         log.Fatal(err)
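For context on the retry budget above (again, not part of the commit): handleAwsWaitTimeExp sleeps min(100*2^i, 300000) ms and skips i = 0, so the emitter's ten attempts wait 200 ms, 400 ms, ..., 51.2 s between tries, about 1 minute 42 seconds in total if every attempt fails, comfortably under the 5-minute cap noted in awsbackoff.go. A standalone sketch that prints the schedule:

// Standalone illustration of the backoff schedule used by the emitter's
// ten-attempt loop; mirrors the formula in handleAwsWaitTimeExp.
package main

import (
    "fmt"
    "math"
    "time"
)

func main() {
    var total time.Duration
    // attempt 0 does not wait, so start at 1
    for i := 1; i < 10; i++ {
        wait := time.Duration(math.Min(100*math.Pow(2, float64(i)), 300000)) * time.Millisecond
        total += wait
        fmt.Printf("attempt %d waits %s\n", i, wait)
    }
    fmt.Printf("worst-case total wait: %s\n", total) // 1m42.2s
}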