From cd1f34e4895211daf34b98670ee03fbcc00d506a Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Mon, 23 Jan 2023 13:49:25 -0800 Subject: [PATCH] fix: max bytes per second getRecords check Signed-off-by: Shiva Pentakota --- .../worker/polling-shard-consumer.go | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index cdc8322..1491225 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -44,6 +44,10 @@ import ( "github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics" ) +const ( + MaxReadTransactionsPerSecond = 5 +) + // PollingShardConsumer is responsible for polling data records from a (specified) shard. // Note: PollingShardConsumer only deal with one shard. type PollingShardConsumer struct { @@ -107,6 +111,8 @@ func (sc *PollingShardConsumer) getRecords() error { recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer) retriedErrors := 0 + transactionNum := 0 + var firstTransactionTime time.Time for { if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) { @@ -134,6 +140,15 @@ func (sc *PollingShardConsumer) getRecords() error { ShardIterator: shardIterator, } + // Each shard can support up to five read transactions per second. + if transactionNum > MaxReadTransactionsPerSecond { + transactionNum = 0 + timeDiff := time.Since(firstTransactionTime) + if timeDiff < time.Second { + time.Sleep(timeDiff) + } + } + // Get records from stream and retry as needed // Each read transaction can provide up to 10,000 records with an upper quota of 10 MB per transaction. // ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html @@ -184,6 +199,12 @@ func (sc *PollingShardConsumer) getRecords() error { // reset the retry count after success retriedErrors = 0 + // Add to number of getRecords successful transactions + transactionNum++ + if transactionNum == 1 { + firstTransactionTime = getRecordsTransactionTime + } + sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer) // The shard has been closed, so no new records can be read from it