Compare commits

..

26 commits

Author SHA1 Message Date
spentakota
f6e79f1a2d
Merge pull request #53 from YakkovHow/main
fix: support go 1.21 & upgrade dependencies
2024-03-04 18:07:06 -08:00
Yakkov Leng
3676eb410a fix: support go 1.21 & upgrade dependencies
Signed-off-by: Yakkov Leng <yleng@vmware.com>
2024-03-04 17:56:27 -08:00
spentakota
b12921da23
Merge pull request #37 from vmware/spentakota_passContext
fix: pass in ctx with cancel for renewLease
2023-04-06 18:09:16 -07:00
Shiva Pentakota
4482696d95 fix: pass in ctx with cancel for renewLease
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-04-06 17:53:46 -07:00
vmwjc
16c5c53a30
Merge pull request #13 from calebstewart/fix/issue-5-empty-kinesisendpoint
Automatically resolve default KinesisEndpoint
2023-04-04 14:40:26 -07:00
vmwjc
f97ba4e3b2
Merge pull request #36 from vmware/clean-up-extra-err-check
chore: Remove extraneous err check
2023-04-04 14:21:24 -07:00
John Calixto
c1f6b270ab chore: Remove extraneous err check
After checking the scan result above this line, checking err here no
longer has any effect.

Signed-off-by: John Calixto <jcalixto@vmware.com>
2023-04-04 14:15:59 -07:00
vmwjc
6120c11333
Merge pull request #10 from mrmonaghan/fix-infinite-worker-loop
fixing infinite worker loop
2023-04-04 14:08:59 -07:00
spentakota
8ecb5b40a2
Merge pull request #35 from vmware/spentakota_retLeaseErr
fix: return err log in case of ErrLeaseNotAcquired
2023-04-04 11:21:14 -07:00
Shiva Pentakota
86d70940e6 fix: return err log in case of ErrLeaseNotAcquired
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-04-04 11:17:22 -07:00
spentakota
6516287f6d
Merge pull request #34 from vmware/spentakota_asyncLeaseRenewal
feat: make lease renewal async
2023-04-03 15:02:28 -07:00
Shiva Pentakota
4aebaf1ae0 feat: make lease renewal async
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-04-03 14:58:04 -07:00
spentakota
5be0422f33
Merge pull request #32 from vmware/spentakota_overdueShutdown
fix: add shutdown error case for checkpoint function
2023-03-30 12:51:49 -07:00
Shiva Pentakota
02d4b44ff6 fix: add shutdown and leaseExpired error cases for checkpoint function
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-03-28 11:46:55 -07:00
cmckelvey-vmware
44f1558847
Merge pull request #31 from vmware/dependabot/go_modules/golang.org/x/sys-0.1.0
Bump golang.org/x/sys from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0
2023-03-28 10:59:31 -06:00
cmckelvey-vmware
982c468bef
Merge pull request #30 from vmware/dependabot/go_modules/github.com/prometheus/client_golang-1.11.1
Bump github.com/prometheus/client_golang from 1.11.0 to 1.11.1
2023-03-28 10:58:45 -06:00
dependabot[bot]
09f0889f28
Bump golang.org/x/sys from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0
Bumps [golang.org/x/sys](https://github.com/golang/sys) from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0.
- [Release notes](https://github.com/golang/sys/releases)
- [Commits](https://github.com/golang/sys/commits/v0.1.0)

---
updated-dependencies:
- dependency-name: golang.org/x/sys
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-03-28 16:57:32 +00:00
dependabot[bot]
236412324f
Bump github.com/prometheus/client_golang from 1.11.0 to 1.11.1
Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.11.0 to 1.11.1.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.11.0...v1.11.1)

---
updated-dependencies:
- dependency-name: github.com/prometheus/client_golang
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-03-28 16:56:55 +00:00
spentakota
61a027efef
Merge pull request #29 from vmware/fix-token-bucket-edge-cases
fix: Check token bucket corner cases correctly.
2023-03-23 12:46:42 -07:00
John Calixto
987fada9d3 fix: Check token bucket corner cases correctly.
Signed-off-by: John Calixto <jcalixto@vmware.com>
2023-03-23 11:23:35 -07:00
spentakota
711b72932a
Merge pull request #27 from vmware/spentakota_AddLogs
chore: add info logs in sleep case for kinesis backoff errors
2023-03-22 14:43:58 -07:00
Shiva Pentakota
a7c063b99c chore: add info logs in sleep case for kinesis backoff errors
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-03-22 12:22:26 -07:00
spentakota
bce4dd3f42
Merge pull request #24 from vmware/spentakota_fixNanoBug
fix: use nanosecond precision in lease comparisons
2023-02-14 10:30:14 -08:00
Shiva Pentakota
df16ef451c fix: use nanosecond precision in lease comparisons
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-02-13 17:56:11 -08:00
Caleb Stewart
e2a45c53c3 Automatically resolve default KinesisEndpoint
This commit fixes #5 by returning `aws.EndpointNotFoundError` from the
endpoint resolver when no `KinesisEndpoint` is defined, which will
resolve the default AWS endpoint. This is the same process used by the
DynamoDB checkpointer to resolve the default endpoint.

Signed-off-by: Caleb Stewart <caleb.stewart94@gmail.com>
2022-10-13 13:37:51 -04:00
Mike Monaghan
aab08b9050 fixing infinite worker loop
Signed-off-by: Mike Monaghan <mike_monaghan@live.ca>
2022-09-13 15:31:16 -06:00
13 changed files with 336 additions and 113 deletions

View file

@ -53,10 +53,6 @@ read our [Developer Certificate of Origin](https://cla.vmware.com/dco). All cont
signed as described on that page. Your signature certifies that you wrote the patch or have the right to pass it on signed as described on that page. Your signature certifies that you wrote the patch or have the right to pass it on
as an open-source patch. For more detailed information, refer to [CONTRIBUTING.md](CONTRIBUTING.md). as an open-source patch. For more detailed information, refer to [CONTRIBUTING.md](CONTRIBUTING.md).
## Additional Notes If Modifying kclConfig values
* Currently there are some bugs with the lease stealing logic within vmware-go-kcl-v2 so please keep the EnableLeaseStealing kclConfig value as false.
* When using a non-default FailoverTimeMillis kclConfig value, keeping a 2:1 ratio between FailoverTimeMillis and LeaseRefreshPeriodMillis typically helps to reduce the chance of worker lease contention.
## License ## License
MIT License MIT License

View file

@ -79,6 +79,9 @@ type Checkpointer interface {
// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment // RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
RemoveLeaseOwner(string) error RemoveLeaseOwner(string) error
// GetLeaseOwner to get current owner of lease for shard
GetLeaseOwner(string) (string, error)
// ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods) // ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods)
ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)

View file

@ -51,6 +51,10 @@ const (
NumMaxRetries = 10 NumMaxRetries = 10
) )
var (
NoLeaseOwnerErr = errors.New("no LeaseOwner in checkpoints table")
)
// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend // DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
type DynamoCheckpoint struct { type DynamoCheckpoint struct {
log logger.Logger log logger.Logger
@ -129,7 +133,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
// GetLease attempts to gain a lock on the given shard // GetLease attempts to gain a lock on the given shard
func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssignTo string) error { func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssignTo string) error {
newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC() newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC()
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339) newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339Nano)
currentCheckpoint, err := checkpointer.getItem(shard.ID) currentCheckpoint, err := checkpointer.getItem(shard.ID)
if err != nil { if err != nil {
return err return err
@ -161,7 +165,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
assignedTo := assignedVar.(*types.AttributeValueMemberS).Value assignedTo := assignedVar.(*types.AttributeValueMemberS).Value
leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout) currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout)
if err != nil { if err != nil {
return err return err
} }
@ -246,7 +250,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
// CheckpointSequence writes a checkpoint at the designated sequence ID // CheckpointSequence writes a checkpoint at the designated sequence ID
func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error { func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error {
leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339) leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339Nano)
marshalledCheckpoint := map[string]types.AttributeValue{ marshalledCheckpoint := map[string]types.AttributeValue{
LeaseKeyKey: &types.AttributeValueMemberS{ LeaseKeyKey: &types.AttributeValueMemberS{
Value: shard.ID, Value: shard.ID,
@ -290,7 +294,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
// Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming // Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming
if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" { if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" {
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value) currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout.(*types.AttributeValueMemberS).Value)
if err != nil { if err != nil {
return err return err
} }
@ -336,6 +340,23 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
return err return err
} }
// GetLeaseOwner returns current lease owner of given shard in checkpoints table
func (checkpointer *DynamoCheckpoint) GetLeaseOwner(shardID string) (string, error) {
currentCheckpoint, err := checkpointer.getItem(shardID)
if err != nil {
return "", err
}
assignedVar, assignedToOk := currentCheckpoint[LeaseOwnerKey]
if !assignedToOk {
return "", NoLeaseOwnerErr
}
return assignedVar.(*types.AttributeValueMemberS).Value, nil
}
// ListActiveWorkers returns a map of workers and their shards // ListActiveWorkers returns a map of workers and their shards
func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) { func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) {
err := checkpointer.syncLeases(shardStatus) err := checkpointer.syncLeases(shardStatus)
@ -370,7 +391,7 @@ func (checkpointer *DynamoCheckpoint) ClaimShard(shard *par.ShardStatus, claimID
if err != nil && err != ErrSequenceIDNotFound { if err != nil && err != ErrSequenceIDNotFound {
return err return err
} }
leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339) leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339Nano)
conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)` conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)`
expressionAttributeValues := map[string]types.AttributeValue{ expressionAttributeValues := map[string]types.AttributeValue{
@ -462,10 +483,6 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha
} }
} }
if err != nil {
log.Debugf("Error performing SyncLeases. Error: %+v ", err)
return err
}
log.Debugf("Lease sync completed. Next lease sync will occur in %s", time.Duration(checkpointer.kclConfig.LeaseSyncingTimeIntervalMillis)*time.Millisecond) log.Debugf("Lease sync completed. Next lease sync will occur in %s", time.Duration(checkpointer.kclConfig.LeaseSyncingTimeIntervalMillis)*time.Millisecond)
return nil return nil
} }

View file

@ -69,6 +69,9 @@ const (
// DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner. // DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner.
DefaultLeaseRefreshPeriodMillis = 5000 DefaultLeaseRefreshPeriodMillis = 5000
// DefaultLeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
DefaultLeaseRefreshWaitTime = 2500
// DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call. // DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call.
DefaultMaxRecords = 10000 DefaultMaxRecords = 10000
@ -216,6 +219,9 @@ type (
// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner. // LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
LeaseRefreshPeriodMillis int LeaseRefreshPeriodMillis int
// LeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
LeaseRefreshWaitTime int
// MaxRecords Max records to read per Kinesis getRecords() call // MaxRecords Max records to read per Kinesis getRecords() call
MaxRecords int MaxRecords int

View file

@ -102,6 +102,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis, LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis,
LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis, LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis,
LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis, LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis,
LeaseRefreshWaitTime: DefaultLeaseRefreshWaitTime,
MaxRetryCount: DefaultMaxRetryCount, MaxRetryCount: DefaultMaxRetryCount,
Logger: logger.GetDefaultLogger(), Logger: logger.GetDefaultLogger(),
} }
@ -149,6 +150,12 @@ func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefres
return c return c
} }
func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration {
checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime)
c.LeaseRefreshWaitTime = leaseRefreshWaitTime
return c
}
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration { func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis) checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
c.ShardSyncIntervalMillis = shardSyncIntervalMillis c.ShardSyncIntervalMillis = shardSyncIntervalMillis

View file

@ -332,6 +332,10 @@ func (cw *MonitoringService) RecordProcessRecordsTime(shard string, time float64
m.processRecordsTime = append(m.processRecordsTime, time) m.processRecordsTime = append(m.processRecordsTime, time)
} }
func (cw *MonitoringService) DeleteMetricMillisBehindLatest(shard string) {
// not implemented
}
func (cw *MonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics { func (cw *MonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics {
var i interface{} var i interface{}
var ok bool var ok bool

View file

@ -173,7 +173,6 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
input.CacheEntryTime = &getRecordsStartTime input.CacheEntryTime = &getRecordsStartTime
input.CacheExitTime = &processRecordsStartTime input.CacheExitTime = &processRecordsStartTime
sc.recordProcessor.ProcessRecords(input) sc.recordProcessor.ProcessRecords(input)
processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds() processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds()
sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming)) sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming))
} }

View file

@ -32,6 +32,7 @@ package worker
import ( import (
"context" "context"
"errors" "errors"
log "github.com/sirupsen/logrus"
"math" "math"
"time" "time"
@ -98,7 +99,12 @@ func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
// getRecords continuously poll one shard for data record // getRecords continuously poll one shard for data record
// Precondition: it currently has the lease on the shard. // Precondition: it currently has the lease on the shard.
func (sc *PollingShardConsumer) getRecords() error { func (sc *PollingShardConsumer) getRecords() error {
defer sc.releaseLease(sc.shard.ID) ctx, cancelFunc := context.WithCancel(context.Background())
defer func() {
// cancel renewLease()
cancelFunc()
sc.releaseLease(sc.shard.ID)
}()
log := sc.kclConfig.Logger log := sc.kclConfig.Logger
@ -133,24 +139,12 @@ func (sc *PollingShardConsumer) getRecords() error {
sc.bytesRead = 0 sc.bytesRead = 0
sc.remBytes = MaxBytes sc.remBytes = MaxBytes
// starting async lease renewal thread
leaseRenewalErrChan := make(chan error, 1)
go func() {
leaseRenewalErrChan <- sc.renewLease(ctx)
}()
for { for {
if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) {
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
err = sc.checkpointer.GetLease(sc.shard, sc.consumerID)
if err != nil {
if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
return nil
}
// log and return error
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
sc.shard.ID, sc.consumerID, err)
return err
}
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
}
getRecordsStartTime := time.Now() getRecordsStartTime := time.Now()
log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator)) log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator))
@ -182,10 +176,12 @@ func (sc *PollingShardConsumer) getRecords() error {
continue continue
} }
if err == localTPSExceededError { if err == localTPSExceededError {
log.Infof("localTPSExceededError so sleep for a second")
sc.waitASecond(sc.currTime) sc.waitASecond(sc.currTime)
continue continue
} }
if err == maxBytesExceededError { if err == maxBytesExceededError {
log.Infof("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod)
time.Sleep(time.Duration(coolDownPeriod) * time.Second) time.Sleep(time.Duration(coolDownPeriod) * time.Second)
continue continue
} }
@ -235,6 +231,8 @@ func (sc *PollingShardConsumer) getRecords() error {
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer} shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
sc.recordProcessor.Shutdown(shutdownInput) sc.recordProcessor.Shutdown(shutdownInput)
return nil return nil
case leaseRenewalErr := <-leaseRenewalErrChan:
return leaseRenewalErr
default: default:
} }
} }
@ -256,14 +254,16 @@ func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) {
secondsPassed := currentTime.Sub(sc.lastCheckTime).Seconds() secondsPassed := currentTime.Sub(sc.lastCheckTime).Seconds()
sc.lastCheckTime = currentTime sc.lastCheckTime = currentTime
sc.remBytes += int(secondsPassed * MaxBytesPerSecond) sc.remBytes += int(secondsPassed * MaxBytesPerSecond)
transactionReadRate := float64(sc.bytesRead) / (secondsPassed * BytesToMbConversion)
if sc.remBytes > MaxBytes { if sc.remBytes > MaxBytes {
sc.remBytes = MaxBytes sc.remBytes = MaxBytes
} }
if sc.remBytes <= sc.bytesRead || transactionReadRate > 2 { if sc.remBytes < 1 {
// Wait until cool down period has passed to prevent ProvisionedThroughputExceededException // Wait until cool down period has passed to prevent ProvisionedThroughputExceededException
coolDown := sc.bytesRead / MaxBytesPerSecond coolDown := sc.bytesRead / MaxBytesPerSecond
if sc.bytesRead%MaxBytesPerSecond > 0 {
coolDown++
}
return coolDown, maxBytesExceededError return coolDown, maxBytesExceededError
} else { } else {
sc.remBytes -= sc.bytesRead sc.remBytes -= sc.bytesRead
@ -305,3 +305,30 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput)
return getResp, 0, err return getResp, 0, err
} }
func (sc *PollingShardConsumer) renewLease(ctx context.Context) error {
renewDuration := time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond
for {
timer := time.NewTimer(renewDuration)
select {
case <-timer.C:
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
err := sc.checkpointer.GetLease(sc.shard, sc.consumerID)
if err != nil {
// log and return error
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
sc.shard.ID, sc.consumerID, err)
return err
}
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
case <-ctx.Done():
// clean up timer resources
if !timer.Stop() {
<-timer.C
}
log.Debugf("renewLease was canceled")
return nil
}
}
}

View file

@ -86,6 +86,8 @@ func TestCallGetRecordsAPI(t *testing.T) {
// check that correct cool off period is taken for 10mb in 1 second // check that correct cool off period is taken for 10mb in 1 second
testTime := time.Now() testTime := time.Now()
m4 := MockKinesisSubscriberGetter{} m4 := MockKinesisSubscriberGetter{}
ret4 := kinesis.GetRecordsOutput{Records: nil}
m4.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret4, nil)
psc4 := PollingShardConsumer{ psc4 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m4}, commonShardConsumer: commonShardConsumer{kc: &m4},
callsLeft: 2, callsLeft: 2,
@ -100,10 +102,10 @@ func TestCallGetRecordsAPI(t *testing.T) {
return testTime.Add(time.Second) return testTime.Add(time.Second)
} }
out4, checkSleepVal2, err4 := psc4.callGetRecordsAPI(&gri) out4, checkSleepVal2, err4 := psc4.callGetRecordsAPI(&gri)
assert.Nil(t, out4) assert.Nil(t, err4)
assert.Equal(t, maxBytesExceededError, err4) assert.Equal(t, &ret4, out4)
m4.AssertExpectations(t) m4.AssertExpectations(t)
if checkSleepVal2 != 5 { if checkSleepVal2 != 0 {
t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal2) t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal2)
} }
@ -134,6 +136,8 @@ func TestCallGetRecordsAPI(t *testing.T) {
// check for correct cool off period with 8mb in .2 seconds with 6mb remaining // check for correct cool off period with 8mb in .2 seconds with 6mb remaining
testTime3 := time.Now() testTime3 := time.Now()
m6 := MockKinesisSubscriberGetter{} m6 := MockKinesisSubscriberGetter{}
ret6 := kinesis.GetRecordsOutput{Records: nil}
m6.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret6, nil)
psc6 := PollingShardConsumer{ psc6 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m6}, commonShardConsumer: commonShardConsumer{kc: &m6},
callsLeft: 2, callsLeft: 2,
@ -148,10 +152,10 @@ func TestCallGetRecordsAPI(t *testing.T) {
return testTime3.Add(time.Second / 5) return testTime3.Add(time.Second / 5)
} }
out6, checkSleepVal4, err6 := psc6.callGetRecordsAPI(&gri) out6, checkSleepVal4, err6 := psc6.callGetRecordsAPI(&gri)
assert.Nil(t, out6) assert.Nil(t, err6)
assert.Equal(t, err6, maxBytesExceededError) assert.Equal(t, &ret6, out6)
m5.AssertExpectations(t) m5.AssertExpectations(t)
if checkSleepVal4 != 4 { if checkSleepVal4 != 0 {
t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal4) t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal4)
} }
@ -196,3 +200,156 @@ func (m *MockKinesisSubscriberGetter) GetShardIterator(ctx context.Context, para
func (m *MockKinesisSubscriberGetter) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) { func (m *MockKinesisSubscriberGetter) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) {
return nil, nil return nil, nil
} }
func TestPollingShardConsumer_checkCoolOffPeriod(t *testing.T) {
refTime := time.Now()
type fields struct {
lastCheckTime time.Time
remBytes int
bytesRead int
}
tests := []struct {
name string
fields fields
timeNow time.Time
want int
wantErr bool
}{
{
"zero time max bytes to spend",
fields{
time.Time{},
0,
0,
},
refTime,
0,
false,
},
{
"same second, bytes still left to spend",
fields{
refTime,
MaxBytesPerSecond,
MaxBytesPerSecond - 1,
},
refTime,
0,
false,
},
{
"same second, not many but some bytes still left to spend",
fields{
refTime,
8,
MaxBytesPerSecond,
},
refTime,
0,
false,
},
{
"same second, 1 byte still left to spend",
fields{
refTime,
1,
MaxBytesPerSecond,
},
refTime,
0,
false,
},
{
"next second, bytes still left to spend",
fields{
refTime,
42,
1024,
},
refTime.Add(1 * time.Second),
0,
false,
},
{
"same second, max bytes per second already spent",
fields{
refTime,
0,
MaxBytesPerSecond,
},
refTime,
1,
true,
},
{
"same second, more than max bytes per second already spent",
fields{
refTime,
0,
MaxBytesPerSecond + 1,
},
refTime,
2,
true,
},
// Kinesis prevents reading more than 10 MiB at once
{
"same second, 10 MiB read all at once",
fields{
refTime,
0,
10 * 1024 * 1024,
},
refTime,
6,
true,
},
{
"same second, 10 MB read all at once",
fields{
refTime,
0,
10 * 1000 * 1000,
},
refTime,
5,
true,
},
{
"5 seconds ago, 10 MB read all at once",
fields{
refTime,
0,
10 * 1000 * 1000,
},
refTime.Add(5 * time.Second),
0,
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sc := &PollingShardConsumer{
lastCheckTime: tt.fields.lastCheckTime,
remBytes: tt.fields.remBytes,
bytesRead: tt.fields.bytesRead,
}
rateLimitTimeNow = func() time.Time {
return tt.timeNow
}
got, err := sc.checkCoolOffPeriod()
if (err != nil) != tt.wantErr {
t.Errorf("PollingShardConsumer.checkCoolOffPeriod() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("PollingShardConsumer.checkCoolOffPeriod() = %v, want %v", got, tt.want)
}
})
}
// restore original time.Now
rateLimitTimeNow = time.Now
}

View file

@ -22,7 +22,6 @@ package worker
import ( import (
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint" chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition" par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"

View file

@ -160,11 +160,15 @@ func (w *Worker) initialize() error {
log.Infof("Creating Kinesis client") log.Infof("Creating Kinesis client")
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{ if service == kinesis.ServiceID && len(w.kclConfig.KinesisEndpoint) > 0 {
PartitionID: "aws", return aws.Endpoint{
URL: w.kclConfig.KinesisEndpoint, PartitionID: "aws",
SigningRegion: w.regionName, URL: w.kclConfig.KinesisEndpoint,
}, nil SigningRegion: w.regionName,
}, nil
}
// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
}) })
cfg, err := awsConfig.LoadDefaultConfig( cfg, err := awsConfig.LoadDefaultConfig(
@ -272,6 +276,14 @@ func (w *Worker) eventLoop() {
rnd, _ := rand.Int(rand.Reader, big.NewInt(int64(w.kclConfig.ShardSyncIntervalMillis))) rnd, _ := rand.Int(rand.Reader, big.NewInt(int64(w.kclConfig.ShardSyncIntervalMillis)))
shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + int(rnd.Int64()) shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + int(rnd.Int64())
select {
case <-*w.stop:
log.Infof("Shutting down...")
return
case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
log.Debugf("Waited %d ms to sync shards...", shardSyncSleep)
}
err := w.syncShard() err := w.syncShard()
if err != nil { if err != nil {
log.Errorf("Error syncing shards: %+v, Retrying in %d ms...", err, shardSyncSleep) log.Errorf("Error syncing shards: %+v, Retrying in %d ms...", err, shardSyncSleep)
@ -363,14 +375,6 @@ func (w *Worker) eventLoop() {
log.Warnf("Error in rebalance: %+v", err) log.Warnf("Error in rebalance: %+v", err)
} }
} }
select {
case <-*w.stop:
log.Infof("Shutting down...")
return
case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
log.Debugf("Waited %d ms to sync shards...", shardSyncSleep)
}
} }
} }

41
go.mod
View file

@ -1,18 +1,18 @@
module github.com/vmware/vmware-go-kcl-v2 module github.com/vmware/vmware-go-kcl-v2
go 1.17 go 1.21
require ( require (
github.com/aws/aws-sdk-go-v2 v1.11.2 github.com/aws/aws-sdk-go-v2 v1.25.2
github.com/aws/aws-sdk-go-v2/config v1.11.1 github.com/aws/aws-sdk-go-v2/config v1.27.5
github.com/aws/aws-sdk-go-v2/credentials v1.6.5 github.com/aws/aws-sdk-go-v2/credentials v1.17.5
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407
github.com/golang/protobuf v1.5.2 github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/prometheus/client_golang v1.11.0 github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.32.1 github.com/prometheus/common v0.32.1
github.com/rs/zerolog v1.26.1 github.com/rs/zerolog v1.26.1
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.8.1
@ -23,17 +23,18 @@ require (
require ( require (
github.com/BurntSushi/toml v0.4.1 // indirect github.com/BurntSushi/toml v0.4.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 // indirect
github.com/aws/smithy-go v1.9.0 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
@ -45,7 +46,7 @@ require (
github.com/stretchr/objx v0.5.0 // indirect github.com/stretchr/objx v0.5.0 // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect go.uber.org/multierr v1.7.0 // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect golang.org/x/sys v0.1.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

81
go.sum
View file

@ -41,42 +41,44 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs= github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1/go.mod h1:sxpLb+nZk7tIfCWChfd+h4QwHNUR57d8hA1cleTkjJo=
github.com/aws/aws-sdk-go-v2/config v1.11.1 h1:KXSjb7ZMLRtjxClFptukTYibiOqJS9NwBO+9WD3UMto= github.com/aws/aws-sdk-go-v2/config v1.27.5 h1:brBPsyRFQn97M1ZhQ9tLXkO7Zytiar0NS06FGmEJBdg=
github.com/aws/aws-sdk-go-v2/config v1.11.1/go.mod h1:VvfkzUhVtntSg1JfGFMSKS0CyiTZd3NqBxK5af4zsME= github.com/aws/aws-sdk-go-v2/config v1.27.5/go.mod h1:I53uvsfddRRTG5YcC4n5Z3aOD1BU8hYCoIG7iEJG4wM=
github.com/aws/aws-sdk-go-v2/credentials v1.6.5 h1:ZrsO2js2v4T95rsCIWoAb/ck5+U1kwkizGdZHY+ni3s= github.com/aws/aws-sdk-go-v2/credentials v1.17.5 h1:yn3zSvIKC2NZIs40cY3kckcy9Zma96PrRR07N54PCvY=
github.com/aws/aws-sdk-go-v2/credentials v1.6.5/go.mod h1:HWSOnsnqVMbLcWUmom6AN1cqhcLzLJ62AObW28CbYbU= github.com/aws/aws-sdk-go-v2/credentials v1.17.5/go.mod h1:8JcKPAGZVnDWuR5lusAwmrSDtZnDIAnpQWaDC9RFt2g=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 h1:KiN5TPOLrEjbGCvdTQR4t0U4T87vVwALZ5Bg3jpMqPY= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 h1:AK0J8iYBFeUk2Ax7O8YpLtFsfhdOByh2QIkHmigpRYk=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2/go.mod h1:dF2F6tXEOgmW5X1ZFO/EPtWrcm7XkW07KNcJUGNtt4s= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2/go.mod h1:iRlGzMix0SExQEviAyptRWRGdYNo3+ufW/lCzvKVTUc=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 h1:IQup8Q6lorXeiA/rK72PeToWoWK8h7VAPgHNWdSrtgE= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2/go.mod h1:VITe/MdW6EMXPb0o0txu/fsonXbMHUU2OC2Qp7ivU4o= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 h1:BcSBoss+CeyRS4TgZKAcR6kcZ0Sb2P+DHs8r8aMlTpQ= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1 h1:mQySuI87thHtcbZvEDjwUROGWikU6fqgpHklCBXpJU4=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0/go.mod h1:eAgmZ4hIzTsTOlAA7yvGJz+RywxZo3KWtGt7J+jAUxU= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1/go.mod h1:Z1ThUUTuCO9PArtiQsTmBGBv+38NGj+795Zl0n1jgiM=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 h1:te+nIFwPf5Bi/cZvd9g/+EF0gkJT3c0J/5+NMx0NBZg= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2 h1:n+nT52A+Ik+ut1D8IV4EP1qfyUdP9Jq60uYfnlJwSWc=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0/go.mod h1:ELltfl9ri0n4sZ/VjPZBgemNMd9mYIpCAuZhc7NP7l4= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2/go.mod h1:BzzW6QegtSMnC1BhD+lagiUDSRYjRTOhXAb1mLfEaMg=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 h1:lPLbw4Gn59uoKqvOfSnkJr54XWk5Ak1NK20ZEiSWb3U= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 h1:EyBZibRTVAs6ECHZOw5/wlylS9OcTzwyjeQMudmREjE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0/go.mod h1:80NaCIH9YU3rzTTs/J/ECATjXuRqzo/wB6ukO6MZ0XY= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1/go.mod h1:JKpmtYhhPs7D97NL/ltqz7yCkERFW5dOlHyVl66ZYF8=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3 h1:/MpYoYvgshlGMFmSyfzGWf6HKoEo/DrKBoHxXR3vh+U=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3/go.mod h1:1Pf5vPqk8t9pdYB3dmUMRE/0m8u0IHHg8ESSiutJd0I=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3 h1:x0N5ftQzgcfRpCpTiyZC40pvNUJYhzf4UgCsAyO6/P8=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3/go.mod h1:Ru7vg1iQ7cR4i7SZ/JTLYN9kaXtbL69UdgG0OQWQxW0=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 h1:s47dGRX/fBy9s/Zculav/cyqRhkMKsE/5hjg6rWAH6E= github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0/go.mod h1:B1x58TfECuYHFX/bga902rUvMqQu9C/v2XiCi2GZZXE= github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY=
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 h1:E4fxAg/UE8a6yiLZYv8/EP0uXKPPRImiMau4ift6S/g= github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 h1:utEGkfdQ4L6YW/ietH7111ZYglLJvS+sLriHJ1NBJEQ=
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGXyn22c1USYxA/2KyvoeDY0= github.com/aws/aws-sdk-go-v2/service/sso v1.20.1/go.mod h1:RsYqzYr2F2oPDdpy+PdhephuZxTfjHQe7SOBcZGoAU8=
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 h1:9/GylMS45hGGFCcMrUZDVayQE1jYSIN6da9jo7RAYIw=
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1/go.mod h1:YjAPFn4kGFqKC54VsHs5fn5B6d+PCY2tziEa3U/GB5Y=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 h1:0YjXuWdYHvsm0HnT4vO8XpwG1D+i2roxSCBoN6deJ7M=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2/go.mod h1:jI+FWmYkSMn+4APWmZiZTgt0oM0TrvymD51FMqCnWgA=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58= github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8= github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
@ -153,8 +155,9 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@ -211,8 +214,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -396,8 +400,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -453,7 +457,6 @@ golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=