diff --git a/pipeline.go b/pipeline.go index f2feeb4..8413125 100644 --- a/pipeline.go +++ b/pipeline.go @@ -33,7 +33,7 @@ var pipelineRecoverableErrorCodes = map[string]bool{ func (p Pipeline) isRecoverableError(err error) bool { r := false - l4g.Debug("isRecoverableError, type %s, value (+%v)", reflect.TypeOf(err).Name(), err) + log.Printf("isRecoverableError, type %s, value (+%v)\n", reflect.TypeOf(err).String(), err) cErr, ok := err.(*kinesis.Error) if ok && pipelineRecoverableErrorCodes[cErr.Code] == true { @@ -48,8 +48,11 @@ func (p Pipeline) handleAwsWaitTimeExp(attempts int) { //http://docs.aws.amazon.com/general/latest/gr/api-retries.html // wait up to 5 minutes based on the aws exponential backoff algorithm - waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond - time.Sleep(waitTime) + if attempts > 0 { + waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond + l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String()) + time.Sleep(waitTime) + } } @@ -119,7 +122,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { l4g.Error("NextShardIterator ERROR: %v", err) break } else { - time.Sleep(5 * time.Second) + //time.Sleep(5 * time.Second) } if p.Buffer.ShouldFlush() {