fix: max bytes per second getRecords check
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
This commit is contained in:
parent
b49cbdf4fc
commit
cd1f34e489
1 changed files with 21 additions and 0 deletions
|
|
@ -44,6 +44,10 @@ import (
|
||||||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
|
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
MaxReadTransactionsPerSecond = 5
|
||||||
|
)
|
||||||
|
|
||||||
// PollingShardConsumer is responsible for polling data records from a (specified) shard.
|
// PollingShardConsumer is responsible for polling data records from a (specified) shard.
|
||||||
// Note: PollingShardConsumer only deal with one shard.
|
// Note: PollingShardConsumer only deal with one shard.
|
||||||
type PollingShardConsumer struct {
|
type PollingShardConsumer struct {
|
||||||
|
|
@ -107,6 +111,8 @@ func (sc *PollingShardConsumer) getRecords() error {
|
||||||
|
|
||||||
recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer)
|
recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer)
|
||||||
retriedErrors := 0
|
retriedErrors := 0
|
||||||
|
transactionNum := 0
|
||||||
|
var firstTransactionTime time.Time
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) {
|
if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) {
|
||||||
|
|
@ -134,6 +140,15 @@ func (sc *PollingShardConsumer) getRecords() error {
|
||||||
ShardIterator: shardIterator,
|
ShardIterator: shardIterator,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Each shard can support up to five read transactions per second.
|
||||||
|
if transactionNum > MaxReadTransactionsPerSecond {
|
||||||
|
transactionNum = 0
|
||||||
|
timeDiff := time.Since(firstTransactionTime)
|
||||||
|
if timeDiff < time.Second {
|
||||||
|
time.Sleep(timeDiff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Get records from stream and retry as needed
|
// Get records from stream and retry as needed
|
||||||
// Each read transaction can provide up to 10,000 records with an upper quota of 10 MB per transaction.
|
// Each read transaction can provide up to 10,000 records with an upper quota of 10 MB per transaction.
|
||||||
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
|
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
|
||||||
|
|
@ -184,6 +199,12 @@ func (sc *PollingShardConsumer) getRecords() error {
|
||||||
// reset the retry count after success
|
// reset the retry count after success
|
||||||
retriedErrors = 0
|
retriedErrors = 0
|
||||||
|
|
||||||
|
// Add to number of getRecords successful transactions
|
||||||
|
transactionNum++
|
||||||
|
if transactionNum == 1 {
|
||||||
|
firstTransactionTime = getRecordsTransactionTime
|
||||||
|
}
|
||||||
|
|
||||||
sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)
|
sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)
|
||||||
|
|
||||||
// The shard has been closed, so no new records can be read from it
|
// The shard has been closed, so no new records can be read from it
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue