if there are no records available, keep going until the nextiterator returns nothing

This commit is contained in:
dan 2015-04-07 13:42:50 -07:00
parent 2ff748a2d4
commit 969ba18824

View file

@ -33,7 +33,7 @@ var pipelineRecoverableErrorCodes = map[string]bool{
func (p Pipeline) isRecoverableError(err error) bool {
r := false
l4g.Debug("isRecoverableError, type %s, value (+%v)", reflect.TypeOf(err).Name(), err)
log.Printf("isRecoverableError, type %s, value (+%v)\n", reflect.TypeOf(err).String(), err)
cErr, ok := err.(*kinesis.Error)
if ok && pipelineRecoverableErrorCodes[cErr.Code] == true {
@ -48,8 +48,11 @@ func (p Pipeline) handleAwsWaitTimeExp(attempts int) {
//http://docs.aws.amazon.com/general/latest/gr/api-retries.html
// wait up to 5 minutes based on the aws exponential backoff algorithm
waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
time.Sleep(waitTime)
if attempts > 0 {
waitTime := time.Duration(math.Min(100*math.Pow(2, float64(attempts)), 300000)) * time.Millisecond
l4g.Finest("handleAwsWaitTimeExp:%s", waitTime.String())
time.Sleep(waitTime)
}
}
@ -119,7 +122,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
l4g.Error("NextShardIterator ERROR: %v", err)
break
} else {
time.Sleep(5 * time.Second)
//time.Sleep(5 * time.Second)
}
if p.Buffer.ShouldFlush() {