From adcff0b7bbd517d53a10dfa8a9d34c6e8a6ea117 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Mon, 23 Jan 2023 16:03:34 -0800 Subject: [PATCH] fix: created seperate rate limit check method callGetRecordsAPI Signed-off-by: Shiva Pentakota --- .../worker/polling-shard-consumer.go | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 1491225..06e7d78 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -45,6 +45,8 @@ import ( ) const ( + MaxBytes = 10000000.0 + MaxBytesPerSecond = 2000000.0 MaxReadTransactionsPerSecond = 5 ) @@ -112,6 +114,8 @@ func (sc *PollingShardConsumer) getRecords() error { recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer) retriedErrors := 0 transactionNum := 0 + remBytes := MaxBytes + var lastCheckTime time.Time var firstTransactionTime time.Time for { @@ -199,10 +203,17 @@ func (sc *PollingShardConsumer) getRecords() error { // reset the retry count after success retriedErrors = 0 + // Calculate size of records from read transaction + numBytes := 0 + for _, record := range getResp.Records { + numBytes = numBytes + len(record.Data) + } + // Add to number of getRecords successful transactions transactionNum++ if transactionNum == 1 { firstTransactionTime = getRecordsTransactionTime + lastCheckTime = firstTransactionTime } sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer) @@ -214,6 +225,27 @@ func (sc *PollingShardConsumer) getRecords() error { sc.recordProcessor.Shutdown(shutdownInput) return nil } + + // Each shard can support up to a maximum total data read rate of 2 MB per second via GetRecords. + // If a call to GetRecords returns 10 MB, subsequent calls made within the next 5 seconds throw an exception. + // ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html + // check for overspending of byte budget from getRecords call + currTime := time.Now() + timePassed := currTime.Sub(lastCheckTime) + lastCheckTime = currTime + + remBytes = remBytes + float64(timePassed.Seconds())*(MaxBytes/(float64(time.Second*5))) + if remBytes > MaxBytes { + remBytes = MaxBytes + } + if remBytes <= float64(numBytes) { + // Wait until cool down period has passed to prevent ProvisionedThroughputExceededException + coolDown := numBytes / MaxBytesPerSecond + time.Sleep(time.Duration(coolDown) * time.Second) + } else { + remBytes = remBytes - float64(numBytes) + } + shardIterator = getResp.NextShardIterator // Idle between each read, the user is responsible for checkpoint the progress