From e5af96fb54c1e3179216225e5315e9efb09c74b9 Mon Sep 17 00:00:00 2001 From: dan Date: Tue, 21 Apr 2015 15:14:24 -0700 Subject: [PATCH] handle more errors --- pipeline.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/pipeline.go b/pipeline.go index dd299ec..3bbb73d 100644 --- a/pipeline.go +++ b/pipeline.go @@ -4,6 +4,7 @@ import ( "log" "math" "net" + "net/url" "reflect" "time" @@ -44,6 +45,15 @@ func pipelineKinesisIsRecoverableError(err error) bool { return r } +func pipelineUrlIsRecoverableError(err error) bool { + r := false + _, ok := err.(*url.Error) + if ok { + r = true + } + return r +} + func pipelineNetIsRecoverableError(err error) bool { recoverableErrors := map[string]bool{ "connection reset by peer": true, @@ -57,14 +67,14 @@ func pipelineNetIsRecoverableError(err error) bool { } var pipelineIsRecoverableErrors = []pipelineIsRecoverableErrorFunc{ - pipelineKinesisIsRecoverableError, pipelineNetIsRecoverableError, + pipelineKinesisIsRecoverableError, pipelineNetIsRecoverableError, pipelineUrlIsRecoverableError, } // this determines whether the error is recoverable func (p Pipeline) isRecoverableError(err error) bool { r := false - log.Printf("isRecoverableError, type %s, value (+%v)\n", reflect.TypeOf(err).String(), err) + log.Printf("isRecoverableError, type %s, value (%#v)\n", reflect.TypeOf(err).String(), err) for _, errF := range pipelineIsRecoverableErrors { r = errF(err) @@ -115,6 +125,10 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { for { + if consecutiveErrorAttempts > 50 { + log.Fatalln("Too many consecutive error attempts") + } + // handle the aws backoff stuff p.handleAwsWaitTimeExp(consecutiveErrorAttempts)