Fix retry logic for DynamoDB (#83)

Add min/max retry and throttle delays for the retryer.
Also, increase the max retries to 10, which is in line with
the DynamoDB default retry count.

Signed-off-by: Tao Jiang <taoj@vmware.com>
Tao Jiang 2020-12-19 17:44:40 -06:00
parent f1982602ff
commit 6ff3cd1b15
4 changed files with 16 additions and 10 deletions
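
For reference, here is a minimal, self-contained sketch of the retryer configuration this change introduces, written against the AWS SDK for Go v1. The helper name newDynamoDBClient and the region/endpoint values in main are illustrative placeholders, not part of the commit:

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/client"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

// newDynamoDBClient builds a DynamoDB client whose retryer has explicit
// retry/throttle delay bounds in addition to the retry count.
func newDynamoDBClient(region, endpoint string, maxRetries int) (*dynamodb.DynamoDB, error) {
    s, err := session.NewSession(&aws.Config{
        Region:   aws.String(region),
        Endpoint: aws.String(endpoint),
        // Fields left out of a composite literal are zero-valued, so the
        // min/max delays are spelled out here instead of relying on
        // NumMaxRetries alone.
        Retryer: client.DefaultRetryer{
            NumMaxRetries:    maxRetries, // 10 here, in line with the DynamoDB default retry count
            MinRetryDelay:    client.DefaultRetryerMinRetryDelay,
            MinThrottleDelay: client.DefaultRetryerMinThrottleDelay,
            MaxRetryDelay:    client.DefaultRetryerMaxRetryDelay,
            MaxThrottleDelay: client.DefaultRetryerMaxThrottleDelay,
        },
    })
    if err != nil {
        return nil, err
    }
    return dynamodb.New(s), nil
}

func main() {
    // Local DynamoDB endpoint used purely as an example value.
    svc, err := newDynamoDBClient("us-west-2", "http://localhost:8000", 10)
    if err != nil {
        panic(err)
    }
    fmt.Println(svc.ClientInfo.ServiceName)
}

Note that the commit itself reuses client.DefaultRetryerMaxRetryDelay for MaxThrottleDelay; the sketch uses the matching client.DefaultRetryerMaxThrottleDelay constant, which resolves to the same duration in the v1 SDK.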


@@ -48,7 +48,7 @@ const (
ErrInvalidDynamoDBSchema = "The DynamoDB schema is invalid and may need to be re-created"
// NumMaxRetries is the max times of doing retry
- NumMaxRetries = 5
+ NumMaxRetries = 10
)
// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
@@ -92,7 +92,13 @@ func (checkpointer *DynamoCheckpoint) Init() error {
Region: aws.String(checkpointer.kclConfig.RegionName),
Endpoint: aws.String(checkpointer.kclConfig.DynamoDBEndpoint),
Credentials: checkpointer.kclConfig.DynamoDBCredentials,
- Retryer: client.DefaultRetryer{NumMaxRetries: checkpointer.Retries},
+ Retryer: client.DefaultRetryer{
+ NumMaxRetries: checkpointer.Retries,
+ MinRetryDelay: client.DefaultRetryerMinRetryDelay,
+ MinThrottleDelay: client.DefaultRetryerMinThrottleDelay,
+ MaxRetryDelay: client.DefaultRetryerMaxRetryDelay,
+ MaxThrottleDelay: client.DefaultRetryerMaxRetryDelay,
+ },
})
if err != nil {


@@ -43,7 +43,7 @@ func (d *dumpRecordProcessorFactory) CreateProcessor() kc.IRecordProcessor {
// Create a dump record processor for printing out all data from record.
type dumpRecordProcessor struct {
- t *testing.T
+ t     *testing.T
count int
}


@@ -31,7 +31,7 @@ import (
const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
// NewKinesisClient to create a Kinesis Client.
- func NewKinesisClient(t *testing.T, regionName, endpoint string, credentials *credentials.Credentials) *kinesis.Kinesis{
+ func NewKinesisClient(t *testing.T, regionName, endpoint string, credentials *credentials.Credentials) *kinesis.Kinesis {
s, err := session.NewSession(&aws.Config{
Region: aws.String(regionName),
Endpoint: aws.String(endpoint),
@@ -81,7 +81,7 @@ func publishRecords(t *testing.T, kc kinesisiface.KinesisAPI) {
// Use random string as partition key to ensure even distribution across shards
records := make([]*kinesis.PutRecordsRequestEntry, 5)
- for i:= 0; i < 5; i++ {
+ for i := 0; i < 5; i++ {
records[i] = &kinesis.PutRecordsRequestEntry{
Data: []byte(specstr),
PartitionKey: aws.String(utils.RandStringBytesMaskImpr(10)),
@@ -89,11 +89,11 @@ func publishRecords(t *testing.T, kc kinesisiface.KinesisAPI) {
}
_, err := kc.PutRecords(&kinesis.PutRecordsInput{
- Records: records,
- StreamName: aws.String(streamName),
+ Records:    records,
+ StreamName: aws.String(streamName),
})
if err != nil {
t.Errorf("Error in PutRecords. %+v", err)
}
}
}


@@ -51,7 +51,7 @@ const metricsSystem = "cloudwatch"
var shardID string
func TestWorker(t *testing.T) {
- // At miminal. use standard logrus logger
+ // At minimal. use standard logrus logger
// log := logger.NewLogrusLogger(logrus.StandardLogger())
//
// In order to have precise control over logging. Use logger with config
@@ -241,4 +241,4 @@ func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service string
}
return nil
- }
+ }