From d058203b6ef5b651c6062e30a30c38fb1f4aa796 Mon Sep 17 00:00:00 2001 From: Prometheus Date: Mon, 4 Jun 2018 20:07:58 -0700 Subject: [PATCH] Make what aws error to trigger retry decided by caller (#52) * remove ValidateCheckpoint * make retrying on error decided by caller --- checkpoint/ddb/ddb.go | 25 ++++++++++++------------- checkpoint/ddb/retryer.go | 24 ++++++++++++++++++++++++ examples/consumer/main.go | 19 ++++++++++++++++++- 3 files changed, 54 insertions(+), 14 deletions(-) create mode 100644 checkpoint/ddb/retryer.go diff --git a/checkpoint/ddb/ddb.go b/checkpoint/ddb/ddb.go index b01d290..170ced4 100644 --- a/checkpoint/ddb/ddb.go +++ b/checkpoint/ddb/ddb.go @@ -1,4 +1,4 @@ -package redis +package ddb import ( "fmt" @@ -7,7 +7,6 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" @@ -31,6 +30,13 @@ func WithDynamoClient(svc dynamodbiface.DynamoDBAPI) Option { } } +// WithRetryer sets the retryer +func WithRetryer(r Retryer) Option { + return func(c *Checkpoint) { + c.retryer = r + } +} + // New returns a checkpoint that uses DynamoDB for underlying storage func New(appName, tableName string, opts ...Option) (*Checkpoint, error) { client := dynamodb.New(session.New(aws.NewConfig())) @@ -43,6 +49,7 @@ func New(appName, tableName string, opts ...Option) (*Checkpoint, error) { done: make(chan struct{}), mu: &sync.Mutex{}, checkpoints: map[key]string{}, + retryer: &DefaultRetryer{}, } for _, opt := range opts { @@ -63,6 +70,7 @@ type Checkpoint struct { mu *sync.Mutex // protects the checkpoints checkpoints map[key]string done chan struct{} + retryer Retryer } type key struct { @@ -97,7 +105,7 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) { resp, err := c.client.GetItem(params) if err != nil { - if retriableError(err) { + if c.retryer.ShouldRetry(err) { return c.Get(streamName, shardID) } return "", err @@ -168,7 +176,7 @@ func (c *Checkpoint) save() error { Item: item, }) if err != nil { - if !retriableError(err) { + if !c.retryer.ShouldRetry(err) { return err } return c.save() @@ -177,12 +185,3 @@ func (c *Checkpoint) save() error { return nil } - -func retriableError(err error) bool { - if awsErr, ok := err.(awserr.Error); ok { - if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException { - return true - } - } - return false -} diff --git a/checkpoint/ddb/retryer.go b/checkpoint/ddb/retryer.go new file mode 100644 index 0000000..bad3049 --- /dev/null +++ b/checkpoint/ddb/retryer.go @@ -0,0 +1,24 @@ +package ddb + +import ( + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/dynamodb" +) + +// Retryer interface contains one method that decides whether to retry based on error +type Retryer interface { + ShouldRetry(error) bool +} + +type DefaultRetryer struct { + Retryer +} + +func (r *DefaultRetryer) ShouldRetry(err error) bool { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException { + return true + } + } + return false +} diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 5c8bc99..dad2e30 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -12,6 +12,7 @@ import ( "os/signal" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/kinesis" @@ -48,7 +49,7 @@ func main() { }) // ddb checkpoint - ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) + ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(&MyRetryer{})) if err != nil { log.Fatalf("checkpoint error: %v", err) } @@ -100,3 +101,19 @@ func main() { log.Fatalf("checkpoint shutdown error: %v", err) } } + +type MyRetryer struct { + checkpoint.Retryer +} + +func (r *MyRetryer) ShouldRetry(err error) bool { + if awsErr, ok := err.(awserr.Error); ok { + switch awsErr.Code() { + case dynamodb.ErrCodeProvisionedThroughputExceededException, dynamodb.ErrCodeLimitExceededException: + return true + default: + return false + } + } + return false +}