Compare commits


No commits in common. "main" and "spentakota_catchNilErrorSyncLeases" have entirely different histories.

15 changed files with 115 additions and 704 deletions


@@ -79,9 +79,6 @@ type Checkpointer interface {
 	// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
 	RemoveLeaseOwner(string) error
 
-	// GetLeaseOwner to get current owner of lease for shard
-	GetLeaseOwner(string) (string, error)
-
 	// ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods)
 	ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)


@@ -51,10 +51,6 @@ const (
 	NumMaxRetries = 10
 )
 
-var (
-	NoLeaseOwnerErr = errors.New("no LeaseOwner in checkpoints table")
-)
-
 // DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
 type DynamoCheckpoint struct {
 	log logger.Logger
@@ -133,7 +129,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
 // GetLease attempts to gain a lock on the given shard
 func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssignTo string) error {
 	newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC()
-	newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339Nano)
+	newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339)
 	currentCheckpoint, err := checkpointer.getItem(shard.ID)
 	if err != nil {
 		return err
@@ -165,7 +161,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
 	assignedTo := assignedVar.(*types.AttributeValueMemberS).Value
 	leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value
-	currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout)
+	currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout)
 	if err != nil {
 		return err
 	}
@@ -250,7 +246,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
 // CheckpointSequence writes a checkpoint at the designated sequence ID
 func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error {
-	leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339Nano)
+	leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339)
 	marshalledCheckpoint := map[string]types.AttributeValue{
 		LeaseKeyKey: &types.AttributeValueMemberS{
 			Value: shard.ID,
@@ -294,7 +290,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
 	// Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming
 	if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" {
-		currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout.(*types.AttributeValueMemberS).Value)
+		currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value)
 		if err != nil {
 			return err
 		}
@@ -340,23 +336,6 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
 	return err
 }
 
-// GetLeaseOwner returns current lease owner of given shard in checkpoints table
-func (checkpointer *DynamoCheckpoint) GetLeaseOwner(shardID string) (string, error) {
-	currentCheckpoint, err := checkpointer.getItem(shardID)
-	if err != nil {
-		return "", err
-	}
-
-	assignedVar, assignedToOk := currentCheckpoint[LeaseOwnerKey]
-	if !assignedToOk {
-		return "", NoLeaseOwnerErr
-	}
-
-	return assignedVar.(*types.AttributeValueMemberS).Value, nil
-}
-
 // ListActiveWorkers returns a map of workers and their shards
 func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) {
 	err := checkpointer.syncLeases(shardStatus)
@@ -391,7 +370,7 @@ func (checkpointer *DynamoCheckpoint) ClaimShard(shard *par.ShardStatus, claimID
 	if err != nil && err != ErrSequenceIDNotFound {
 		return err
 	}
-	leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339Nano)
+	leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339)
 
 	conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)`
 	expressionAttributeValues := map[string]types.AttributeValue{
@@ -483,6 +462,10 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha
 		}
 	}
 
+	if err != nil {
+		log.Debugf("Error performing SyncLeases. Error: %+v ", err)
+		return err
+	}
 	log.Debugf("Lease sync completed. Next lease sync will occur in %s", time.Duration(checkpointer.kclConfig.LeaseSyncingTimeIntervalMillis)*time.Millisecond)
 	return nil
 }


@@ -69,9 +69,6 @@ const (
 	// DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner.
 	DefaultLeaseRefreshPeriodMillis = 5000
 
-	// DefaultLeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
-	DefaultLeaseRefreshWaitTime = 2500
-
 	// DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call.
 	DefaultMaxRecords = 10000
@@ -139,9 +136,6 @@ const (
 	// DefaultLeaseSyncingIntervalMillis Number of milliseconds to wait before syncing with lease table (dynamodDB)
 	DefaultLeaseSyncingIntervalMillis = 60000
-
-	// DefaultMaxRetryCount The default maximum number of retries in case of error
-	DefaultMaxRetryCount = 5
 )
 
 type (
@@ -219,9 +213,6 @@
 		// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
 		LeaseRefreshPeriodMillis int
 
-		// LeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
-		LeaseRefreshWaitTime int
-
 		// MaxRecords Max records to read per Kinesis getRecords() call
 		MaxRecords int
@@ -292,9 +283,6 @@
 		// LeaseSyncingTimeInterval The number of milliseconds to wait before syncing with lease table (dynamoDB)
 		LeaseSyncingTimeIntervalMillis int
-
-		// MaxRetryCount The maximum number of retries in case of error
-		MaxRetryCount int
 	}
 )


@@ -102,8 +102,6 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
 		LeaseStealingIntervalMillis:     DefaultLeaseStealingIntervalMillis,
 		LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis,
 		LeaseSyncingTimeIntervalMillis:  DefaultLeaseSyncingIntervalMillis,
-		LeaseRefreshWaitTime:            DefaultLeaseRefreshWaitTime,
-		MaxRetryCount:                   DefaultMaxRetryCount,
 		Logger:                          logger.GetDefaultLogger(),
 	}
 }
@@ -150,12 +148,6 @@ func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefres
 	return c
 }
 
-func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration {
-	checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime)
-	c.LeaseRefreshWaitTime = leaseRefreshWaitTime
-	return c
-}
-
 func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
 	checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
 	c.ShardSyncIntervalMillis = shardSyncIntervalMillis
@@ -219,13 +211,6 @@ func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *Kinesi
 	return c
 }
 
-// WithMaxRetryCount sets the max retry count in case of error.
-func (c *KinesisClientLibConfiguration) WithMaxRetryCount(maxRetryCount int) *KinesisClientLibConfiguration {
-	checkIsValuePositive("maxRetryCount", maxRetryCount)
-	c.MaxRetryCount = maxRetryCount
-	return c
-}
-
 // WithMonitoringService sets the monitoring service to use to publish metrics.
 func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.MonitoringService) *KinesisClientLibConfiguration {
 	// Nil case is handled downward (at worker creation) so no need to do it here.
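
For orientation, these With* setters follow a chainable builder pattern over KinesisClientLibConfiguration. A minimal usage sketch; the application/stream/region/worker values are made up, and the credentials parameter is assumed from the constructor's signature, which is truncated in this diff:

    cfg := config.NewKinesisClientLibConfigWithCredentials(
        "my-app", "my-stream", "us-west-2", "worker-1", creds).
        WithLeaseRefreshPeriodMillis(5000).
        WithShardSyncIntervalMillis(10000).
        WithMonitoringService(metrics.NoopMonitoringService{})

Each setter validates its argument (checkIsValuePositive) and returns the receiver, which is what makes the chaining work.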


@@ -332,10 +332,6 @@ func (cw *MonitoringService) RecordProcessRecordsTime(shard string, time float64
 	m.processRecordsTime = append(m.processRecordsTime, time)
 }
 
-func (cw *MonitoringService) DeleteMetricMillisBehindLatest(shard string) {
-	// not implemented
-}
-
 func (cw *MonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics {
 	var i interface{}
 	var ok bool


@@ -35,7 +35,6 @@ type MonitoringService interface {
 	IncrRecordsProcessed(shard string, count int)
 	IncrBytesProcessed(shard string, count int64)
 	MillisBehindLatest(shard string, milliSeconds float64)
-	DeleteMetricMillisBehindLatest(shard string)
 	LeaseGained(shard string)
 	LeaseLost(shard string)
 	LeaseRenewed(shard string)
@@ -54,7 +53,6 @@ func (NoopMonitoringService) Shutdown() {}
 func (NoopMonitoringService) IncrRecordsProcessed(_ string, _ int)    {}
 func (NoopMonitoringService) IncrBytesProcessed(_ string, _ int64)    {}
 func (NoopMonitoringService) MillisBehindLatest(_ string, _ float64)  {}
-func (NoopMonitoringService) DeleteMetricMillisBehindLatest(_ string) {}
 func (NoopMonitoringService) LeaseGained(_ string)  {}
 func (NoopMonitoringService) LeaseLost(_ string)    {}
 func (NoopMonitoringService) LeaseRenewed(_ string) {}


@@ -147,10 +147,6 @@ func (p *MonitoringService) MillisBehindLatest(shard string, millSeconds float64
 	p.behindLatestMillis.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Set(millSeconds)
 }
 
-func (p *MonitoringService) DeleteMetricMillisBehindLatest(shard string) {
-	p.behindLatestMillis.Delete(prom.Labels{"shard": shard, "kinesisStream": p.streamName})
-}
-
 func (p *MonitoringService) LeaseGained(shard string) {
 	p.leasesHeld.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName, "workerID": p.workerID}).Inc()
 }
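
The method removed here relied on client_golang's labeled-series deletion. For context, a hedged, self-contained sketch of that gauge lifecycle; the metric and label names are illustrative, not necessarily this library's exact ones:

    package main

    import prom "github.com/prometheus/client_golang/prometheus"

    func main() {
        behindLatest := prom.NewGaugeVec(
            prom.GaugeOpts{Name: "millis_behind_latest"},
            []string{"shard", "kinesisStream"},
        )
        prom.MustRegister(behindLatest)

        labels := prom.Labels{"shard": "shardId-000000000000", "kinesisStream": "my-stream"}

        // While the shard is owned, the gauge tracks consumer lag.
        behindLatest.With(labels).Set(1500)

        // On lease loss, deleting the labeled series stops exporting a stale
        // value for a shard this worker no longer owns.
        behindLatest.Delete(labels)
    }

Without the Delete, the last reported lag for a lost shard would keep appearing in scrapes until process restart.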


@@ -21,7 +21,6 @@
 package worker
 
 import (
-	"context"
 	"sync"
 	"time"
@@ -41,16 +40,10 @@ type shardConsumer interface {
 	getRecords() error
 }
 
-type KinesisSubscriberGetter interface {
-	SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error)
-	GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error)
-	GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error)
-}
-
 // commonShardConsumer implements common functionality for regular and enhanced fan-out consumers
 type commonShardConsumer struct {
 	shard           *par.ShardStatus
-	kc              KinesisSubscriberGetter
+	kc              *kinesis.Client
 	checkpointer    chk.Checkpointer
 	recordProcessor kcl.IRecordProcessor
 	kclConfig       *config.KinesisClientLibConfiguration
@@ -58,7 +51,7 @@ type commonShardConsumer struct {
 }
 
 // Cleanup the internal lease cache
-func (sc *commonShardConsumer) releaseLease(shard string) {
+func (sc *commonShardConsumer) releaseLease() {
 	log := sc.kclConfig.Logger
 	log.Infof("Release lease for shard %s", sc.shard.ID)
 	sc.shard.SetLeaseOwner("")
@@ -66,11 +59,10 @@
 	// Release the lease by wiping out the lease owner for the shard
 	// Note: we don't need to do anything in case of error here and shard lease will eventually be expired.
 	if err := sc.checkpointer.RemoveLeaseOwner(sc.shard.ID); err != nil {
-		log.Debugf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err)
+		log.Errorf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err)
 	}
 
 	// reporting lease lose metrics
-	sc.mService.DeleteMetricMillisBehindLatest(shard)
 	sc.mService.LeaseLost(sc.shard.ID)
 }
@@ -173,6 +165,7 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
 	input.CacheEntryTime = &getRecordsStartTime
 	input.CacheExitTime = &processRecordsStartTime
 	sc.recordProcessor.ProcessRecords(input)
+
 	processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds()
 	sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming))
 }


@@ -46,7 +46,7 @@ type FanOutShardConsumer struct {
 // getRecords subscribes to a shard and reads events from it.
 // Precondition: it currently has the lease on the shard.
 func (sc *FanOutShardConsumer) getRecords() error {
-	defer sc.releaseLease(sc.shard.ID)
+	defer sc.releaseLease()
 
 	log := sc.kclConfig.Logger
@@ -103,8 +103,6 @@ func (sc *FanOutShardConsumer) getRecords() error {
 				return err
 			}
 			refreshLeaseTimer = time.After(time.Until(sc.shard.LeaseTimeout.Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)))
-			// log metric for renewed lease for worker
-			sc.mService.LeaseRenewed(sc.shard.ID)
 		case event, ok := <-shardSub.GetStream().Events():
 			if !ok {
 				// need to resubscribe to shard
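
The refreshLeaseTimer arithmetic above schedules the next renewal LeaseRefreshPeriodMillis before the lease actually expires. A small sketch of just that scheduling step, factored out for clarity (the helper name is mine, not the library's):

    // Arm a timer that fires refreshPeriodMillis before leaseTimeout, so the
    // renewal attempt happens while the lease is still held.
    func refreshTimer(leaseTimeout time.Time, refreshPeriodMillis int) <-chan time.Time {
        refreshAt := leaseTimeout.Add(-time.Duration(refreshPeriodMillis) * time.Millisecond)
        return time.After(time.Until(refreshAt))
    }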


@@ -32,7 +32,6 @@ package worker
 import (
 	"context"
 	"errors"
-	log "github.com/sirupsen/logrus"
 	"math"
 	"time"
@@ -45,33 +44,14 @@ import (
 	"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
 )
 
-const (
-	kinesisReadTPSLimit = 5
-	MaxBytes            = 10000000
-	MaxBytesPerSecond   = 2000000
-	BytesToMbConversion = 1000000
-)
-
-var (
-	rateLimitTimeNow      = time.Now
-	rateLimitTimeSince    = time.Since
-	localTPSExceededError = errors.New("Error GetRecords TPS Exceeded")
-	maxBytesExceededError = errors.New("Error GetRecords Max Bytes For Call Period Exceeded")
-)
-
 // PollingShardConsumer is responsible for polling data records from a (specified) shard.
 // Note: PollingShardConsumer only deal with one shard.
 type PollingShardConsumer struct {
 	commonShardConsumer
 	streamName string
 	stop       *chan struct{}
 	consumerID string
 	mService   metrics.MonitoringService
-	currTime      time.Time
-	callsLeft     int
-	remBytes      int
-	lastCheckTime time.Time
-	bytesRead     int
 }
 
 func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
@@ -99,12 +79,7 @@ func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
 // getRecords continuously poll one shard for data record
 // Precondition: it currently has the lease on the shard.
 func (sc *PollingShardConsumer) getRecords() error {
-	ctx, cancelFunc := context.WithCancel(context.Background())
-	defer func() {
-		// cancel renewLease()
-		cancelFunc()
-		sc.releaseLease(sc.shard.ID)
-	}()
+	defer sc.releaseLease()
 
 	log := sc.kclConfig.Logger
@@ -133,70 +108,39 @@ func (sc *PollingShardConsumer) getRecords() error {
 	recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer)
 	retriedErrors := 0
 
-	// define API call rate limit starting window
-	sc.currTime = rateLimitTimeNow()
-	sc.callsLeft = kinesisReadTPSLimit
-	sc.bytesRead = 0
-	sc.remBytes = MaxBytes
-
-	// starting async lease renewal thread
-	leaseRenewalErrChan := make(chan error, 1)
-	go func() {
-		leaseRenewalErrChan <- sc.renewLease(ctx)
-	}()
-
 	for {
+		if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) {
+			log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
+			err = sc.checkpointer.GetLease(sc.shard, sc.consumerID)
+			if err != nil {
+				if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
+					log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
+					return nil
+				}
+				// log and return error
+				log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
+					sc.shard.ID, sc.consumerID, err)
+				return err
+			}
+		}
+
 		getRecordsStartTime := time.Now()
 		log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator))
 
-		// Get records from stream and retry as needed
 		getRecordsArgs := &kinesis.GetRecordsInput{
 			Limit:         aws.Int32(int32(sc.kclConfig.MaxRecords)),
 			ShardIterator: shardIterator,
 		}
-		getResp, coolDownPeriod, err := sc.callGetRecordsAPI(getRecordsArgs)
+
+		// Get records from stream and retry as needed
+		getResp, err := sc.kc.GetRecords(context.TODO(), getRecordsArgs)
 		if err != nil {
 			//aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling
 			var throughputExceededErr *types.ProvisionedThroughputExceededException
 			var kmsThrottlingErr *types.KMSThrottlingException
-			if errors.As(err, &throughputExceededErr) {
-				retriedErrors++
-				if retriedErrors > sc.kclConfig.MaxRetryCount {
-					log.Errorf("message", "Throughput Exceeded Error: "+
-						"reached max retry count getting records from shard",
-						"shardId", sc.shard.ID,
-						"retryCount", retriedErrors,
-						"error", err)
-					return err
-				}
-				// If there is insufficient provisioned throughput on the stream,
-				// subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException.
-				// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
-				sc.waitASecond(sc.currTime)
-				continue
-			}
-			if err == localTPSExceededError {
-				log.Infof("localTPSExceededError so sleep for a second")
-				sc.waitASecond(sc.currTime)
-				continue
-			}
-			if err == maxBytesExceededError {
-				log.Infof("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod)
-				time.Sleep(time.Duration(coolDownPeriod) * time.Second)
-				continue
-			}
-			if errors.As(err, &kmsThrottlingErr) {
+			if errors.As(err, &throughputExceededErr) || errors.As(err, &kmsThrottlingErr) {
 				log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err)
 				retriedErrors++
-				// Greater than MaxRetryCount so we get the last retry
-				if retriedErrors > sc.kclConfig.MaxRetryCount {
-					log.Errorf("message", "KMS Throttling Error: "+
-						"reached max retry count getting records from shard",
-						"shardId", sc.shard.ID,
-						"retryCount", retriedErrors,
-						"error", err)
-					return err
-				}
 				// exponential backoff
 				// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
 				time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond)
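
The exponential backoff retained on both sides sleeps 2^n * 100ms on the n-th retry. A quick sketch of the resulting schedule:

    // retry 1 -> 200ms, retry 2 -> 400ms, retry 3 -> 800ms, retry 4 -> 1.6s, ...
    for n := 1; n <= 5; n++ {
        delay := time.Duration(math.Exp2(float64(n))*100) * time.Millisecond
        fmt.Printf("retry %d: sleep %v\n", n, delay)
    }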
@@ -231,104 +175,7 @@ func (sc *PollingShardConsumer) getRecords() error {
 			shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
 			sc.recordProcessor.Shutdown(shutdownInput)
 			return nil
-		case leaseRenewalErr := <-leaseRenewalErrChan:
-			return leaseRenewalErr
 		default:
 		}
 	}
 }
-
-func (sc *PollingShardConsumer) waitASecond(timePassed time.Time) {
-	waitTime := time.Since(timePassed)
-	if waitTime < time.Second {
-		time.Sleep(time.Second - waitTime)
-	}
-}
-
-func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) {
-	// Each shard can support up to a maximum total data read rate of 2 MB per second via GetRecords.
-	// If a call to GetRecords returns 10 MB, subsequent calls made within the next 5 seconds throw an exception.
-	// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
-	// check for overspending of byte budget from getRecords call
-	currentTime := rateLimitTimeNow()
-	secondsPassed := currentTime.Sub(sc.lastCheckTime).Seconds()
-	sc.lastCheckTime = currentTime
-	sc.remBytes += int(secondsPassed * MaxBytesPerSecond)
-	if sc.remBytes > MaxBytes {
-		sc.remBytes = MaxBytes
-	}
-	if sc.remBytes < 1 {
-		// Wait until cool down period has passed to prevent ProvisionedThroughputExceededException
-		coolDown := sc.bytesRead / MaxBytesPerSecond
-		if sc.bytesRead%MaxBytesPerSecond > 0 {
-			coolDown++
-		}
-		return coolDown, maxBytesExceededError
-	} else {
-		sc.remBytes -= sc.bytesRead
-	}
-	return 0, nil
-}
-
-func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, int, error) {
-	if sc.bytesRead != 0 {
-		coolDownPeriod, err := sc.checkCoolOffPeriod()
-		if err != nil {
-			return nil, coolDownPeriod, err
-		}
-	}
-	// every new second, we get a fresh set of calls
-	if rateLimitTimeSince(sc.currTime) > time.Second {
-		sc.callsLeft = kinesisReadTPSLimit
-		sc.currTime = rateLimitTimeNow()
-	}
-	if sc.callsLeft < 1 {
-		return nil, 0, localTPSExceededError
-	}
-	getResp, err := sc.kc.GetRecords(context.TODO(), gri)
-	sc.callsLeft--
-	if err != nil {
-		return getResp, 0, err
-	}
-	// Calculate size of records from read transaction
-	sc.bytesRead = 0
-	for _, record := range getResp.Records {
-		sc.bytesRead += len(record.Data)
-	}
-	if sc.lastCheckTime.IsZero() {
-		sc.lastCheckTime = rateLimitTimeNow()
-	}
-	return getResp, 0, err
-}
-
-func (sc *PollingShardConsumer) renewLease(ctx context.Context) error {
-	renewDuration := time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond
-	for {
-		timer := time.NewTimer(renewDuration)
-		select {
-		case <-timer.C:
-			log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
-			err := sc.checkpointer.GetLease(sc.shard, sc.consumerID)
-			if err != nil {
-				// log and return error
-				log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
-					sc.shard.ID, sc.consumerID, err)
-				return err
-			}
-			// log metric for renewed lease for worker
-			sc.mService.LeaseRenewed(sc.shard.ID)
-		case <-ctx.Done():
-			// clean up timer resources
-			if !timer.Stop() {
-				<-timer.C
-			}
-			log.Debugf("renewLease was canceled")
-			return nil
-		}
-	}
-}


@@ -1,355 +0,0 @@
-/*
- * Copyright (c) 2023 VMware, Inc.
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
- * associated documentation files (the "Software"), to deal in the Software without restriction, including
- * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is furnished to do
- * so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all copies or substantial
- * portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
- * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
- * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
- * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- */
-
-package worker
-
-import (
-	"context"
-	"errors"
-	"testing"
-	"time"
-
-	"github.com/aws/aws-sdk-go-v2/aws"
-	"github.com/aws/aws-sdk-go-v2/service/kinesis"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/mock"
-)
-
-var (
-	testGetRecordsError = errors.New("GetRecords Error")
-)
-
-func TestCallGetRecordsAPI(t *testing.T) {
-	// basic happy path
-	m1 := MockKinesisSubscriberGetter{}
-	ret := kinesis.GetRecordsOutput{}
-	m1.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret, nil)
-	psc := PollingShardConsumer{
-		commonShardConsumer: commonShardConsumer{kc: &m1},
-	}
-	gri := kinesis.GetRecordsInput{
-		ShardIterator: aws.String("shard-iterator-01"),
-	}
-	out, _, err := psc.callGetRecordsAPI(&gri)
-	assert.Nil(t, err)
-	assert.Equal(t, &ret, out)
-	m1.AssertExpectations(t)
-
-	// check that localTPSExceededError is thrown when trying more than 5 TPS
-	m2 := MockKinesisSubscriberGetter{}
-	psc2 := PollingShardConsumer{
-		commonShardConsumer: commonShardConsumer{kc: &m2},
-		callsLeft:           0,
-	}
-	rateLimitTimeSince = func(t time.Time) time.Duration {
-		return 500 * time.Millisecond
-	}
-	out2, _, err2 := psc2.callGetRecordsAPI(&gri)
-	assert.Nil(t, out2)
-	assert.ErrorIs(t, err2, localTPSExceededError)
-	m2.AssertExpectations(t)
-
-	// check that getRecords is called normally in bytesRead = 0 case
-	m3 := MockKinesisSubscriberGetter{}
-	ret3 := kinesis.GetRecordsOutput{}
-	m3.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret3, nil)
-	psc3 := PollingShardConsumer{
-		commonShardConsumer: commonShardConsumer{kc: &m3},
-		callsLeft:           2,
-		bytesRead:           0,
-	}
-	rateLimitTimeSince = func(t time.Time) time.Duration {
-		return 2 * time.Second
-	}
-	out3, checkSleepVal, err3 := psc3.callGetRecordsAPI(&gri)
-	assert.Nil(t, err3)
-	assert.Equal(t, checkSleepVal, 0)
-	assert.Equal(t, &ret3, out3)
-	m3.AssertExpectations(t)
-
-	// check that correct cool off period is taken for 10mb in 1 second
-	testTime := time.Now()
-	m4 := MockKinesisSubscriberGetter{}
-	ret4 := kinesis.GetRecordsOutput{Records: nil}
-	m4.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret4, nil)
-	psc4 := PollingShardConsumer{
-		commonShardConsumer: commonShardConsumer{kc: &m4},
-		callsLeft:           2,
-		bytesRead:           MaxBytes,
-		lastCheckTime:       testTime,
-		remBytes:            MaxBytes,
-	}
-	rateLimitTimeSince = func(t time.Time) time.Duration {
-		return 2 * time.Second
-	}
-	rateLimitTimeNow = func() time.Time {
-		return testTime.Add(time.Second)
-	}
-	out4, checkSleepVal2, err4 := psc4.callGetRecordsAPI(&gri)
-	assert.Nil(t, err4)
-	assert.Equal(t, &ret4, out4)
-	m4.AssertExpectations(t)
-	if checkSleepVal2 != 0 {
-		t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal2)
-	}
-
-	// check that no cool off period is taken for 6mb in 3 seconds
-	testTime2 := time.Now()
-	m5 := MockKinesisSubscriberGetter{}
-	ret5 := kinesis.GetRecordsOutput{}
-	m5.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret5, nil)
-	psc5 := PollingShardConsumer{
-		commonShardConsumer: commonShardConsumer{kc: &m5},
-		callsLeft:           2,
-		bytesRead:           MaxBytesPerSecond * 3,
-		lastCheckTime:       testTime2,
-		remBytes:            MaxBytes,
-	}
-	rateLimitTimeSince = func(t time.Time) time.Duration {
-		return 3 * time.Second
-	}
-	rateLimitTimeNow = func() time.Time {
-		return testTime2.Add(time.Second * 3)
-	}
-	out5, checkSleepVal3, err5 := psc5.callGetRecordsAPI(&gri)
-	assert.Nil(t, err5)
-	assert.Equal(t, checkSleepVal3, 0)
-	assert.Equal(t, &ret5, out5)
-	m5.AssertExpectations(t)
-
-	// check for correct cool off period with 8mb in .2 seconds with 6mb remaining
-	testTime3 := time.Now()
-	m6 := MockKinesisSubscriberGetter{}
-	ret6 := kinesis.GetRecordsOutput{Records: nil}
-	m6.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret6, nil)
-	psc6 := PollingShardConsumer{
-		commonShardConsumer: commonShardConsumer{kc: &m6},
-		callsLeft:           2,
-		bytesRead:           MaxBytesPerSecond * 4,
-		lastCheckTime:       testTime3,
-		remBytes:            MaxBytesPerSecond * 3,
-	}
-	rateLimitTimeSince = func(t time.Time) time.Duration {
-		return 3 * time.Second
-	}
-	rateLimitTimeNow = func() time.Time {
-		return testTime3.Add(time.Second / 5)
-	}
-	out6, checkSleepVal4, err6 := psc6.callGetRecordsAPI(&gri)
-	assert.Nil(t, err6)
-	assert.Equal(t, &ret6, out6)
-	m5.AssertExpectations(t)
-	if checkSleepVal4 != 0 {
-		t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal4)
-	}
-
-	// case where getRecords throws error
-	m7 := MockKinesisSubscriberGetter{}
-	ret7 := kinesis.GetRecordsOutput{Records: nil}
-	m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, testGetRecordsError)
-	psc7 := PollingShardConsumer{
-		commonShardConsumer: commonShardConsumer{kc: &m7},
-		callsLeft:           2,
-		bytesRead:           0,
-	}
-	rateLimitTimeSince = func(t time.Time) time.Duration {
-		return 2 * time.Second
-	}
-	out7, checkSleepVal7, err7 := psc7.callGetRecordsAPI(&gri)
-	assert.Equal(t, err7, testGetRecordsError)
-	assert.Equal(t, checkSleepVal7, 0)
-	assert.Equal(t, out7, &ret7)
-	m7.AssertExpectations(t)
-
-	// restore original func
-	rateLimitTimeNow = time.Now
-	rateLimitTimeSince = time.Since
-}
-
-type MockKinesisSubscriberGetter struct {
-	mock.Mock
-}
-
-func (m *MockKinesisSubscriberGetter) GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) {
-	ret := m.Called(ctx, params, optFns)
-	return ret.Get(0).(*kinesis.GetRecordsOutput), ret.Error(1)
-}
-
-func (m *MockKinesisSubscriberGetter) GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) {
-	return nil, nil
-}
-
-func (m *MockKinesisSubscriberGetter) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) {
-	return nil, nil
-}
-
-func TestPollingShardConsumer_checkCoolOffPeriod(t *testing.T) {
-	refTime := time.Now()
-	type fields struct {
-		lastCheckTime time.Time
-		remBytes      int
-		bytesRead     int
-	}
-	tests := []struct {
-		name    string
-		fields  fields
-		timeNow time.Time
-		want    int
-		wantErr bool
-	}{
-		{
-			"zero time max bytes to spend",
-			fields{
-				time.Time{},
-				0,
-				0,
-			},
-			refTime,
-			0,
-			false,
-		},
-		{
-			"same second, bytes still left to spend",
-			fields{
-				refTime,
-				MaxBytesPerSecond,
-				MaxBytesPerSecond - 1,
-			},
-			refTime,
-			0,
-			false,
-		},
-		{
-			"same second, not many but some bytes still left to spend",
-			fields{
-				refTime,
-				8,
-				MaxBytesPerSecond,
-			},
-			refTime,
-			0,
-			false,
-		},
-		{
-			"same second, 1 byte still left to spend",
-			fields{
-				refTime,
-				1,
-				MaxBytesPerSecond,
-			},
-			refTime,
-			0,
-			false,
-		},
-		{
-			"next second, bytes still left to spend",
-			fields{
-				refTime,
-				42,
-				1024,
-			},
-			refTime.Add(1 * time.Second),
-			0,
-			false,
-		},
-		{
-			"same second, max bytes per second already spent",
-			fields{
-				refTime,
-				0,
-				MaxBytesPerSecond,
-			},
-			refTime,
-			1,
-			true,
-		},
-		{
-			"same second, more than max bytes per second already spent",
-			fields{
-				refTime,
-				0,
-				MaxBytesPerSecond + 1,
-			},
-			refTime,
-			2,
-			true,
-		},
-		// Kinesis prevents reading more than 10 MiB at once
-		{
-			"same second, 10 MiB read all at once",
-			fields{
-				refTime,
-				0,
-				10 * 1024 * 1024,
-			},
-			refTime,
-			6,
-			true,
-		},
-		{
-			"same second, 10 MB read all at once",
-			fields{
-				refTime,
-				0,
-				10 * 1000 * 1000,
-			},
-			refTime,
-			5,
-			true,
-		},
-		{
-			"5 seconds ago, 10 MB read all at once",
-			fields{
-				refTime,
-				0,
-				10 * 1000 * 1000,
-			},
-			refTime.Add(5 * time.Second),
-			0,
-			false,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			sc := &PollingShardConsumer{
-				lastCheckTime: tt.fields.lastCheckTime,
-				remBytes:      tt.fields.remBytes,
-				bytesRead:     tt.fields.bytesRead,
-			}
-			rateLimitTimeNow = func() time.Time {
-				return tt.timeNow
-			}
-			got, err := sc.checkCoolOffPeriod()
-			if (err != nil) != tt.wantErr {
-				t.Errorf("PollingShardConsumer.checkCoolOffPeriod() error = %v, wantErr %v", err, tt.wantErr)
-				return
-			}
-			if got != tt.want {
-				t.Errorf("PollingShardConsumer.checkCoolOffPeriod() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-	// restore original time.Now
-	rateLimitTimeNow = time.Now
-}


@@ -22,6 +22,7 @@ package worker
 import (
 	"github.com/aws/aws-sdk-go-v2/aws"
+
 	chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
 	kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
 	par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"


@@ -160,15 +160,11 @@ func (w *Worker) initialize() error {
 	log.Infof("Creating Kinesis client")
 	resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
-		if service == kinesis.ServiceID && len(w.kclConfig.KinesisEndpoint) > 0 {
-			return aws.Endpoint{
-				PartitionID:   "aws",
-				URL:           w.kclConfig.KinesisEndpoint,
-				SigningRegion: w.regionName,
-			}, nil
-		}
-		// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
-		return aws.Endpoint{}, &aws.EndpointNotFoundError{}
+		return aws.Endpoint{
+			PartitionID:   "aws",
+			URL:           w.kclConfig.KinesisEndpoint,
+			SigningRegion: w.regionName,
+		}, nil
 	})
 
 	cfg, err := awsConfig.LoadDefaultConfig(
@@ -276,14 +272,6 @@ func (w *Worker) eventLoop() {
 		rnd, _ := rand.Int(rand.Reader, big.NewInt(int64(w.kclConfig.ShardSyncIntervalMillis)))
 		shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + int(rnd.Int64())
 
-		select {
-		case <-*w.stop:
-			log.Infof("Shutting down...")
-			return
-		case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
-			log.Debugf("Waited %d ms to sync shards...", shardSyncSleep)
-		}
-
 		err := w.syncShard()
 		if err != nil {
 			log.Errorf("Error syncing shards: %+v, Retrying in %d ms...", err, shardSyncSleep)
@@ -375,6 +363,14 @@ func (w *Worker) eventLoop() {
 				log.Warnf("Error in rebalance: %+v", err)
 			}
 		}
+
+		select {
+		case <-*w.stop:
+			log.Infof("Shutting down...")
+			return
+		case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
+			log.Debugf("Waited %d ms to sync shards...", shardSyncSleep)
+		}
 	}
 }
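
Net effect of these two hunks: the stop/sleep select moves from the top of the eventLoop iteration to the bottom, so the right-hand side syncs shards immediately on startup and only then waits out the jittered shardSyncSleep (or honors the stop channel). The resulting loop shape, sketched:

    for {
        // ...sync shards, acquire/rebalance leases...

        select {
        case <-stop: // shutdown requested
            return
        case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
            // jittered wait before the next sync pass
        }
    }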

go.mod

@@ -1,40 +1,39 @@
 module github.com/vmware/vmware-go-kcl-v2
 
-go 1.21
+go 1.17
 
 require (
-	github.com/aws/aws-sdk-go-v2 v1.25.2
-	github.com/aws/aws-sdk-go-v2/config v1.27.5
-	github.com/aws/aws-sdk-go-v2/credentials v1.17.5
-	github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1
-	github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2
-	github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1
+	github.com/aws/aws-sdk-go-v2 v1.11.2
+	github.com/aws/aws-sdk-go-v2/config v1.11.1
+	github.com/aws/aws-sdk-go-v2/credentials v1.6.5
+	github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0
+	github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0
+	github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0
 	github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407
 	github.com/golang/protobuf v1.5.2
 	github.com/google/uuid v1.3.0
-	github.com/prometheus/client_golang v1.11.1
+	github.com/prometheus/client_golang v1.11.0
 	github.com/prometheus/common v0.32.1
 	github.com/rs/zerolog v1.26.1
 	github.com/sirupsen/logrus v1.8.1
-	github.com/stretchr/testify v1.8.1
+	github.com/stretchr/testify v1.7.0
 	go.uber.org/zap v1.20.0
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0
 )
 
 require (
 	github.com/BurntSushi/toml v0.4.1 // indirect
-	github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect
-	github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 // indirect
-	github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
-	github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
-	github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
-	github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect
-	github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3 // indirect
-	github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3 // indirect
-	github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 // indirect
-	github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 // indirect
-	github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 // indirect
-	github.com/aws/smithy-go v1.20.1 // indirect
+	github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect
+	github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect
+	github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 // indirect
+	github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 // indirect
+	github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 // indirect
+	github.com/aws/smithy-go v1.9.0 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
@@ -43,10 +42,9 @@ require (
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_model v0.2.0 // indirect
 	github.com/prometheus/procfs v0.7.3 // indirect
-	github.com/stretchr/objx v0.5.0 // indirect
 	go.uber.org/atomic v1.9.0 // indirect
 	go.uber.org/multierr v1.7.0 // indirect
-	golang.org/x/sys v0.1.0 // indirect
+	golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
 	google.golang.org/protobuf v1.27.1 // indirect
-	gopkg.in/yaml.v3 v3.0.1 // indirect
+	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
 )

go.sum

@@ -41,44 +41,42 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
 github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
 github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
-github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
-github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
+github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs=
+github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
-github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1/go.mod h1:sxpLb+nZk7tIfCWChfd+h4QwHNUR57d8hA1cleTkjJo=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM=
-github.com/aws/aws-sdk-go-v2/config v1.27.5 h1:brBPsyRFQn97M1ZhQ9tLXkO7Zytiar0NS06FGmEJBdg=
-github.com/aws/aws-sdk-go-v2/config v1.27.5/go.mod h1:I53uvsfddRRTG5YcC4n5Z3aOD1BU8hYCoIG7iEJG4wM=
+github.com/aws/aws-sdk-go-v2/config v1.11.1 h1:KXSjb7ZMLRtjxClFptukTYibiOqJS9NwBO+9WD3UMto=
+github.com/aws/aws-sdk-go-v2/config v1.11.1/go.mod h1:VvfkzUhVtntSg1JfGFMSKS0CyiTZd3NqBxK5af4zsME=
-github.com/aws/aws-sdk-go-v2/credentials v1.17.5 h1:yn3zSvIKC2NZIs40cY3kckcy9Zma96PrRR07N54PCvY=
-github.com/aws/aws-sdk-go-v2/credentials v1.17.5/go.mod h1:8JcKPAGZVnDWuR5lusAwmrSDtZnDIAnpQWaDC9RFt2g=
+github.com/aws/aws-sdk-go-v2/credentials v1.6.5 h1:ZrsO2js2v4T95rsCIWoAb/ck5+U1kwkizGdZHY+ni3s=
+github.com/aws/aws-sdk-go-v2/credentials v1.6.5/go.mod h1:HWSOnsnqVMbLcWUmom6AN1cqhcLzLJ62AObW28CbYbU=
-github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 h1:AK0J8iYBFeUk2Ax7O8YpLtFsfhdOByh2QIkHmigpRYk=
-github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2/go.mod h1:iRlGzMix0SExQEviAyptRWRGdYNo3+ufW/lCzvKVTUc=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 h1:KiN5TPOLrEjbGCvdTQR4t0U4T87vVwALZ5Bg3jpMqPY=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2/go.mod h1:dF2F6tXEOgmW5X1ZFO/EPtWrcm7XkW07KNcJUGNtt4s=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU=
-github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
-github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 h1:IQup8Q6lorXeiA/rK72PeToWoWK8h7VAPgHNWdSrtgE=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2/go.mod h1:VITe/MdW6EMXPb0o0txu/fsonXbMHUU2OC2Qp7ivU4o=
-github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1 h1:mQySuI87thHtcbZvEDjwUROGWikU6fqgpHklCBXpJU4=
-github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1/go.mod h1:Z1ThUUTuCO9PArtiQsTmBGBv+38NGj+795Zl0n1jgiM=
+github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 h1:BcSBoss+CeyRS4TgZKAcR6kcZ0Sb2P+DHs8r8aMlTpQ=
+github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0/go.mod h1:eAgmZ4hIzTsTOlAA7yvGJz+RywxZo3KWtGt7J+jAUxU=
-github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2 h1:n+nT52A+Ik+ut1D8IV4EP1qfyUdP9Jq60uYfnlJwSWc=
-github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2/go.mod h1:BzzW6QegtSMnC1BhD+lagiUDSRYjRTOhXAb1mLfEaMg=
+github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 h1:te+nIFwPf5Bi/cZvd9g/+EF0gkJT3c0J/5+NMx0NBZg=
+github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0/go.mod h1:ELltfl9ri0n4sZ/VjPZBgemNMd9mYIpCAuZhc7NP7l4=
-github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 h1:EyBZibRTVAs6ECHZOw5/wlylS9OcTzwyjeQMudmREjE=
-github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1/go.mod h1:JKpmtYhhPs7D97NL/ltqz7yCkERFW5dOlHyVl66ZYF8=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 h1:lPLbw4Gn59uoKqvOfSnkJr54XWk5Ak1NK20ZEiSWb3U=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0/go.mod h1:80NaCIH9YU3rzTTs/J/ECATjXuRqzo/wB6ukO6MZ0XY=
-github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3 h1:/MpYoYvgshlGMFmSyfzGWf6HKoEo/DrKBoHxXR3vh+U=
-github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3/go.mod h1:1Pf5vPqk8t9pdYB3dmUMRE/0m8u0IHHg8ESSiutJd0I=
+github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ=
+github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA=
-github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3 h1:x0N5ftQzgcfRpCpTiyZC40pvNUJYhzf4UgCsAyO6/P8=
-github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3/go.mod h1:Ru7vg1iQ7cR4i7SZ/JTLYN9kaXtbL69UdgG0OQWQxW0=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I=
 github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
-github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I=
-github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY=
+github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 h1:s47dGRX/fBy9s/Zculav/cyqRhkMKsE/5hjg6rWAH6E=
+github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0/go.mod h1:B1x58TfECuYHFX/bga902rUvMqQu9C/v2XiCi2GZZXE=
-github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 h1:utEGkfdQ4L6YW/ietH7111ZYglLJvS+sLriHJ1NBJEQ=
-github.com/aws/aws-sdk-go-v2/service/sso v1.20.1/go.mod h1:RsYqzYr2F2oPDdpy+PdhephuZxTfjHQe7SOBcZGoAU8=
+github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 h1:E4fxAg/UE8a6yiLZYv8/EP0uXKPPRImiMau4ift6S/g=
+github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGXyn22c1USYxA/2KyvoeDY0=
-github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 h1:9/GylMS45hGGFCcMrUZDVayQE1jYSIN6da9jo7RAYIw=
-github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1/go.mod h1:YjAPFn4kGFqKC54VsHs5fn5B6d+PCY2tziEa3U/GB5Y=
-github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 h1:0YjXuWdYHvsm0HnT4vO8XpwG1D+i2roxSCBoN6deJ7M=
-github.com/aws/aws-sdk-go-v2/service/sts v1.28.2/go.mod h1:jI+FWmYkSMn+4APWmZiZTgt0oM0TrvymD51FMqCnWgA=
+github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc=
+github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA=
 github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
-github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
-github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
+github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58=
+github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
 github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8=
 github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
 github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
@@ -155,9 +153,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
 github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
-github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -214,9 +211,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
+github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
 github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
-github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
-github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -244,17 +240,11 @@ github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@ -400,8 +390,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -457,6 +447,7 @@ golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
@ -552,9 +543,8 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=