fix: created seperate rate limit check method callGetRecordsAPI

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
This commit is contained in:
Shiva Pentakota 2023-01-23 16:03:34 -08:00
parent cd1f34e489
commit adcff0b7bb

View file

@ -45,6 +45,8 @@ import (
) )
const ( const (
MaxBytes = 10000000.0
MaxBytesPerSecond = 2000000.0
MaxReadTransactionsPerSecond = 5 MaxReadTransactionsPerSecond = 5
) )
@ -112,6 +114,8 @@ func (sc *PollingShardConsumer) getRecords() error {
recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer) recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer)
retriedErrors := 0 retriedErrors := 0
transactionNum := 0 transactionNum := 0
remBytes := MaxBytes
var lastCheckTime time.Time
var firstTransactionTime time.Time var firstTransactionTime time.Time
for { for {
@ -199,10 +203,17 @@ func (sc *PollingShardConsumer) getRecords() error {
// reset the retry count after success // reset the retry count after success
retriedErrors = 0 retriedErrors = 0
// Calculate size of records from read transaction
numBytes := 0
for _, record := range getResp.Records {
numBytes = numBytes + len(record.Data)
}
// Add to number of getRecords successful transactions // Add to number of getRecords successful transactions
transactionNum++ transactionNum++
if transactionNum == 1 { if transactionNum == 1 {
firstTransactionTime = getRecordsTransactionTime firstTransactionTime = getRecordsTransactionTime
lastCheckTime = firstTransactionTime
} }
sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer) sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)
@ -214,6 +225,27 @@ func (sc *PollingShardConsumer) getRecords() error {
sc.recordProcessor.Shutdown(shutdownInput) sc.recordProcessor.Shutdown(shutdownInput)
return nil return nil
} }
// Each shard can support up to a maximum total data read rate of 2 MB per second via GetRecords.
// If a call to GetRecords returns 10 MB, subsequent calls made within the next 5 seconds throw an exception.
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
// check for overspending of byte budget from getRecords call
currTime := time.Now()
timePassed := currTime.Sub(lastCheckTime)
lastCheckTime = currTime
remBytes = remBytes + float64(timePassed.Seconds())*(MaxBytes/(float64(time.Second*5)))
if remBytes > MaxBytes {
remBytes = MaxBytes
}
if remBytes <= float64(numBytes) {
// Wait until cool down period has passed to prevent ProvisionedThroughputExceededException
coolDown := numBytes / MaxBytesPerSecond
time.Sleep(time.Duration(coolDown) * time.Second)
} else {
remBytes = remBytes - float64(numBytes)
}
shardIterator = getResp.NextShardIterator shardIterator = getResp.NextShardIterator
// Idle between each read, the user is responsible for checkpoint the progress // Idle between each read, the user is responsible for checkpoint the progress