Fix recoverable error tests
This commit is contained in:
parent
7fccc9da9f
commit
8f1d408c52
3 changed files with 71 additions and 87 deletions
|
|
@ -2,36 +2,11 @@ package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
"net"
|
|
||||||
"net/url"
|
|
||||||
"regexp"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
||||||
"github.com/lib/pq"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type isRecoverableErrorFunc func(error) bool
|
// AWS Exponential Backoff
|
||||||
|
// Wait up to 5 minutes based on the aws exponential backoff algorithm
|
||||||
var isRecoverableErrors = []isRecoverableErrorFunc{
|
|
||||||
kinesisIsRecoverableError,
|
|
||||||
netIsRecoverableError,
|
|
||||||
redshiftIsRecoverableError,
|
|
||||||
urlIsRecoverableError,
|
|
||||||
}
|
|
||||||
|
|
||||||
// isRecoverableError determines whether the error is recoverable
|
|
||||||
func isRecoverableError(err error) bool {
|
|
||||||
for _, errF := range isRecoverableErrors {
|
|
||||||
if errF(err) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle the aws exponential backoff
|
|
||||||
// wait up to 5 minutes based on the aws exponential backoff algorithm
|
|
||||||
// http://docs.aws.amazon.com/general/latest/gr/api-retries.html
|
// http://docs.aws.amazon.com/general/latest/gr/api-retries.html
|
||||||
func handleAwsWaitTimeExp(attempts int) {
|
func handleAwsWaitTimeExp(attempts int) {
|
||||||
if attempts > 0 {
|
if attempts > 0 {
|
||||||
|
|
@ -39,55 +14,3 @@ func handleAwsWaitTimeExp(attempts int) {
|
||||||
time.Sleep(waitTime)
|
time.Sleep(waitTime)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func kinesisIsRecoverableError(err error) bool {
|
|
||||||
recoverableErrorCodes := map[string]bool{
|
|
||||||
"InternalFailure": true,
|
|
||||||
"ProvisionedThroughputExceededException": true,
|
|
||||||
"RequestError": true,
|
|
||||||
"ServiceUnavailable": true,
|
|
||||||
"Throttling": true,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err, ok := err.(awserr.Error); ok {
|
|
||||||
if ok && recoverableErrorCodes[err.Code()] == true {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func urlIsRecoverableError(err error) bool {
|
|
||||||
_, ok := err.(*url.Error)
|
|
||||||
if ok {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func netIsRecoverableError(err error) bool {
|
|
||||||
recoverableErrors := map[string]bool{
|
|
||||||
"connection reset by peer": true,
|
|
||||||
}
|
|
||||||
cErr, ok := err.(*net.OpError)
|
|
||||||
if ok && recoverableErrors[cErr.Err.Error()] == true {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func redshiftIsRecoverableError(err error) bool {
|
|
||||||
redshiftRecoverableErrors := []*regexp.Regexp{
|
|
||||||
regexp.MustCompile("The specified S3 prefix '.*?' does not exist"),
|
|
||||||
}
|
|
||||||
|
|
||||||
if cErr, ok := err.(pq.Error); ok {
|
|
||||||
for _, re := range redshiftRecoverableErrors {
|
|
||||||
if re.MatchString(cErr.Message) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
|
||||||
64
errors.go
Normal file
64
errors.go
Normal file
|
|
@ -0,0 +1,64 @@
|
||||||
|
package connector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
|
)
|
||||||
|
|
||||||
|
type isRecoverableErrorFunc func(error) bool
|
||||||
|
|
||||||
|
var isRecoverableErrors = []isRecoverableErrorFunc{
|
||||||
|
kinesisIsRecoverableError,
|
||||||
|
netIsRecoverableError,
|
||||||
|
urlIsRecoverableError,
|
||||||
|
}
|
||||||
|
|
||||||
|
// isRecoverableError determines whether the error is recoverable
|
||||||
|
func isRecoverableError(err error) bool {
|
||||||
|
for _, errF := range isRecoverableErrors {
|
||||||
|
if errF(err) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func kinesisIsRecoverableError(err error) bool {
|
||||||
|
recoverableErrorCodes := map[string]bool{
|
||||||
|
"InternalFailure": true,
|
||||||
|
"ProvisionedThroughputExceededException": true,
|
||||||
|
"RequestError": true,
|
||||||
|
"ServiceUnavailable": true,
|
||||||
|
"Throttling": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err, ok := err.(awserr.Error); ok {
|
||||||
|
if ok && recoverableErrorCodes[err.Code()] == true {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func urlIsRecoverableError(err error) bool {
|
||||||
|
_, ok := err.(*url.Error)
|
||||||
|
if ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func netIsRecoverableError(err error) bool {
|
||||||
|
recoverableErrors := map[string]bool{
|
||||||
|
"connection reset by peer": true,
|
||||||
|
}
|
||||||
|
cErr, ok := err.(*net.OpError)
|
||||||
|
if ok && recoverableErrors[cErr.Err.Error()] == true {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
@ -5,9 +5,8 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
"github.com/bmizerany/assert"
|
"github.com/bmizerany/assert"
|
||||||
"github.com/lib/pq"
|
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_isRecoverableError(t *testing.T) {
|
func Test_isRecoverableError(t *testing.T) {
|
||||||
|
|
@ -15,15 +14,13 @@ func Test_isRecoverableError(t *testing.T) {
|
||||||
err error
|
err error
|
||||||
isRecoverable bool
|
isRecoverable bool
|
||||||
}{
|
}{
|
||||||
{err: &kinesis.Error{Code: "ProvisionedThroughputExceededException"}, isRecoverable: true},
|
{err: awserr.New("ProvisionedThroughputExceededException", "", nil), isRecoverable: true},
|
||||||
{err: &kinesis.Error{Code: "Throttling"}, isRecoverable: true},
|
{err: awserr.New("Throttling", "", nil), isRecoverable: true},
|
||||||
{err: &kinesis.Error{Code: "ServiceUnavailable"}, isRecoverable: true},
|
{err: awserr.New("ServiceUnavailable", "", nil), isRecoverable: true},
|
||||||
{err: &kinesis.Error{Code: "ExpiredIteratorException"}, isRecoverable: false},
|
{err: awserr.New("ExpiredIteratorException", "", nil), isRecoverable: false},
|
||||||
{err: &net.OpError{Err: fmt.Errorf("connection reset by peer")}, isRecoverable: true},
|
{err: &net.OpError{Err: fmt.Errorf("connection reset by peer")}, isRecoverable: true},
|
||||||
{err: &net.OpError{Err: fmt.Errorf("unexpected error")}, isRecoverable: false},
|
{err: &net.OpError{Err: fmt.Errorf("unexpected error")}, isRecoverable: false},
|
||||||
{err: fmt.Errorf("an arbitrary error"), isRecoverable: false},
|
{err: fmt.Errorf("an arbitrary error"), isRecoverable: false},
|
||||||
{err: pq.Error{Message: "The specified S3 prefix 'somefilethatismissing' does not exist"}, isRecoverable: true},
|
|
||||||
{err: pq.Error{Message: "Some other pq error"}, isRecoverable: false},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
Loading…
Reference in a new issue