From 969ba1882443dad8b3b19a5dea223284d7190d90 Mon Sep 17 00:00:00 2001 From: dan Date: Tue, 7 Apr 2015 13:42:50 -0700 Subject: [PATCH] if there are no records available, keep going until the nextiterator returns nothing --- pipeline.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pipeline.go b/pipeline.go index f2feeb4..8413125 100644 --- a/pipeline.go +++ b/pipeline.go @@ -33,7 +33,7 @@ var pipelineRecoverableErrorCodes = map[string]bool{ func (p Pipeline) isRecoverableError(err error) bool { r := false - l4g.Debug("isRecoverableError, type %s, value (+%v)", reflect.TypeOf(err).Name(), err) + log.Printf("isRecoverableError, type %s, value (+%v)\n", reflect.TypeOf(err).String(), err) cErr, ok := err.(*kinesis.Error) if ok && pipelineRecoverableErrorCodes[cErr.Code] == true { @@ -48,8 +48,11 @@ func (p Pipeline) handleAwsWaitTimeExp(attempts int) { //http://docs.aws.amazon.com/general/latest/gr/api-retries.html // wait up to 5 minutes based on the aws exponential backoff algorithm - waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond - time.Sleep(waitTime) + if attempts > 0 { + waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond + l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String()) + time.Sleep(waitTime) + } } @@ -119,7 +122,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { l4g.Error("NextShardIterator ERROR: %v", err) break } else { - time.Sleep(5 * time.Second) + //time.Sleep(5 * time.Second) } if p.Buffer.ShouldFlush() {