diff --git a/kinesis.go b/kinesis.go index 77f0644..1d66ca7 100644 --- a/kinesis.go +++ b/kinesis.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/sendgridlabs/go-kinesis" + "github.com/ezoic/go-kinesis" ) // CreateStream creates a new Kinesis stream (uses existing stream if exists) and diff --git a/pipeline.go b/pipeline.go index 14d4198..f6b6fb2 100644 --- a/pipeline.go +++ b/pipeline.go @@ -2,9 +2,12 @@ package connector import ( "log" + "math" + "os" "time" - "github.com/sendgridlabs/go-kinesis" + "github.com/ezoic/go-kinesis" + l4g "github.com/ezoic/sol/log4go" ) // Pipeline is used as a record processor to configure a pipline. @@ -21,6 +24,28 @@ type Pipeline struct { Transformer Transformer } +// this determines whether the error is recoverable +func (p Pipeline) isRecoverableError(err error) bool { + r := false + + cErr, ok := err.(*kinesis.Error) + if ok && cErr.Code == "ProvisionedThroughputExceeded" { + r = true + } + + return true +} + +// handle the aws exponential backoff +func (p Pipeline) handleAwsWaitTimeExp(attempts int) { + + //http://docs.aws.amazon.com/general/latest/gr/api-retries.html + // wait up to 5 minutes based on the aws exponential backoff algorithm + waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond + time.Sleep(waitTime) + +} + // ProcessShard kicks off the process of a Kinesis Shard. // It is a long running process that will continue to read from the shard. func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { @@ -43,13 +68,26 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { shardIterator := shardInfo.ShardIterator + consecutiveErrorAttempts := 0 + for { + + // handle the aws backoff stuff + p.handleAwsWaitTimeExp(consecutiveErrorAttempts) + args = kinesis.NewArgs() args.Add("ShardIterator", shardIterator) recordSet, err := ksis.GetRecords(args) if err != nil { - log.Fatalf("GetRecords ERROR: %v\n", err) + if p.isRecoverableError(err) { + consecutiveErrorAttempts++ + } else { + l4g.Critical("GetRecords ERROR: %v\n", err) + os.Exit(1) + } + } else { + consecutiveErrorAttempts = 0 } if len(recordSet.Records) > 0 { @@ -57,7 +95,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { data := v.GetData() if err != nil { - log.Printf("GetData ERROR: %v\n", err) + l4g.Error("GetData ERROR: %v\n", err) continue } diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go index 756d760..8918da8 100644 --- a/s3_manifest_emitter.go +++ b/s3_manifest_emitter.go @@ -3,7 +3,7 @@ package connector import ( "log" - "github.com/sendgridlabs/go-kinesis" + "github.com/ezoic/go-kinesis" ) // An implementation of Emitter that puts event data on S3 file, and then puts the