From 6ff3cd1b1587fc57b69f7bfbd5b0463dec929ea7 Mon Sep 17 00:00:00 2001
From: Tao Jiang
Date: Sat, 19 Dec 2020 17:44:40 -0600
Subject: [PATCH] Fix retry logic for dynamodb (#83)

Add min/max retry and throttle delays for the retryer. Also, increase the
max retries to 10, which is in line with the DynamoDB default retry count.

Signed-off-by: Tao Jiang
---
 clientlibrary/checkpoint/dynamodb-checkpointer.go | 10 ++++++++--
 test/record_processor_test.go                     |  2 +-
 test/record_publisher_test.go                     | 10 +++++-----
 test/worker_test.go                               |  4 ++--
 4 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go
index 1b61f45..694522b 100644
--- a/clientlibrary/checkpoint/dynamodb-checkpointer.go
+++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go
@@ -48,7 +48,7 @@ const (
 	ErrInvalidDynamoDBSchema = "The DynamoDB schema is invalid and may need to be re-created"
 
 	// NumMaxRetries is the max times of doing retry
-	NumMaxRetries = 5
+	NumMaxRetries = 10
 )
 
 // DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
@@ -92,7 +92,13 @@ func (checkpointer *DynamoCheckpoint) Init() error {
 		Region:      aws.String(checkpointer.kclConfig.RegionName),
 		Endpoint:    aws.String(checkpointer.kclConfig.DynamoDBEndpoint),
 		Credentials: checkpointer.kclConfig.DynamoDBCredentials,
-		Retryer:     client.DefaultRetryer{NumMaxRetries: checkpointer.Retries},
+		Retryer: client.DefaultRetryer{
+			NumMaxRetries:    checkpointer.Retries,
+			MinRetryDelay:    client.DefaultRetryerMinRetryDelay,
+			MinThrottleDelay: client.DefaultRetryerMinThrottleDelay,
+			MaxRetryDelay:    client.DefaultRetryerMaxRetryDelay,
+			MaxThrottleDelay: client.DefaultRetryerMaxRetryDelay,
+		},
 	})
 
 	if err != nil {
diff --git a/test/record_processor_test.go b/test/record_processor_test.go
index 6c93d6c..c20b901 100644
--- a/test/record_processor_test.go
+++ b/test/record_processor_test.go
@@ -43,7 +43,7 @@ func (d *dumpRecordProcessorFactory) CreateProcessor() kc.IRecordProcessor {
 
 // Create a dump record processor for printing out all data from record.
 type dumpRecordProcessor struct {
-	t *testing.T
+	t     *testing.T
 	count int
 }
 
diff --git a/test/record_publisher_test.go b/test/record_publisher_test.go
index 1bddbf2..cb61f83 100644
--- a/test/record_publisher_test.go
+++ b/test/record_publisher_test.go
@@ -31,7 +31,7 @@ import (
 const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
 
 // NewKinesisClient to create a Kinesis Client.
-func NewKinesisClient(t *testing.T, regionName, endpoint string, credentials *credentials.Credentials) *kinesis.Kinesis{
+func NewKinesisClient(t *testing.T, regionName, endpoint string, credentials *credentials.Credentials) *kinesis.Kinesis {
 	s, err := session.NewSession(&aws.Config{
 		Region:   aws.String(regionName),
 		Endpoint: aws.String(endpoint),
@@ -81,7 +81,7 @@ func publishRecords(t *testing.T, kc kinesisiface.KinesisAPI) {
 
 	// Use random string as partition key to ensure even distribution across shards
 	records := make([]*kinesis.PutRecordsRequestEntry, 5)
-	for i:= 0; i < 5; i++ {
+	for i := 0; i < 5; i++ {
 		records[i] = &kinesis.PutRecordsRequestEntry{
 			Data:         []byte(specstr),
 			PartitionKey: aws.String(utils.RandStringBytesMaskImpr(10)),
@@ -89,11 +89,11 @@ func publishRecords(t *testing.T, kc kinesisiface.KinesisAPI) {
 	}
 
 	_, err := kc.PutRecords(&kinesis.PutRecordsInput{
-		Records: records,
-		StreamName: aws.String(streamName),
+		Records:    records,
+		StreamName: aws.String(streamName),
 	})
 
 	if err != nil {
 		t.Errorf("Error in PutRecords. %+v", err)
 	}
-}
\ No newline at end of file
+}
diff --git a/test/worker_test.go b/test/worker_test.go
index 73f6cdf..bbe5e51 100644
--- a/test/worker_test.go
+++ b/test/worker_test.go
@@ -51,7 +51,7 @@ const metricsSystem = "cloudwatch"
 var shardID string
 
 func TestWorker(t *testing.T) {
-	// At miminal. use standard logrus logger
+	// At minimal. use standard logrus logger
 	// log := logger.NewLogrusLogger(logrus.StandardLogger())
 	//
 	// In order to have precise control over logging. Use logger with config
@@ -241,4 +241,4 @@ func getMetricsConfig(kclConfig *cfg.KinesisClientLibConfiguration, service stri
 	}
 
 	return nil
-}
\ No newline at end of file
+}
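
For reference, below is a minimal standalone sketch (not part of the patch) of the retryer configuration the checkpointer change introduces, written against the AWS SDK for Go v1. The region, endpoint, and hard-coded retry count of 10 are placeholder assumptions for illustration; the checkpointer itself takes these from its KCL configuration, and the sketch uses the SDK's DefaultRetryerMaxThrottleDelay constant for MaxThrottleDelay rather than mirroring the patch line for line.

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
	// Spell out the min/max retry and throttle delays alongside the retry count
	// instead of leaving the other DefaultRetryer fields at their zero values.
	// The 10 here is a placeholder matching the new NumMaxRetries value in the patch.
	retryer := client.DefaultRetryer{
		NumMaxRetries:    10,
		MinRetryDelay:    client.DefaultRetryerMinRetryDelay,
		MinThrottleDelay: client.DefaultRetryerMinThrottleDelay,
		MaxRetryDelay:    client.DefaultRetryerMaxRetryDelay,
		MaxThrottleDelay: client.DefaultRetryerMaxThrottleDelay,
	}

	// Placeholder region and endpoint; the real checkpointer reads these from kclConfig.
	sess, err := session.NewSession(&aws.Config{
		Region:   aws.String("us-west-2"),
		Endpoint: aws.String("http://localhost:8000"),
		Retryer:  retryer,
	})
	if err != nil {
		fmt.Println("failed to create session:", err)
		return
	}

	svc := dynamodb.New(sess)
	fmt.Printf("DynamoDB client %T configured with %d max retries\n", svc, retryer.MaxRetries())
}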