From 6a1a7b7da6a0a85ab2e3febd3861875c27f44b8a Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Tue, 30 Oct 2018 12:41:45 -0700 Subject: [PATCH] Fix the exponential backoff Fix the calculation of exponential backoff. ^ is the XOR in golang. Replaced it with math.exp2(). --- clientlibrary/worker/checkpointer.go | 7 ++++--- clientlibrary/worker/shard-consumer.go | 9 +++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/clientlibrary/worker/checkpointer.go b/clientlibrary/worker/checkpointer.go index 30083b2..1361f78 100644 --- a/clientlibrary/worker/checkpointer.go +++ b/clientlibrary/worker/checkpointer.go @@ -29,6 +29,7 @@ package worker import ( "errors" + "math" "time" "github.com/aws/aws-sdk-go/aws" @@ -300,7 +301,7 @@ func (checkpointer *DynamoCheckpoint) putItem(input *dynamodb.PutItemInput) erro awsErr.Code() == dynamodb.ErrCodeInternalServerError && attempt < checkpointer.Retries { // Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html - time.Sleep(time.Duration(2^attempt*100) * time.Millisecond) + time.Sleep(time.Duration(math.Exp2(float64(attempt))*100) * time.Millisecond) return true, err } } @@ -325,7 +326,7 @@ func (checkpointer *DynamoCheckpoint) getItem(shardID string) (map[string]*dynam awsErr.Code() == dynamodb.ErrCodeInternalServerError && attempt < checkpointer.Retries { // Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html - time.Sleep(time.Duration(2^attempt*100) * time.Millisecond) + time.Sleep(time.Duration(math.Exp2(float64(attempt))*100) * time.Millisecond) return true, err } } @@ -350,7 +351,7 @@ func (checkpointer *DynamoCheckpoint) removeItem(shardID string) error { awsErr.Code() == dynamodb.ErrCodeInternalServerError && attempt < checkpointer.Retries { // Backoff time as recommended by https://docs.aws.amazon.com/general/latest/gr/api-retries.html - time.Sleep(time.Duration(2^attempt*100) * time.Millisecond) + time.Sleep(time.Duration(math.Exp2(float64(attempt))*100) * time.Millisecond) return true, err } } diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 1326cb1..d4122c8 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -29,6 +29,7 @@ package worker import ( log "github.com/sirupsen/logrus" + "math" "sync" "time" @@ -149,9 +150,9 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { sc.recordProcessor.Initialize(input) recordCheckpointer := NewRecordProcessorCheckpoint(shard, sc.checkpointer) + retriedErrors := 0 for { - retriedErrors := 0 getRecordsStartTime := time.Now() if time.Now().UTC().After(shard.LeaseTimeout.Add(-5 * time.Second)) { log.Debugf("Refreshing lease on shard: %s for worker: %s", shard.ID, sc.consumerID) @@ -181,7 +182,8 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { log.Errorf("Error getting records from shard %v: %+v", shard.ID, err) retriedErrors++ // exponential backoff - time.Sleep(time.Duration(2^retriedErrors*100) * time.Millisecond) + // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff + time.Sleep(time.Duration(math.Exp2(retriedErrors)*100) * time.Millisecond) continue } } @@ -189,6 +191,9 @@ func (sc *ShardConsumer) getRecords(shard *shardStatus) error { return err } + // reset the retry count after success + retriedErrors = 0 + // IRecordProcessorCheckpointer input := &kcl.ProcessRecordsInput{ Records: getResp.Records,