fix: rate limiting getRecords TPS
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
This commit is contained in:
parent
9fcd1a8293
commit
b49cbdf4fc
1 changed file with 34 additions and 9 deletions
|
|
@@ -129,20 +129,49 @@ func (sc *PollingShardConsumer) getRecords() error {
|
||||||
getRecordsStartTime := time.Now()
|
getRecordsStartTime := time.Now()
|
||||||
|
|
||||||
log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator))
|
log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator))
|
||||||
|
|
||||||
// Get records from stream and retry as needed
|
|
||||||
getRecordsArgs := &kinesis.GetRecordsInput{
|
getRecordsArgs := &kinesis.GetRecordsInput{
|
||||||
Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)),
|
Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)),
|
||||||
ShardIterator: shardIterator,
|
ShardIterator: shardIterator,
|
||||||
}
|
}
|
||||||
getResp, err := sc.callGetRecordsAPI(getRecordsArgs)
|
|
||||||
|
// Get records from stream and retry as needed
|
||||||
|
// Each read transaction can provide up to 10,000 records with an upper quota of 10 MB per transaction.
|
||||||
|
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
|
||||||
|
getResp, err := sc.kc.GetRecords(context.TODO(), getRecordsArgs)
|
||||||
|
getRecordsTransactionTime := time.Now()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling
|
//aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling
|
||||||
var throughputExceededErr *types.ProvisionedThroughputExceededException
|
var throughputExceededErr *types.ProvisionedThroughputExceededException
|
||||||
var kmsThrottlingErr *types.KMSThrottlingException
|
var kmsThrottlingErr *types.KMSThrottlingException
|
||||||
if errors.As(err, &throughputExceededErr) || errors.As(err, &kmsThrottlingErr) {
|
if errors.As(err, &throughputExceededErr) {
|
||||||
|
retriedErrors++
|
||||||
|
if retriedErrors > sc.kclConfig.MaxRetryCount {
|
||||||
|
log.Errorf("message", "reached max retry count getting records from shard",
|
||||||
|
"shardId", sc.shard.ID,
|
||||||
|
"retryCount", retriedErrors,
|
||||||
|
"error", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// If there is insufficient provisioned throughput on the stream,
|
||||||
|
// subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException.
|
||||||
|
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
|
||||||
|
waitTime := time.Since(getRecordsTransactionTime)
|
||||||
|
if waitTime < time.Second {
|
||||||
|
time.Sleep(time.Second - waitTime)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if errors.As(err, &kmsThrottlingErr) {
|
||||||
log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err)
|
log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err)
|
||||||
retriedErrors++
|
retriedErrors++
|
||||||
|
// Greater than MaxRetryCount so we get the last retry
|
||||||
|
if retriedErrors > sc.kclConfig.MaxRetryCount {
|
||||||
|
log.Errorf("message", "reached max retry count getting records from shard",
|
||||||
|
"shardId", sc.shard.ID,
|
||||||
|
"retryCount", retriedErrors,
|
||||||
|
"error", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
// exponential backoff
|
// exponential backoff
|
||||||
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
|
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
|
||||||
time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond)
|
time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond)
|
||||||
|
|
@@ -151,6 +180,7 @@ func (sc *PollingShardConsumer) getRecords() error {
|
||||||
log.Errorf("Error getting records from Kinesis that cannot be retried: %+v Request: %s", err, getRecordsArgs)
|
log.Errorf("Error getting records from Kinesis that cannot be retried: %+v Request: %s", err, getRecordsArgs)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset the retry count after success
|
// reset the retry count after success
|
||||||
retriedErrors = 0
|
retriedErrors = 0
|
||||||
|
|
||||||
|
|
@@ -181,8 +211,3 @@ func (sc *PollingShardConsumer) getRecords() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
|
|
||||||
getResp, err := sc.kc.GetRecords(context.TODO(), gri)
|
|
||||||
return getResp, err
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue