diff --git a/pipeline.go b/pipeline.go index b3cc44b..dd299ec 100644 --- a/pipeline.go +++ b/pipeline.go @@ -31,6 +31,10 @@ type pipelineIsRecoverableErrorFunc func(error) bool func pipelineKinesisIsRecoverableError(err error) bool { recoverableErrorCodes := map[string]bool{ "ProvisionedThroughputExceededException": true, + "InternalFailure": true, + "Throttling": true, + "ServiceUnavailable": true, + //"ExpiredIteratorException": true, } r := false cErr, ok := err.(*kinesis.Error) @@ -151,7 +155,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { l4g.Error("NextShardIterator ERROR: %v", err) break } else { - //time.Sleep(5 * time.Second) + time.Sleep(5 * time.Second) } if p.Buffer.ShouldFlush() { diff --git a/pipeline_test.go b/pipeline_test.go new file mode 100644 index 0000000..3629f05 --- /dev/null +++ b/pipeline_test.go @@ -0,0 +1,41 @@ +package connector + +import ( + "fmt" + "net" + "testing" + + "github.com/ezoic/go-kinesis" +) + +func Test_isRecoverableError(t *testing.T) { + + testCases := []struct { + err error + isRecoverable bool + }{ + {err: &kinesis.Error{Code: "ProvisionedThroughputExceededException"}, isRecoverable: true}, + {err: &kinesis.Error{Code: "Throttling"}, isRecoverable: true}, + {err: &kinesis.Error{Code: "ServiceUnavailable"}, isRecoverable: true}, + {err: &kinesis.Error{Code: "ExpiredIteratorException"}, isRecoverable: false}, + {err: &net.OpError{Err: fmt.Errorf("connection reset by peer")}, isRecoverable: true}, + {err: &net.OpError{Err: fmt.Errorf("unexpected error")}, isRecoverable: false}, + {err: fmt.Errorf("an arbitrary error"), isRecoverable: false}, + + //"InternalFailure": true, + //"Throttling": true, + //"ServiceUnavailable": true, + ////"ExpiredIteratorException": true, + //{err: *kinesis.Error{Code:""}} + } + + for idx, tc := range testCases { + + p := Pipeline{} + isRecoverable := p.isRecoverableError(tc.err) + if isRecoverable != tc.isRecoverable { + t.Errorf("test case %d: isRecoverable expected %t, actual %t, for error %+v", idx, tc.isRecoverable, isRecoverable, tc.err) + } + + } +}