Compare commits

...

41 commits

Author SHA1 Message Date
spentakota
f6e79f1a2d
Merge pull request #53 from YakkovHow/main
fix: support go 1.21 & upgrade dependencies
2024-03-04 18:07:06 -08:00
Yakkov Leng
3676eb410a fix: support go 1.21 & upgrade dependencies
Signed-off-by: Yakkov Leng <yleng@vmware.com>
2024-03-04 17:56:27 -08:00
spentakota
b12921da23
Merge pull request #37 from vmware/spentakota_passContext
fix: pass in ctx with cancel for renewLease
2023-04-06 18:09:16 -07:00
Shiva Pentakota
4482696d95 fix: pass in ctx with cancel for renewLease
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-04-06 17:53:46 -07:00
vmwjc
16c5c53a30
Merge pull request #13 from calebstewart/fix/issue-5-empty-kinesisendpoint
Automatically resolve default KinesisEndpoint
2023-04-04 14:40:26 -07:00
vmwjc
f97ba4e3b2
Merge pull request #36 from vmware/clean-up-extra-err-check
chore: Remove extraneous err check
2023-04-04 14:21:24 -07:00
John Calixto
c1f6b270ab chore: Remove extraneous err check
After checking the scan result above this line, checking err here no
longer has any effect.

Signed-off-by: John Calixto <jcalixto@vmware.com>
2023-04-04 14:15:59 -07:00
vmwjc
6120c11333
Merge pull request #10 from mrmonaghan/fix-infinite-worker-loop
fixing infinite worker loop
2023-04-04 14:08:59 -07:00
spentakota
8ecb5b40a2
Merge pull request #35 from vmware/spentakota_retLeaseErr
fix: return err log in case of ErrLeaseNotAcquired
2023-04-04 11:21:14 -07:00
Shiva Pentakota
86d70940e6 fix: return err log in case of ErrLeaseNotAcquired
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-04-04 11:17:22 -07:00
spentakota
6516287f6d
Merge pull request #34 from vmware/spentakota_asyncLeaseRenewal
feat: make lease renewal async
2023-04-03 15:02:28 -07:00
Shiva Pentakota
4aebaf1ae0 feat: make lease renewal async
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-04-03 14:58:04 -07:00
spentakota
5be0422f33
Merge pull request #32 from vmware/spentakota_overdueShutdown
fix: add shutdown error case for checkpoint function
2023-03-30 12:51:49 -07:00
Shiva Pentakota
02d4b44ff6 fix: add shutdown and leaseExpired error cases for checkpoint function
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-03-28 11:46:55 -07:00
cmckelvey-vmware
44f1558847
Merge pull request #31 from vmware/dependabot/go_modules/golang.org/x/sys-0.1.0
Bump golang.org/x/sys from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0
2023-03-28 10:59:31 -06:00
cmckelvey-vmware
982c468bef
Merge pull request #30 from vmware/dependabot/go_modules/github.com/prometheus/client_golang-1.11.1
Bump github.com/prometheus/client_golang from 1.11.0 to 1.11.1
2023-03-28 10:58:45 -06:00
dependabot[bot]
09f0889f28
Bump golang.org/x/sys from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0
Bumps [golang.org/x/sys](https://github.com/golang/sys) from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0.
- [Release notes](https://github.com/golang/sys/releases)
- [Commits](https://github.com/golang/sys/commits/v0.1.0)

---
updated-dependencies:
- dependency-name: golang.org/x/sys
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-03-28 16:57:32 +00:00
dependabot[bot]
236412324f
Bump github.com/prometheus/client_golang from 1.11.0 to 1.11.1
Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.11.0 to 1.11.1.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.11.0...v1.11.1)

---
updated-dependencies:
- dependency-name: github.com/prometheus/client_golang
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-03-28 16:56:55 +00:00
spentakota
61a027efef
Merge pull request #29 from vmware/fix-token-bucket-edge-cases
fix: Check token bucket corner cases correctly.
2023-03-23 12:46:42 -07:00
John Calixto
987fada9d3 fix: Check token bucket corner cases correctly.
Signed-off-by: John Calixto <jcalixto@vmware.com>
2023-03-23 11:23:35 -07:00
spentakota
711b72932a
Merge pull request #27 from vmware/spentakota_AddLogs
chore: add info logs in sleep case for kinesis backoff errors
2023-03-22 14:43:58 -07:00
Shiva Pentakota
a7c063b99c chore: add info logs in sleep case for kinesis backoff errors
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-03-22 12:22:26 -07:00
spentakota
bce4dd3f42
Merge pull request #24 from vmware/spentakota_fixNanoBug
fix: use nanosecond precision in lease comparisons
2023-02-14 10:30:14 -08:00
Shiva Pentakota
df16ef451c fix: use nanosecond precision in lease comparisons
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-02-13 17:56:11 -08:00
spentakota
fb17ec8bc6
Merge pull request #23 from vmware/spentakota_filNilError
fix: add check for GetRecords error within callGetRecordsAPI
2023-02-01 08:10:38 -08:00
Shiva Pentakota
04c5062ace fix: add check for GetRecords error within callGetRecordsAPI
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-02-01 08:00:49 -08:00
spentakota
c43491f922
Merge pull request #22 from vmware/spentakota/changeLogMessage
chore: log RemoveLeaseOwner errors with debug instead of error
2023-01-31 10:21:31 -08:00
Shiva Pentakota
f879712f9d chore: log RemoveLeaseOwner errors with debug instead of error
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-01-31 10:18:15 -08:00
spentakota
42881449ce
Merge pull request #21 from vmware/spentakota_callGetRecordsAPI
fix: Handle ProvisionedThroughputExceededException throttling
2023-01-24 17:12:39 -08:00
Shiva Pentakota
7d6b1c33d0 fix: add maxBytes per second getRecord check
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-01-24 16:28:22 -08:00
Shiva Pentakota
b5515931d1 fix: add hard cap maxRetries for getRecord errors
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-01-24 14:09:02 -08:00
Shiva Pentakota
66006caf89 fix: add getRecords TPS rate limiting
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-01-24 11:56:29 -08:00
spentakota
981dc2df11
Merge pull request #20 from vmware/jc/testable-getRecords
Refactor in prep for testing rate limiting improvements
2023-01-23 18:52:10 -08:00
John Calixto
3be57e8a74 Refactor in prep for testing rate limiting improvements
Signed-off-by: John Calixto <jcalixto@vmware.com>
2023-01-23 17:32:27 -08:00
spentakota
5e7aca6ab2
Merge pull request #18 from vmware/spentakota_deleteMetricMillis
fix: add DeleteMetricMillisBehindLatest for error case
2023-01-20 13:29:25 -08:00
Shiva Pentakota
599aa06ecd fix: add DeleteMetricMillisBehindLatest for error case
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-01-20 13:23:02 -08:00
spentakota
c5bc6c4ded
Merge pull request #17 from vmware/spentakota/sendLeaseRenewedMetric
feat: Sending renewed lease metric
2023-01-12 11:56:21 -08:00
Shiva Pentakota
e1425047a7 feat: Sending renewed lease metric
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
2023-01-12 11:31:16 -08:00
spentakota
4afa8fec3e
Merge pull request #16 from vmware/spentakota_catchNilErrorSyncLeases
fix: catch DynamoDB Scan error when trying to scan nonexistent table or index
2022-12-19 08:34:02 -08:00
Caleb Stewart
e2a45c53c3 Automatically resolve default KinesisEndpoint
This commit fixes #5 by returning `aws.EndpointNotFoundError` from the
endpoint resolver when no `KinesisEndpoint` is defined, which will
resolve the default AWS endpoint. This is the same process used by the
DynamoDB checkpointer to resolve the default endpoint.

Signed-off-by: Caleb Stewart <caleb.stewart94@gmail.com>
2022-10-13 13:37:51 -04:00
Mike Monaghan
aab08b9050 fixing infinite worker loop
Signed-off-by: Mike Monaghan <mike_monaghan@live.ca>
2022-09-13 15:31:16 -06:00
15 changed files with 704 additions and 115 deletions

View file

@ -79,6 +79,9 @@ type Checkpointer interface {
// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment // RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
RemoveLeaseOwner(string) error RemoveLeaseOwner(string) error
// GetLeaseOwner to get current owner of lease for shard
GetLeaseOwner(string) (string, error)
// ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods) // ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods)
ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)

View file

@ -51,6 +51,10 @@ const (
NumMaxRetries = 10 NumMaxRetries = 10
) )
var (
NoLeaseOwnerErr = errors.New("no LeaseOwner in checkpoints table")
)
// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend // DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
type DynamoCheckpoint struct { type DynamoCheckpoint struct {
log logger.Logger log logger.Logger
@ -129,7 +133,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
// GetLease attempts to gain a lock on the given shard // GetLease attempts to gain a lock on the given shard
func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssignTo string) error { func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssignTo string) error {
newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC() newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC()
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339) newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339Nano)
currentCheckpoint, err := checkpointer.getItem(shard.ID) currentCheckpoint, err := checkpointer.getItem(shard.ID)
if err != nil { if err != nil {
return err return err
@ -161,7 +165,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
assignedTo := assignedVar.(*types.AttributeValueMemberS).Value assignedTo := assignedVar.(*types.AttributeValueMemberS).Value
leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout) currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout)
if err != nil { if err != nil {
return err return err
} }
@ -246,7 +250,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
// CheckpointSequence writes a checkpoint at the designated sequence ID // CheckpointSequence writes a checkpoint at the designated sequence ID
func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error { func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error {
leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339) leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339Nano)
marshalledCheckpoint := map[string]types.AttributeValue{ marshalledCheckpoint := map[string]types.AttributeValue{
LeaseKeyKey: &types.AttributeValueMemberS{ LeaseKeyKey: &types.AttributeValueMemberS{
Value: shard.ID, Value: shard.ID,
@ -290,7 +294,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
// Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming // Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming
if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" { if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" {
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value) currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout.(*types.AttributeValueMemberS).Value)
if err != nil { if err != nil {
return err return err
} }
@ -336,6 +340,23 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
return err return err
} }
// GetLeaseOwner returns current lease owner of given shard in checkpoints table
func (checkpointer *DynamoCheckpoint) GetLeaseOwner(shardID string) (string, error) {
currentCheckpoint, err := checkpointer.getItem(shardID)
if err != nil {
return "", err
}
assignedVar, assignedToOk := currentCheckpoint[LeaseOwnerKey]
if !assignedToOk {
return "", NoLeaseOwnerErr
}
return assignedVar.(*types.AttributeValueMemberS).Value, nil
}
// ListActiveWorkers returns a map of workers and their shards // ListActiveWorkers returns a map of workers and their shards
func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) { func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) {
err := checkpointer.syncLeases(shardStatus) err := checkpointer.syncLeases(shardStatus)
@ -370,7 +391,7 @@ func (checkpointer *DynamoCheckpoint) ClaimShard(shard *par.ShardStatus, claimID
if err != nil && err != ErrSequenceIDNotFound { if err != nil && err != ErrSequenceIDNotFound {
return err return err
} }
leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339) leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339Nano)
conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)` conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)`
expressionAttributeValues := map[string]types.AttributeValue{ expressionAttributeValues := map[string]types.AttributeValue{
@ -462,10 +483,6 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha
} }
} }
if err != nil {
log.Debugf("Error performing SyncLeases. Error: %+v ", err)
return err
}
log.Debugf("Lease sync completed. Next lease sync will occur in %s", time.Duration(checkpointer.kclConfig.LeaseSyncingTimeIntervalMillis)*time.Millisecond) log.Debugf("Lease sync completed. Next lease sync will occur in %s", time.Duration(checkpointer.kclConfig.LeaseSyncingTimeIntervalMillis)*time.Millisecond)
return nil return nil
} }

View file

@ -69,6 +69,9 @@ const (
// DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner. // DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner.
DefaultLeaseRefreshPeriodMillis = 5000 DefaultLeaseRefreshPeriodMillis = 5000
// DefaultLeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
DefaultLeaseRefreshWaitTime = 2500
// DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call. // DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call.
DefaultMaxRecords = 10000 DefaultMaxRecords = 10000
@ -136,6 +139,9 @@ const (
// DefaultLeaseSyncingIntervalMillis Number of milliseconds to wait before syncing with lease table (dynamodDB) // DefaultLeaseSyncingIntervalMillis Number of milliseconds to wait before syncing with lease table (dynamodDB)
DefaultLeaseSyncingIntervalMillis = 60000 DefaultLeaseSyncingIntervalMillis = 60000
// DefaultMaxRetryCount The default maximum number of retries in case of error
DefaultMaxRetryCount = 5
) )
type ( type (
@ -213,6 +219,9 @@ type (
// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner. // LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
LeaseRefreshPeriodMillis int LeaseRefreshPeriodMillis int
// LeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
LeaseRefreshWaitTime int
// MaxRecords Max records to read per Kinesis getRecords() call // MaxRecords Max records to read per Kinesis getRecords() call
MaxRecords int MaxRecords int
@ -283,6 +292,9 @@ type (
// LeaseSyncingTimeInterval The number of milliseconds to wait before syncing with lease table (dynamoDB) // LeaseSyncingTimeInterval The number of milliseconds to wait before syncing with lease table (dynamoDB)
LeaseSyncingTimeIntervalMillis int LeaseSyncingTimeIntervalMillis int
// MaxRetryCount The maximum number of retries in case of error
MaxRetryCount int
} }
) )

View file

@ -102,6 +102,8 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis, LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis,
LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis, LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis,
LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis, LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis,
LeaseRefreshWaitTime: DefaultLeaseRefreshWaitTime,
MaxRetryCount: DefaultMaxRetryCount,
Logger: logger.GetDefaultLogger(), Logger: logger.GetDefaultLogger(),
} }
} }
@ -148,6 +150,12 @@ func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefres
return c return c
} }
func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration {
checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime)
c.LeaseRefreshWaitTime = leaseRefreshWaitTime
return c
}
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration { func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis) checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
c.ShardSyncIntervalMillis = shardSyncIntervalMillis c.ShardSyncIntervalMillis = shardSyncIntervalMillis
@ -211,6 +219,13 @@ func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *Kinesi
return c return c
} }
// WithMaxRetryCount sets the max retry count in case of error.
func (c *KinesisClientLibConfiguration) WithMaxRetryCount(maxRetryCount int) *KinesisClientLibConfiguration {
checkIsValuePositive("maxRetryCount", maxRetryCount)
c.MaxRetryCount = maxRetryCount
return c
}
// WithMonitoringService sets the monitoring service to use to publish metrics. // WithMonitoringService sets the monitoring service to use to publish metrics.
func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.MonitoringService) *KinesisClientLibConfiguration { func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.MonitoringService) *KinesisClientLibConfiguration {
// Nil case is handled downward (at worker creation) so no need to do it here. // Nil case is handled downward (at worker creation) so no need to do it here.

View file

@ -332,6 +332,10 @@ func (cw *MonitoringService) RecordProcessRecordsTime(shard string, time float64
m.processRecordsTime = append(m.processRecordsTime, time) m.processRecordsTime = append(m.processRecordsTime, time)
} }
func (cw *MonitoringService) DeleteMetricMillisBehindLatest(shard string) {
// not implemented
}
func (cw *MonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics { func (cw *MonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics {
var i interface{} var i interface{}
var ok bool var ok bool

View file

@ -35,6 +35,7 @@ type MonitoringService interface {
IncrRecordsProcessed(shard string, count int) IncrRecordsProcessed(shard string, count int)
IncrBytesProcessed(shard string, count int64) IncrBytesProcessed(shard string, count int64)
MillisBehindLatest(shard string, milliSeconds float64) MillisBehindLatest(shard string, milliSeconds float64)
DeleteMetricMillisBehindLatest(shard string)
LeaseGained(shard string) LeaseGained(shard string)
LeaseLost(shard string) LeaseLost(shard string)
LeaseRenewed(shard string) LeaseRenewed(shard string)
@ -53,6 +54,7 @@ func (NoopMonitoringService) Shutdown() {}
func (NoopMonitoringService) IncrRecordsProcessed(_ string, _ int) {} func (NoopMonitoringService) IncrRecordsProcessed(_ string, _ int) {}
func (NoopMonitoringService) IncrBytesProcessed(_ string, _ int64) {} func (NoopMonitoringService) IncrBytesProcessed(_ string, _ int64) {}
func (NoopMonitoringService) MillisBehindLatest(_ string, _ float64) {} func (NoopMonitoringService) MillisBehindLatest(_ string, _ float64) {}
func (NoopMonitoringService) DeleteMetricMillisBehindLatest(_ string) {}
func (NoopMonitoringService) LeaseGained(_ string) {} func (NoopMonitoringService) LeaseGained(_ string) {}
func (NoopMonitoringService) LeaseLost(_ string) {} func (NoopMonitoringService) LeaseLost(_ string) {}
func (NoopMonitoringService) LeaseRenewed(_ string) {} func (NoopMonitoringService) LeaseRenewed(_ string) {}

View file

@ -147,6 +147,10 @@ func (p *MonitoringService) MillisBehindLatest(shard string, millSeconds float64
p.behindLatestMillis.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Set(millSeconds) p.behindLatestMillis.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Set(millSeconds)
} }
func (p *MonitoringService) DeleteMetricMillisBehindLatest(shard string) {
p.behindLatestMillis.Delete(prom.Labels{"shard": shard, "kinesisStream": p.streamName})
}
func (p *MonitoringService) LeaseGained(shard string) { func (p *MonitoringService) LeaseGained(shard string) {
p.leasesHeld.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName, "workerID": p.workerID}).Inc() p.leasesHeld.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName, "workerID": p.workerID}).Inc()
} }

View file

@ -21,6 +21,7 @@
package worker package worker
import ( import (
"context"
"sync" "sync"
"time" "time"
@ -40,10 +41,16 @@ type shardConsumer interface {
getRecords() error getRecords() error
} }
type KinesisSubscriberGetter interface {
SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error)
GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error)
GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error)
}
// commonShardConsumer implements common functionality for regular and enhanced fan-out consumers // commonShardConsumer implements common functionality for regular and enhanced fan-out consumers
type commonShardConsumer struct { type commonShardConsumer struct {
shard *par.ShardStatus shard *par.ShardStatus
kc *kinesis.Client kc KinesisSubscriberGetter
checkpointer chk.Checkpointer checkpointer chk.Checkpointer
recordProcessor kcl.IRecordProcessor recordProcessor kcl.IRecordProcessor
kclConfig *config.KinesisClientLibConfiguration kclConfig *config.KinesisClientLibConfiguration
@ -51,7 +58,7 @@ type commonShardConsumer struct {
} }
// Cleanup the internal lease cache // Cleanup the internal lease cache
func (sc *commonShardConsumer) releaseLease() { func (sc *commonShardConsumer) releaseLease(shard string) {
log := sc.kclConfig.Logger log := sc.kclConfig.Logger
log.Infof("Release lease for shard %s", sc.shard.ID) log.Infof("Release lease for shard %s", sc.shard.ID)
sc.shard.SetLeaseOwner("") sc.shard.SetLeaseOwner("")
@ -59,10 +66,11 @@ func (sc *commonShardConsumer) releaseLease() {
// Release the lease by wiping out the lease owner for the shard // Release the lease by wiping out the lease owner for the shard
// Note: we don't need to do anything in case of error here and shard lease will eventually be expired. // Note: we don't need to do anything in case of error here and shard lease will eventually be expired.
if err := sc.checkpointer.RemoveLeaseOwner(sc.shard.ID); err != nil { if err := sc.checkpointer.RemoveLeaseOwner(sc.shard.ID); err != nil {
log.Errorf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err) log.Debugf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err)
} }
// reporting lease lose metrics // reporting lease lose metrics
sc.mService.DeleteMetricMillisBehindLatest(shard)
sc.mService.LeaseLost(sc.shard.ID) sc.mService.LeaseLost(sc.shard.ID)
} }
@ -165,7 +173,6 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
input.CacheEntryTime = &getRecordsStartTime input.CacheEntryTime = &getRecordsStartTime
input.CacheExitTime = &processRecordsStartTime input.CacheExitTime = &processRecordsStartTime
sc.recordProcessor.ProcessRecords(input) sc.recordProcessor.ProcessRecords(input)
processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds() processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds()
sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming)) sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming))
} }

View file

@ -46,7 +46,7 @@ type FanOutShardConsumer struct {
// getRecords subscribes to a shard and reads events from it. // getRecords subscribes to a shard and reads events from it.
// Precondition: it currently has the lease on the shard. // Precondition: it currently has the lease on the shard.
func (sc *FanOutShardConsumer) getRecords() error { func (sc *FanOutShardConsumer) getRecords() error {
defer sc.releaseLease() defer sc.releaseLease(sc.shard.ID)
log := sc.kclConfig.Logger log := sc.kclConfig.Logger
@ -103,6 +103,8 @@ func (sc *FanOutShardConsumer) getRecords() error {
return err return err
} }
refreshLeaseTimer = time.After(time.Until(sc.shard.LeaseTimeout.Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond))) refreshLeaseTimer = time.After(time.Until(sc.shard.LeaseTimeout.Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)))
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
case event, ok := <-shardSub.GetStream().Events(): case event, ok := <-shardSub.GetStream().Events():
if !ok { if !ok {
// need to resubscribe to shard // need to resubscribe to shard

View file

@ -32,6 +32,7 @@ package worker
import ( import (
"context" "context"
"errors" "errors"
log "github.com/sirupsen/logrus"
"math" "math"
"time" "time"
@ -44,14 +45,33 @@ import (
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
) )
const (
kinesisReadTPSLimit = 5
MaxBytes = 10000000
MaxBytesPerSecond = 2000000
BytesToMbConversion = 1000000
)
var (
rateLimitTimeNow = time.Now
rateLimitTimeSince = time.Since
localTPSExceededError = errors.New("Error GetRecords TPS Exceeded")
maxBytesExceededError = errors.New("Error GetRecords Max Bytes For Call Period Exceeded")
)
// PollingShardConsumer is responsible for polling data records from a (specified) shard. // PollingShardConsumer is responsible for polling data records from a (specified) shard.
// Note: PollingShardConsumer only deal with one shard. // Note: PollingShardConsumer only deal with one shard.
type PollingShardConsumer struct { type PollingShardConsumer struct {
commonShardConsumer commonShardConsumer
streamName string streamName string
stop *chan struct{} stop *chan struct{}
consumerID string consumerID string
mService metrics.MonitoringService mService metrics.MonitoringService
currTime time.Time
callsLeft int
remBytes int
lastCheckTime time.Time
bytesRead int
} }
func (sc *PollingShardConsumer) getShardIterator() (*string, error) { func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
@ -79,7 +99,12 @@ func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
// getRecords continuously poll one shard for data record // getRecords continuously poll one shard for data record
// Precondition: it currently has the lease on the shard. // Precondition: it currently has the lease on the shard.
func (sc *PollingShardConsumer) getRecords() error { func (sc *PollingShardConsumer) getRecords() error {
defer sc.releaseLease() ctx, cancelFunc := context.WithCancel(context.Background())
defer func() {
// cancel renewLease()
cancelFunc()
sc.releaseLease(sc.shard.ID)
}()
log := sc.kclConfig.Logger log := sc.kclConfig.Logger
@ -108,39 +133,70 @@ func (sc *PollingShardConsumer) getRecords() error {
recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer) recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer)
retriedErrors := 0 retriedErrors := 0
for { // define API call rate limit starting window
if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) { sc.currTime = rateLimitTimeNow()
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) sc.callsLeft = kinesisReadTPSLimit
err = sc.checkpointer.GetLease(sc.shard, sc.consumerID) sc.bytesRead = 0
if err != nil { sc.remBytes = MaxBytes
if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
return nil
}
// log and return error
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
sc.shard.ID, sc.consumerID, err)
return err
}
}
// starting async lease renewal thread
leaseRenewalErrChan := make(chan error, 1)
go func() {
leaseRenewalErrChan <- sc.renewLease(ctx)
}()
for {
getRecordsStartTime := time.Now() getRecordsStartTime := time.Now()
log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator)) log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator))
// Get records from stream and retry as needed
getRecordsArgs := &kinesis.GetRecordsInput{ getRecordsArgs := &kinesis.GetRecordsInput{
Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)), Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)),
ShardIterator: shardIterator, ShardIterator: shardIterator,
} }
getResp, coolDownPeriod, err := sc.callGetRecordsAPI(getRecordsArgs)
// Get records from stream and retry as needed
getResp, err := sc.kc.GetRecords(context.TODO(), getRecordsArgs)
if err != nil { if err != nil {
//aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling //aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling
var throughputExceededErr *types.ProvisionedThroughputExceededException var throughputExceededErr *types.ProvisionedThroughputExceededException
var kmsThrottlingErr *types.KMSThrottlingException var kmsThrottlingErr *types.KMSThrottlingException
if errors.As(err, &throughputExceededErr) || errors.As(err, &kmsThrottlingErr) { if errors.As(err, &throughputExceededErr) {
retriedErrors++
if retriedErrors > sc.kclConfig.MaxRetryCount {
log.Errorf("message", "Throughput Exceeded Error: "+
"reached max retry count getting records from shard",
"shardId", sc.shard.ID,
"retryCount", retriedErrors,
"error", err)
return err
}
// If there is insufficient provisioned throughput on the stream,
// subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException.
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
sc.waitASecond(sc.currTime)
continue
}
if err == localTPSExceededError {
log.Infof("localTPSExceededError so sleep for a second")
sc.waitASecond(sc.currTime)
continue
}
if err == maxBytesExceededError {
log.Infof("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod)
time.Sleep(time.Duration(coolDownPeriod) * time.Second)
continue
}
if errors.As(err, &kmsThrottlingErr) {
log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err) log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err)
retriedErrors++ retriedErrors++
// Greater than MaxRetryCount so we get the last retry
if retriedErrors > sc.kclConfig.MaxRetryCount {
log.Errorf("message", "KMS Throttling Error: "+
"reached max retry count getting records from shard",
"shardId", sc.shard.ID,
"retryCount", retriedErrors,
"error", err)
return err
}
// exponential backoff // exponential backoff
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond) time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond)
@ -175,7 +231,104 @@ func (sc *PollingShardConsumer) getRecords() error {
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer} shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
sc.recordProcessor.Shutdown(shutdownInput) sc.recordProcessor.Shutdown(shutdownInput)
return nil return nil
case leaseRenewalErr := <-leaseRenewalErrChan:
return leaseRenewalErr
default: default:
} }
} }
} }
func (sc *PollingShardConsumer) waitASecond(timePassed time.Time) {
waitTime := time.Since(timePassed)
if waitTime < time.Second {
time.Sleep(time.Second - waitTime)
}
}
func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) {
// Each shard can support up to a maximum total data read rate of 2 MB per second via GetRecords.
// If a call to GetRecords returns 10 MB, subsequent calls made within the next 5 seconds throw an exception.
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
// check for overspending of byte budget from getRecords call
currentTime := rateLimitTimeNow()
secondsPassed := currentTime.Sub(sc.lastCheckTime).Seconds()
sc.lastCheckTime = currentTime
sc.remBytes += int(secondsPassed * MaxBytesPerSecond)
if sc.remBytes > MaxBytes {
sc.remBytes = MaxBytes
}
if sc.remBytes < 1 {
// Wait until cool down period has passed to prevent ProvisionedThroughputExceededException
coolDown := sc.bytesRead / MaxBytesPerSecond
if sc.bytesRead%MaxBytesPerSecond > 0 {
coolDown++
}
return coolDown, maxBytesExceededError
} else {
sc.remBytes -= sc.bytesRead
}
return 0, nil
}
func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, int, error) {
if sc.bytesRead != 0 {
coolDownPeriod, err := sc.checkCoolOffPeriod()
if err != nil {
return nil, coolDownPeriod, err
}
}
// every new second, we get a fresh set of calls
if rateLimitTimeSince(sc.currTime) > time.Second {
sc.callsLeft = kinesisReadTPSLimit
sc.currTime = rateLimitTimeNow()
}
if sc.callsLeft < 1 {
return nil, 0, localTPSExceededError
}
getResp, err := sc.kc.GetRecords(context.TODO(), gri)
sc.callsLeft--
if err != nil {
return getResp, 0, err
}
// Calculate size of records from read transaction
sc.bytesRead = 0
for _, record := range getResp.Records {
sc.bytesRead += len(record.Data)
}
if sc.lastCheckTime.IsZero() {
sc.lastCheckTime = rateLimitTimeNow()
}
return getResp, 0, err
}
func (sc *PollingShardConsumer) renewLease(ctx context.Context) error {
renewDuration := time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond
for {
timer := time.NewTimer(renewDuration)
select {
case <-timer.C:
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
err := sc.checkpointer.GetLease(sc.shard, sc.consumerID)
if err != nil {
// log and return error
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
sc.shard.ID, sc.consumerID, err)
return err
}
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
case <-ctx.Done():
// clean up timer resources
if !timer.Stop() {
<-timer.C
}
log.Debugf("renewLease was canceled")
return nil
}
}
}

View file

@ -0,0 +1,355 @@
/*
* Copyright (c) 2023 VMware, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
* associated documentation files (the "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial
* portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package worker
import (
"context"
"errors"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
var (
testGetRecordsError = errors.New("GetRecords Error")
)
func TestCallGetRecordsAPI(t *testing.T) {
// basic happy path
m1 := MockKinesisSubscriberGetter{}
ret := kinesis.GetRecordsOutput{}
m1.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret, nil)
psc := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m1},
}
gri := kinesis.GetRecordsInput{
ShardIterator: aws.String("shard-iterator-01"),
}
out, _, err := psc.callGetRecordsAPI(&gri)
assert.Nil(t, err)
assert.Equal(t, &ret, out)
m1.AssertExpectations(t)
// check that localTPSExceededError is thrown when trying more than 5 TPS
m2 := MockKinesisSubscriberGetter{}
psc2 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m2},
callsLeft: 0,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 500 * time.Millisecond
}
out2, _, err2 := psc2.callGetRecordsAPI(&gri)
assert.Nil(t, out2)
assert.ErrorIs(t, err2, localTPSExceededError)
m2.AssertExpectations(t)
// check that getRecords is called normally in bytesRead = 0 case
m3 := MockKinesisSubscriberGetter{}
ret3 := kinesis.GetRecordsOutput{}
m3.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret3, nil)
psc3 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m3},
callsLeft: 2,
bytesRead: 0,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 2 * time.Second
}
out3, checkSleepVal, err3 := psc3.callGetRecordsAPI(&gri)
assert.Nil(t, err3)
assert.Equal(t, checkSleepVal, 0)
assert.Equal(t, &ret3, out3)
m3.AssertExpectations(t)
// check that correct cool off period is taken for 10mb in 1 second
testTime := time.Now()
m4 := MockKinesisSubscriberGetter{}
ret4 := kinesis.GetRecordsOutput{Records: nil}
m4.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret4, nil)
psc4 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m4},
callsLeft: 2,
bytesRead: MaxBytes,
lastCheckTime: testTime,
remBytes: MaxBytes,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 2 * time.Second
}
rateLimitTimeNow = func() time.Time {
return testTime.Add(time.Second)
}
out4, checkSleepVal2, err4 := psc4.callGetRecordsAPI(&gri)
assert.Nil(t, err4)
assert.Equal(t, &ret4, out4)
m4.AssertExpectations(t)
if checkSleepVal2 != 0 {
t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal2)
}
// check that no cool off period is taken for 6mb in 3 seconds
testTime2 := time.Now()
m5 := MockKinesisSubscriberGetter{}
ret5 := kinesis.GetRecordsOutput{}
m5.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret5, nil)
psc5 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m5},
callsLeft: 2,
bytesRead: MaxBytesPerSecond * 3,
lastCheckTime: testTime2,
remBytes: MaxBytes,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 3 * time.Second
}
rateLimitTimeNow = func() time.Time {
return testTime2.Add(time.Second * 3)
}
out5, checkSleepVal3, err5 := psc5.callGetRecordsAPI(&gri)
assert.Nil(t, err5)
assert.Equal(t, checkSleepVal3, 0)
assert.Equal(t, &ret5, out5)
m5.AssertExpectations(t)
// check for correct cool off period with 8mb in .2 seconds with 6mb remaining
testTime3 := time.Now()
m6 := MockKinesisSubscriberGetter{}
ret6 := kinesis.GetRecordsOutput{Records: nil}
m6.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret6, nil)
psc6 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m6},
callsLeft: 2,
bytesRead: MaxBytesPerSecond * 4,
lastCheckTime: testTime3,
remBytes: MaxBytesPerSecond * 3,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 3 * time.Second
}
rateLimitTimeNow = func() time.Time {
return testTime3.Add(time.Second / 5)
}
out6, checkSleepVal4, err6 := psc6.callGetRecordsAPI(&gri)
assert.Nil(t, err6)
assert.Equal(t, &ret6, out6)
m5.AssertExpectations(t)
if checkSleepVal4 != 0 {
t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal4)
}
// case where getRecords throws error
m7 := MockKinesisSubscriberGetter{}
ret7 := kinesis.GetRecordsOutput{Records: nil}
m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, testGetRecordsError)
psc7 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m7},
callsLeft: 2,
bytesRead: 0,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 2 * time.Second
}
out7, checkSleepVal7, err7 := psc7.callGetRecordsAPI(&gri)
assert.Equal(t, err7, testGetRecordsError)
assert.Equal(t, checkSleepVal7, 0)
assert.Equal(t, out7, &ret7)
m7.AssertExpectations(t)
// restore original func
rateLimitTimeNow = time.Now
rateLimitTimeSince = time.Since
}
type MockKinesisSubscriberGetter struct {
mock.Mock
}
func (m *MockKinesisSubscriberGetter) GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) {
ret := m.Called(ctx, params, optFns)
return ret.Get(0).(*kinesis.GetRecordsOutput), ret.Error(1)
}
func (m *MockKinesisSubscriberGetter) GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) {
return nil, nil
}
func (m *MockKinesisSubscriberGetter) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) {
return nil, nil
}
func TestPollingShardConsumer_checkCoolOffPeriod(t *testing.T) {
refTime := time.Now()
type fields struct {
lastCheckTime time.Time
remBytes int
bytesRead int
}
tests := []struct {
name string
fields fields
timeNow time.Time
want int
wantErr bool
}{
{
"zero time max bytes to spend",
fields{
time.Time{},
0,
0,
},
refTime,
0,
false,
},
{
"same second, bytes still left to spend",
fields{
refTime,
MaxBytesPerSecond,
MaxBytesPerSecond - 1,
},
refTime,
0,
false,
},
{
"same second, not many but some bytes still left to spend",
fields{
refTime,
8,
MaxBytesPerSecond,
},
refTime,
0,
false,
},
{
"same second, 1 byte still left to spend",
fields{
refTime,
1,
MaxBytesPerSecond,
},
refTime,
0,
false,
},
{
"next second, bytes still left to spend",
fields{
refTime,
42,
1024,
},
refTime.Add(1 * time.Second),
0,
false,
},
{
"same second, max bytes per second already spent",
fields{
refTime,
0,
MaxBytesPerSecond,
},
refTime,
1,
true,
},
{
"same second, more than max bytes per second already spent",
fields{
refTime,
0,
MaxBytesPerSecond + 1,
},
refTime,
2,
true,
},
// Kinesis prevents reading more than 10 MiB at once
{
"same second, 10 MiB read all at once",
fields{
refTime,
0,
10 * 1024 * 1024,
},
refTime,
6,
true,
},
{
"same second, 10 MB read all at once",
fields{
refTime,
0,
10 * 1000 * 1000,
},
refTime,
5,
true,
},
{
"5 seconds ago, 10 MB read all at once",
fields{
refTime,
0,
10 * 1000 * 1000,
},
refTime.Add(5 * time.Second),
0,
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sc := &PollingShardConsumer{
lastCheckTime: tt.fields.lastCheckTime,
remBytes: tt.fields.remBytes,
bytesRead: tt.fields.bytesRead,
}
rateLimitTimeNow = func() time.Time {
return tt.timeNow
}
got, err := sc.checkCoolOffPeriod()
if (err != nil) != tt.wantErr {
t.Errorf("PollingShardConsumer.checkCoolOffPeriod() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("PollingShardConsumer.checkCoolOffPeriod() = %v, want %v", got, tt.want)
}
})
}
// restore original time.Now
rateLimitTimeNow = time.Now
}

View file

@ -22,7 +22,6 @@ package worker
import ( import (
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint" chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition" par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"

View file

@ -160,11 +160,15 @@ func (w *Worker) initialize() error {
log.Infof("Creating Kinesis client") log.Infof("Creating Kinesis client")
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{ if service == kinesis.ServiceID && len(w.kclConfig.KinesisEndpoint) > 0 {
PartitionID: "aws", return aws.Endpoint{
URL: w.kclConfig.KinesisEndpoint, PartitionID: "aws",
SigningRegion: w.regionName, URL: w.kclConfig.KinesisEndpoint,
}, nil SigningRegion: w.regionName,
}, nil
}
// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
}) })
cfg, err := awsConfig.LoadDefaultConfig( cfg, err := awsConfig.LoadDefaultConfig(
@ -272,6 +276,14 @@ func (w *Worker) eventLoop() {
rnd, _ := rand.Int(rand.Reader, big.NewInt(int64(w.kclConfig.ShardSyncIntervalMillis))) rnd, _ := rand.Int(rand.Reader, big.NewInt(int64(w.kclConfig.ShardSyncIntervalMillis)))
shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + int(rnd.Int64()) shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + int(rnd.Int64())
select {
case <-*w.stop:
log.Infof("Shutting down...")
return
case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
log.Debugf("Waited %d ms to sync shards...", shardSyncSleep)
}
err := w.syncShard() err := w.syncShard()
if err != nil { if err != nil {
log.Errorf("Error syncing shards: %+v, Retrying in %d ms...", err, shardSyncSleep) log.Errorf("Error syncing shards: %+v, Retrying in %d ms...", err, shardSyncSleep)
@ -363,14 +375,6 @@ func (w *Worker) eventLoop() {
log.Warnf("Error in rebalance: %+v", err) log.Warnf("Error in rebalance: %+v", err)
} }
} }
select {
case <-*w.stop:
log.Infof("Shutting down...")
return
case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
log.Debugf("Waited %d ms to sync shards...", shardSyncSleep)
}
} }
} }

46
go.mod
View file

@ -1,39 +1,40 @@
module github.com/vmware/vmware-go-kcl-v2 module github.com/vmware/vmware-go-kcl-v2
go 1.17 go 1.21
require ( require (
github.com/aws/aws-sdk-go-v2 v1.11.2 github.com/aws/aws-sdk-go-v2 v1.25.2
github.com/aws/aws-sdk-go-v2/config v1.11.1 github.com/aws/aws-sdk-go-v2/config v1.27.5
github.com/aws/aws-sdk-go-v2/credentials v1.6.5 github.com/aws/aws-sdk-go-v2/credentials v1.17.5
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407
github.com/golang/protobuf v1.5.2 github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/prometheus/client_golang v1.11.0 github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.32.1 github.com/prometheus/common v0.32.1
github.com/rs/zerolog v1.26.1 github.com/rs/zerolog v1.26.1
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.8.1
go.uber.org/zap v1.20.0 go.uber.org/zap v1.20.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0
) )
require ( require (
github.com/BurntSushi/toml v0.4.1 // indirect github.com/BurntSushi/toml v0.4.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 // indirect
github.com/aws/smithy-go v1.9.0 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
@ -42,9 +43,10 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect github.com/prometheus/procfs v0.7.3 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect go.uber.org/multierr v1.7.0 // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect golang.org/x/sys v0.1.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

92
go.sum
View file

@ -41,42 +41,44 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs= github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1/go.mod h1:sxpLb+nZk7tIfCWChfd+h4QwHNUR57d8hA1cleTkjJo=
github.com/aws/aws-sdk-go-v2/config v1.11.1 h1:KXSjb7ZMLRtjxClFptukTYibiOqJS9NwBO+9WD3UMto= github.com/aws/aws-sdk-go-v2/config v1.27.5 h1:brBPsyRFQn97M1ZhQ9tLXkO7Zytiar0NS06FGmEJBdg=
github.com/aws/aws-sdk-go-v2/config v1.11.1/go.mod h1:VvfkzUhVtntSg1JfGFMSKS0CyiTZd3NqBxK5af4zsME= github.com/aws/aws-sdk-go-v2/config v1.27.5/go.mod h1:I53uvsfddRRTG5YcC4n5Z3aOD1BU8hYCoIG7iEJG4wM=
github.com/aws/aws-sdk-go-v2/credentials v1.6.5 h1:ZrsO2js2v4T95rsCIWoAb/ck5+U1kwkizGdZHY+ni3s= github.com/aws/aws-sdk-go-v2/credentials v1.17.5 h1:yn3zSvIKC2NZIs40cY3kckcy9Zma96PrRR07N54PCvY=
github.com/aws/aws-sdk-go-v2/credentials v1.6.5/go.mod h1:HWSOnsnqVMbLcWUmom6AN1cqhcLzLJ62AObW28CbYbU= github.com/aws/aws-sdk-go-v2/credentials v1.17.5/go.mod h1:8JcKPAGZVnDWuR5lusAwmrSDtZnDIAnpQWaDC9RFt2g=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 h1:KiN5TPOLrEjbGCvdTQR4t0U4T87vVwALZ5Bg3jpMqPY= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 h1:AK0J8iYBFeUk2Ax7O8YpLtFsfhdOByh2QIkHmigpRYk=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2/go.mod h1:dF2F6tXEOgmW5X1ZFO/EPtWrcm7XkW07KNcJUGNtt4s= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2/go.mod h1:iRlGzMix0SExQEviAyptRWRGdYNo3+ufW/lCzvKVTUc=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 h1:IQup8Q6lorXeiA/rK72PeToWoWK8h7VAPgHNWdSrtgE= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2/go.mod h1:VITe/MdW6EMXPb0o0txu/fsonXbMHUU2OC2Qp7ivU4o= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 h1:BcSBoss+CeyRS4TgZKAcR6kcZ0Sb2P+DHs8r8aMlTpQ= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1 h1:mQySuI87thHtcbZvEDjwUROGWikU6fqgpHklCBXpJU4=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0/go.mod h1:eAgmZ4hIzTsTOlAA7yvGJz+RywxZo3KWtGt7J+jAUxU= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1/go.mod h1:Z1ThUUTuCO9PArtiQsTmBGBv+38NGj+795Zl0n1jgiM=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 h1:te+nIFwPf5Bi/cZvd9g/+EF0gkJT3c0J/5+NMx0NBZg= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2 h1:n+nT52A+Ik+ut1D8IV4EP1qfyUdP9Jq60uYfnlJwSWc=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0/go.mod h1:ELltfl9ri0n4sZ/VjPZBgemNMd9mYIpCAuZhc7NP7l4= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2/go.mod h1:BzzW6QegtSMnC1BhD+lagiUDSRYjRTOhXAb1mLfEaMg=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 h1:lPLbw4Gn59uoKqvOfSnkJr54XWk5Ak1NK20ZEiSWb3U= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 h1:EyBZibRTVAs6ECHZOw5/wlylS9OcTzwyjeQMudmREjE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0/go.mod h1:80NaCIH9YU3rzTTs/J/ECATjXuRqzo/wB6ukO6MZ0XY= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1/go.mod h1:JKpmtYhhPs7D97NL/ltqz7yCkERFW5dOlHyVl66ZYF8=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3 h1:/MpYoYvgshlGMFmSyfzGWf6HKoEo/DrKBoHxXR3vh+U=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3/go.mod h1:1Pf5vPqk8t9pdYB3dmUMRE/0m8u0IHHg8ESSiutJd0I=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3 h1:x0N5ftQzgcfRpCpTiyZC40pvNUJYhzf4UgCsAyO6/P8=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3/go.mod h1:Ru7vg1iQ7cR4i7SZ/JTLYN9kaXtbL69UdgG0OQWQxW0=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 h1:s47dGRX/fBy9s/Zculav/cyqRhkMKsE/5hjg6rWAH6E= github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0/go.mod h1:B1x58TfECuYHFX/bga902rUvMqQu9C/v2XiCi2GZZXE= github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY=
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 h1:E4fxAg/UE8a6yiLZYv8/EP0uXKPPRImiMau4ift6S/g= github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 h1:utEGkfdQ4L6YW/ietH7111ZYglLJvS+sLriHJ1NBJEQ=
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGXyn22c1USYxA/2KyvoeDY0= github.com/aws/aws-sdk-go-v2/service/sso v1.20.1/go.mod h1:RsYqzYr2F2oPDdpy+PdhephuZxTfjHQe7SOBcZGoAU8=
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 h1:9/GylMS45hGGFCcMrUZDVayQE1jYSIN6da9jo7RAYIw=
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1/go.mod h1:YjAPFn4kGFqKC54VsHs5fn5B6d+PCY2tziEa3U/GB5Y=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 h1:0YjXuWdYHvsm0HnT4vO8XpwG1D+i2roxSCBoN6deJ7M=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2/go.mod h1:jI+FWmYkSMn+4APWmZiZTgt0oM0TrvymD51FMqCnWgA=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58= github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8= github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
@ -153,8 +155,9 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@ -211,8 +214,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -240,11 +244,17 @@ github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@ -390,8 +400,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -447,7 +457,6 @@ golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
@ -543,8 +552,9 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=