handle more errors
This commit is contained in:
parent
46e5d62884
commit
e5af96fb54
1 changed files with 16 additions and 2 deletions
18
pipeline.go
18
pipeline.go
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"net"
|
"net"
|
||||||
|
"net/url"
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -44,6 +45,15 @@ func pipelineKinesisIsRecoverableError(err error) bool {
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func pipelineUrlIsRecoverableError(err error) bool {
|
||||||
|
r := false
|
||||||
|
_, ok := err.(*url.Error)
|
||||||
|
if ok {
|
||||||
|
r = true
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
func pipelineNetIsRecoverableError(err error) bool {
|
func pipelineNetIsRecoverableError(err error) bool {
|
||||||
recoverableErrors := map[string]bool{
|
recoverableErrors := map[string]bool{
|
||||||
"connection reset by peer": true,
|
"connection reset by peer": true,
|
||||||
|
|
@ -57,14 +67,14 @@ func pipelineNetIsRecoverableError(err error) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
var pipelineIsRecoverableErrors = []pipelineIsRecoverableErrorFunc{
|
var pipelineIsRecoverableErrors = []pipelineIsRecoverableErrorFunc{
|
||||||
pipelineKinesisIsRecoverableError, pipelineNetIsRecoverableError,
|
pipelineKinesisIsRecoverableError, pipelineNetIsRecoverableError, pipelineUrlIsRecoverableError,
|
||||||
}
|
}
|
||||||
|
|
||||||
// this determines whether the error is recoverable
|
// this determines whether the error is recoverable
|
||||||
func (p Pipeline) isRecoverableError(err error) bool {
|
func (p Pipeline) isRecoverableError(err error) bool {
|
||||||
r := false
|
r := false
|
||||||
|
|
||||||
log.Printf("isRecoverableError, type %s, value (+%v)\n", reflect.TypeOf(err).String(), err)
|
log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err)
|
||||||
|
|
||||||
for _, errF := range pipelineIsRecoverableErrors {
|
for _, errF := range pipelineIsRecoverableErrors {
|
||||||
r = errF(err)
|
r = errF(err)
|
||||||
|
|
@ -115,6 +125,10 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
||||||
|
if consecutiveErrorAttempts > 50 {
|
||||||
|
log.Fatalln("Too many consecutive error attempts")
|
||||||
|
}
|
||||||
|
|
||||||
// handle the aws backoff stuff
|
// handle the aws backoff stuff
|
||||||
p.handleAwsWaitTimeExp(consecutiveErrorAttempts)
|
p.handleAwsWaitTimeExp(consecutiveErrorAttempts)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue