diff --git a/pipeline.go b/pipeline.go index 7156065..04ba3f9 100644 --- a/pipeline.go +++ b/pipeline.go @@ -1,6 +1,7 @@ package connector import ( + "math" "time" "github.com/sendgridlabs/go-kinesis" @@ -20,6 +21,24 @@ type Pipeline struct { Transformer Transformer } +// determine whether the error is recoverable +func (p Pipeline) isRecoverableError(err error) bool { + cErr, ok := err.(*kinesis.Error) + if ok && cErr.Code == "ProvisionedThroughputExceeded" { + return true + } + return false +} + +// handle the aws exponential backoff +// http://docs.aws.amazon.com/general/latest/gr/api-retries.html +func (p Pipeline) handleAwsWaitTimeExp(attempts int) { + // wait up to 5 minutes based on the aws exponential backoff algorithm + logger.Printf("waitingnow") + waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond + time.Sleep(waitTime) +} + // ProcessShard kicks off the process of a Kinesis Shard. // It is a long running process that will continue to read from the shard. func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { @@ -41,6 +60,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { } shardIterator := shardInfo.ShardIterator + consecutiveErrorAttempts := 0 for { args = kinesis.NewArgs() @@ -48,7 +68,16 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { recordSet, err := ksis.GetRecords(args) if err != nil { - logger.Fatalf("GetRecords ERROR: %v\n", err) + if p.isRecoverableError(err) { + logger.Printf("GetRecords RECOVERABLE_ERROR: %v\n", err) + consecutiveErrorAttempts++ + p.handleAwsWaitTimeExp(consecutiveErrorAttempts) + continue + } else { + logger.Fatalf("GetRecords ERROR: %v\n", err) + } + } else { + consecutiveErrorAttempts = 0 } if len(recordSet.Records) > 0 {