Fix the exponential backoff

Fix the calculation of exponential backoff. ^ is the XOR in
golang. Replaced it with math.exp2().
This commit is contained in:
Tao Jiang 2018-10-30 12:41:45 -07:00
parent 2d5d506659
commit 6a1a7b7da6
2 changed files with 11 additions and 5 deletions

View file

@ -29,6 +29,7 @@ package worker
import ( import (
"errors" "errors"
"math"
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
@ -300,7 +301,7 @@ func (checkpointer *DynamoCheckpoint) putItem(input *dynamodb.PutItemInput) erro
awsErr.Code() == dynamodb.ErrCodeInternalServerError && awsErr.Code() == dynamodb.ErrCodeInternalServerError &&
attempt < checkpointer.Retries { attempt < checkpointer.Retries {
// Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html // Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
time.Sleep(time.Duration(2^attempt*100) * time.Millisecond) time.Sleep(time.Duration(math.Exp2(float64(attempt))*100) * time.Millisecond)
return true, err return true, err
} }
} }
@ -325,7 +326,7 @@ func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynam
awsErr.Code() == dynamodb.ErrCodeInternalServerError && awsErr.Code() == dynamodb.ErrCodeInternalServerError &&
attempt < checkpointer.Retries { attempt < checkpointer.Retries {
// Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html // Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
time.Sleep(time.Duration(2^attempt*100) * time.Millisecond) time.Sleep(time.Duration(math.Exp2(float64(attempt))*100) * time.Millisecond)
return true, err return true, err
} }
} }
@ -350,7 +351,7 @@ func (checkpointer *DynamoCheckpoint) removeItem(shardID string) error {
awsErr.Code() == dynamodb.ErrCodeInternalServerError && awsErr.Code() == dynamodb.ErrCodeInternalServerError &&
attempt < checkpointer.Retries { attempt < checkpointer.Retries {
// Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html // Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html
time.Sleep(time.Duration(2^attempt*100) * time.Millisecond) time.Sleep(time.Duration(math.Exp2(float64(attempt))*100) * time.Millisecond)
return true, err return true, err
} }
} }

View file

@ -29,6 +29,7 @@ package worker
import ( import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"math"
"sync" "sync"
"time" "time"
@ -149,9 +150,9 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
sc.recordProcessor.Initialize(input) sc.recordProcessor.Initialize(input)
recordCheckpointer := NewRecordProcessorCheckpoint(shard, sc.checkpointer) recordCheckpointer := NewRecordProcessorCheckpoint(shard, sc.checkpointer)
retriedErrors := 0
for { for {
retriedErrors := 0
getRecordsStartTime := time.Now() getRecordsStartTime := time.Now()
if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) { if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) {
log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID) log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID)
@ -181,7 +182,8 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
log.Errorf("Error getting records from shard %v: %+v", shard.ID, err) log.Errorf("Error getting records from shard %v: %+v", shard.ID, err)
retriedErrors++ retriedErrors++
// exponential backoff // exponential backoff
time.Sleep(time.Duration(2^retriedErrors*100) * time.Millisecond) // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
time.Sleep(time.Duration(math.Exp2(retriedErrors)*100) * time.Millisecond)
continue continue
} }
} }
@ -189,6 +191,9 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error {
return err return err
} }
// reset the retry count after success
retriedErrors = 0
// IRecordProcessorCheckpointer // IRecordProcessorCheckpointer
input := &kcl.ProcessRecordsInput{ input := &kcl.ProcessRecordsInput{
Records: getResp.Records, Records: getResp.Records,