Make what aws error to trigger retry decided by caller (#52)

* remove ValidateCheckpoint
* make retrying on error decided by caller
This commit is contained in:
Prometheus 2018-06-04 20:07:58 -07:00 committed by Harlow Ward
parent 9a7e102a05
commit d058203b6e
3 changed files with 54 additions and 14 deletions

View file

@ -1,4 +1,4 @@
package redis package ddb
import ( import (
"fmt" "fmt"
@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
@ -31,6 +30,13 @@ func WithDynamoClient(svc dynamodbiface.DynamoDBAPI) Option {
} }
} }
// WithRetryer sets the retryer
func WithRetryer(r Retryer) Option {
return func(c *Checkpoint) {
c.retryer = r
}
}
// New returns a checkpoint that uses DynamoDB for underlying storage // New returns a checkpoint that uses DynamoDB for underlying storage
func New(appName, tableName string, opts ...Option) (*Checkpoint, error) { func New(appName, tableName string, opts ...Option) (*Checkpoint, error) {
client := dynamodb.New(session.New(aws.NewConfig())) client := dynamodb.New(session.New(aws.NewConfig()))
@ -43,6 +49,7 @@ func New(appName, tableName string, opts ...Option) (*Checkpoint, error) {
done: make(chan struct{}), done: make(chan struct{}),
mu: &sync.Mutex{}, mu: &sync.Mutex{},
checkpoints: map[key]string{}, checkpoints: map[key]string{},
retryer: &DefaultRetryer{},
} }
for _, opt := range opts { for _, opt := range opts {
@ -63,6 +70,7 @@ type Checkpoint struct {
mu *sync.Mutex // protects the checkpoints mu *sync.Mutex // protects the checkpoints
checkpoints map[key]string checkpoints map[key]string
done chan struct{} done chan struct{}
retryer Retryer
} }
type key struct { type key struct {
@ -97,7 +105,7 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
resp, err := c.client.GetItem(params) resp, err := c.client.GetItem(params)
if err != nil { if err != nil {
if retriableError(err) { if c.retryer.ShouldRetry(err) {
return c.Get(streamName, shardID) return c.Get(streamName, shardID)
} }
return "", err return "", err
@ -168,7 +176,7 @@ func (c *Checkpoint) save() error {
Item: item, Item: item,
}) })
if err != nil { if err != nil {
if !retriableError(err) { if !c.retryer.ShouldRetry(err) {
return err return err
} }
return c.save() return c.save()
@ -177,12 +185,3 @@ func (c *Checkpoint) save() error {
return nil return nil
} }
func retriableError(err error) bool {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException {
return true
}
}
return false
}

24
checkpoint/ddb/retryer.go Normal file
View file

@ -0,0 +1,24 @@
package ddb
import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
)
// Retryer interface contains one method that decides whether to retry based on error
type Retryer interface {
ShouldRetry(error) bool
}
type DefaultRetryer struct {
Retryer
}
func (r *DefaultRetryer) ShouldRetry(err error) bool {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException {
return true
}
}
return false
}

View file

@ -12,6 +12,7 @@ import (
"os/signal" "os/signal"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
@ -48,7 +49,7 @@ func main() {
}) })
// ddb checkpoint // ddb checkpoint
ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient)) ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(&MyRetryer{}))
if err != nil { if err != nil {
log.Fatalf("checkpoint error: %v", err) log.Fatalf("checkpoint error: %v", err)
} }
@ -100,3 +101,19 @@ func main() {
log.Fatalf("checkpoint shutdown error: %v", err) log.Fatalf("checkpoint shutdown error: %v", err)
} }
} }
type MyRetryer struct {
checkpoint.Retryer
}
func (r *MyRetryer) ShouldRetry(err error) bool {
if awsErr, ok := err.(awserr.Error); ok {
switch awsErr.Code() {
case dynamodb.ErrCodeProvisionedThroughputExceededException, dynamodb.ErrCodeLimitExceededException:
return true
default:
return false
}
}
return false
}