handle more kinesis recoverable errors
This commit is contained in:
parent
a028ee862f
commit
46e5d62884
2 changed files with 46 additions and 1 deletions
|
|
@ -31,6 +31,10 @@ type pipelineIsRecoverableErrorFunc func(error) bool
|
||||||
func pipelineKinesisIsRecoverableError(err error) bool {
|
func pipelineKinesisIsRecoverableError(err error) bool {
|
||||||
recoverableErrorCodes := map[string]bool{
|
recoverableErrorCodes := map[string]bool{
|
||||||
"ProvisionedThroughputExceededException": true,
|
"ProvisionedThroughputExceededException": true,
|
||||||
|
"InternalFailure": true,
|
||||||
|
"Throttling": true,
|
||||||
|
"ServiceUnavailable": true,
|
||||||
|
//"ExpiredIteratorException": true,
|
||||||
}
|
}
|
||||||
r := false
|
r := false
|
||||||
cErr, ok := err.(*kinesis.Error)
|
cErr, ok := err.(*kinesis.Error)
|
||||||
|
|
@ -151,7 +155,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
l4g.Error("NextShardIterator ERROR: %v", err)
|
l4g.Error("NextShardIterator ERROR: %v", err)
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
//time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Buffer.ShouldFlush() {
|
if p.Buffer.ShouldFlush() {
|
||||||
|
|
|
||||||
41
pipeline_test.go
Normal file
41
pipeline_test.go
Normal file
|
|
@ -0,0 +1,41 @@
|
||||||
|
package connector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ezoic/go-kinesis"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_isRecoverableError(t *testing.T) {
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
err error
|
||||||
|
isRecoverable bool
|
||||||
|
}{
|
||||||
|
{err: &kinesis.Error{Code: "ProvisionedThroughputExceededException"}, isRecoverable: true},
|
||||||
|
{err: &kinesis.Error{Code: "Throttling"}, isRecoverable: true},
|
||||||
|
{err: &kinesis.Error{Code: "ServiceUnavailable"}, isRecoverable: true},
|
||||||
|
{err: &kinesis.Error{Code: "ExpiredIteratorException"}, isRecoverable: false},
|
||||||
|
{err: &net.OpError{Err: fmt.Errorf("connection reset by peer")}, isRecoverable: true},
|
||||||
|
{err: &net.OpError{Err: fmt.Errorf("unexpected error")}, isRecoverable: false},
|
||||||
|
{err: fmt.Errorf("an arbitrary error"), isRecoverable: false},
|
||||||
|
|
||||||
|
//"InternalFailure": true,
|
||||||
|
//"Throttling": true,
|
||||||
|
//"ServiceUnavailable": true,
|
||||||
|
////"ExpiredIteratorException": true,
|
||||||
|
//{err: *kinesis.Error{Code:""}}
|
||||||
|
}
|
||||||
|
|
||||||
|
for idx, tc := range testCases {
|
||||||
|
|
||||||
|
p := Pipeline{}
|
||||||
|
isRecoverable := p.isRecoverableError(tc.err)
|
||||||
|
if isRecoverable != tc.isRecoverable {
|
||||||
|
t.Errorf("test case %d: isRecoverable expected %t, actual %t, for error %+v", idx, tc.isRecoverable, isRecoverable, tc.err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue