From b49cbdf4fc8fb60889220e6f41bd6f388485c41f Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Mon, 23 Jan 2023 13:44:14 -0800 Subject: [PATCH] fix: rate limiting getRecords tps Signed-off-by: Shiva Pentakota --- .../worker/polling-shard-consumer.go | 43 +++++++++++++++---- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index cd4565a..cdc8322 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -129,20 +129,49 @@ func (sc *PollingShardConsumer) getRecords() error { getRecordsStartTime := time.Now() log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator)) - - // Get records from stream and retry as needed getRecordsArgs := &kinesis.GetRecordsInput{ Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)), ShardIterator: shardIterator, } - getResp, err := sc.callGetRecordsAPI(getRecordsArgs) + + // Get records from stream and retry as needed + // Each read transaction can provide up to 10,000 records with an upper quota of 10 MB per transaction. + // ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html + getResp, err := sc.kc.GetRecords(context.TODO(), getRecordsArgs) + getRecordsTransactionTime := time.Now() if err != nil { //aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling var throughputExceededErr *types.ProvisionedThroughputExceededException var kmsThrottlingErr *types.KMSThrottlingException - if errors.As(err, &throughputExceededErr) || errors.As(err, &kmsThrottlingErr) { + if errors.As(err, &throughputExceededErr) { + retriedErrors++ + if retriedErrors > sc.kclConfig.MaxRetryCount { + log.Errorf("message", "reached max retry count getting records from shard", + "shardId", sc.shard.ID, + "retryCount", retriedErrors, + "error", err) + return err + } + // If there is insufficient provisioned throughput on the stream, + // subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException. + // ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html + waitTime := time.Since(getRecordsTransactionTime) + if waitTime < time.Second { + time.Sleep(time.Second - waitTime) + } + continue + } + if errors.As(err, &kmsThrottlingErr) { log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err) retriedErrors++ + // Greater than MaxRetryCount so we get the last retry + if retriedErrors > sc.kclConfig.MaxRetryCount { + log.Errorf("message", "reached max retry count getting records from shard", + "shardId", sc.shard.ID, + "retryCount", retriedErrors, + "error", err) + return err + } // exponential backoff // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond) @@ -151,6 +180,7 @@ func (sc *PollingShardConsumer) getRecords() error { log.Errorf("Error getting records from Kinesis that cannot be retried: %+v Request: %s", err, getRecordsArgs) return err } + // reset the retry count after success retriedErrors = 0 @@ -181,8 +211,3 @@ func (sc *PollingShardConsumer) getRecords() error { } } } - -func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) { - getResp, err := sc.kc.GetRecords(context.TODO(), gri) - return getResp, err -}