Compare commits
No commits in common. "main" and "jc/testable-getRecords" have entirely different histories.
main
...
jc/testabl
12 changed files with 111 additions and 604 deletions
|
|
@ -79,9 +79,6 @@ type Checkpointer interface {
|
|||
// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
|
||||
RemoveLeaseOwner(string) error
|
||||
|
||||
// GetLeaseOwner to get current owner of lease for shard
|
||||
GetLeaseOwner(string) (string, error)
|
||||
|
||||
// ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods)
|
||||
ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)
|
||||
|
||||
|
|
|
|||
|
|
@ -51,10 +51,6 @@ const (
|
|||
NumMaxRetries = 10
|
||||
)
|
||||
|
||||
var (
|
||||
NoLeaseOwnerErr = errors.New("no LeaseOwner in checkpoints table")
|
||||
)
|
||||
|
||||
// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
|
||||
type DynamoCheckpoint struct {
|
||||
log logger.Logger
|
||||
|
|
@ -133,7 +129,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
|
|||
// GetLease attempts to gain a lock on the given shard
|
||||
func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssignTo string) error {
|
||||
newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC()
|
||||
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339Nano)
|
||||
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339)
|
||||
currentCheckpoint, err := checkpointer.getItem(shard.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -165,7 +161,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
|
|||
assignedTo := assignedVar.(*types.AttributeValueMemberS).Value
|
||||
leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value
|
||||
|
||||
currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout)
|
||||
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -250,7 +246,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
|
|||
|
||||
// CheckpointSequence writes a checkpoint at the designated sequence ID
|
||||
func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error {
|
||||
leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339Nano)
|
||||
leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339)
|
||||
marshalledCheckpoint := map[string]types.AttributeValue{
|
||||
LeaseKeyKey: &types.AttributeValueMemberS{
|
||||
Value: shard.ID,
|
||||
|
|
@ -294,7 +290,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
|
|||
|
||||
// Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming
|
||||
if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" {
|
||||
currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout.(*types.AttributeValueMemberS).Value)
|
||||
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -340,23 +336,6 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// GetLeaseOwner returns current lease owner of given shard in checkpoints table
|
||||
func (checkpointer *DynamoCheckpoint) GetLeaseOwner(shardID string) (string, error) {
|
||||
currentCheckpoint, err := checkpointer.getItem(shardID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
assignedVar, assignedToOk := currentCheckpoint[LeaseOwnerKey]
|
||||
|
||||
if !assignedToOk {
|
||||
return "", NoLeaseOwnerErr
|
||||
}
|
||||
|
||||
return assignedVar.(*types.AttributeValueMemberS).Value, nil
|
||||
|
||||
}
|
||||
|
||||
// ListActiveWorkers returns a map of workers and their shards
|
||||
func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) {
|
||||
err := checkpointer.syncLeases(shardStatus)
|
||||
|
|
@ -391,7 +370,7 @@ func (checkpointer *DynamoCheckpoint) ClaimShard(shard *par.ShardStatus, claimID
|
|||
if err != nil && err != ErrSequenceIDNotFound {
|
||||
return err
|
||||
}
|
||||
leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339Nano)
|
||||
leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339)
|
||||
|
||||
conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)`
|
||||
expressionAttributeValues := map[string]types.AttributeValue{
|
||||
|
|
@ -483,6 +462,10 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha
|
|||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Debugf("Error performing SyncLeases. Error: %+v ", err)
|
||||
return err
|
||||
}
|
||||
log.Debugf("Lease sync completed. Next lease sync will occur in %s", time.Duration(checkpointer.kclConfig.LeaseSyncingTimeIntervalMillis)*time.Millisecond)
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -69,9 +69,6 @@ const (
|
|||
// DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner.
|
||||
DefaultLeaseRefreshPeriodMillis = 5000
|
||||
|
||||
// DefaultLeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
|
||||
DefaultLeaseRefreshWaitTime = 2500
|
||||
|
||||
// DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call.
|
||||
DefaultMaxRecords = 10000
|
||||
|
||||
|
|
@ -139,9 +136,6 @@ const (
|
|||
|
||||
// DefaultLeaseSyncingIntervalMillis Number of milliseconds to wait before syncing with lease table (dynamodDB)
|
||||
DefaultLeaseSyncingIntervalMillis = 60000
|
||||
|
||||
// DefaultMaxRetryCount The default maximum number of retries in case of error
|
||||
DefaultMaxRetryCount = 5
|
||||
)
|
||||
|
||||
type (
|
||||
|
|
@ -219,9 +213,6 @@ type (
|
|||
// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
|
||||
LeaseRefreshPeriodMillis int
|
||||
|
||||
// LeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
|
||||
LeaseRefreshWaitTime int
|
||||
|
||||
// MaxRecords Max records to read per Kinesis getRecords() call
|
||||
MaxRecords int
|
||||
|
||||
|
|
@ -292,9 +283,6 @@ type (
|
|||
|
||||
// LeaseSyncingTimeInterval The number of milliseconds to wait before syncing with lease table (dynamoDB)
|
||||
LeaseSyncingTimeIntervalMillis int
|
||||
|
||||
// MaxRetryCount The maximum number of retries in case of error
|
||||
MaxRetryCount int
|
||||
}
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -102,8 +102,6 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
|
|||
LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis,
|
||||
LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis,
|
||||
LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis,
|
||||
LeaseRefreshWaitTime: DefaultLeaseRefreshWaitTime,
|
||||
MaxRetryCount: DefaultMaxRetryCount,
|
||||
Logger: logger.GetDefaultLogger(),
|
||||
}
|
||||
}
|
||||
|
|
@ -150,12 +148,6 @@ func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefres
|
|||
return c
|
||||
}
|
||||
|
||||
func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration {
|
||||
checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime)
|
||||
c.LeaseRefreshWaitTime = leaseRefreshWaitTime
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
|
||||
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
|
||||
c.ShardSyncIntervalMillis = shardSyncIntervalMillis
|
||||
|
|
@ -219,13 +211,6 @@ func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *Kinesi
|
|||
return c
|
||||
}
|
||||
|
||||
// WithMaxRetryCount sets the max retry count in case of error.
|
||||
func (c *KinesisClientLibConfiguration) WithMaxRetryCount(maxRetryCount int) *KinesisClientLibConfiguration {
|
||||
checkIsValuePositive("maxRetryCount", maxRetryCount)
|
||||
c.MaxRetryCount = maxRetryCount
|
||||
return c
|
||||
}
|
||||
|
||||
// WithMonitoringService sets the monitoring service to use to publish metrics.
|
||||
func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.MonitoringService) *KinesisClientLibConfiguration {
|
||||
// Nil case is handled downward (at worker creation) so no need to do it here.
|
||||
|
|
|
|||
|
|
@ -332,10 +332,6 @@ func (cw *MonitoringService) RecordProcessRecordsTime(shard string, time float64
|
|||
m.processRecordsTime = append(m.processRecordsTime, time)
|
||||
}
|
||||
|
||||
func (cw *MonitoringService) DeleteMetricMillisBehindLatest(shard string) {
|
||||
// not implemented
|
||||
}
|
||||
|
||||
func (cw *MonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics {
|
||||
var i interface{}
|
||||
var ok bool
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ func (sc *commonShardConsumer) releaseLease(shard string) {
|
|||
// Release the lease by wiping out the lease owner for the shard
|
||||
// Note: we don't need to do anything in case of error here and shard lease will eventually be expired.
|
||||
if err := sc.checkpointer.RemoveLeaseOwner(sc.shard.ID); err != nil {
|
||||
log.Debugf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err)
|
||||
log.Errorf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err)
|
||||
}
|
||||
|
||||
// reporting lease lose metrics
|
||||
|
|
@ -173,6 +173,7 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
|
|||
input.CacheEntryTime = &getRecordsStartTime
|
||||
input.CacheExitTime = &processRecordsStartTime
|
||||
sc.recordProcessor.ProcessRecords(input)
|
||||
|
||||
processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds()
|
||||
sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,6 @@ package worker
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
|
|
@ -45,33 +44,14 @@ import (
|
|||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
kinesisReadTPSLimit = 5
|
||||
MaxBytes = 10000000
|
||||
MaxBytesPerSecond = 2000000
|
||||
BytesToMbConversion = 1000000
|
||||
)
|
||||
|
||||
var (
|
||||
rateLimitTimeNow = time.Now
|
||||
rateLimitTimeSince = time.Since
|
||||
localTPSExceededError = errors.New("Error GetRecords TPS Exceeded")
|
||||
maxBytesExceededError = errors.New("Error GetRecords Max Bytes For Call Period Exceeded")
|
||||
)
|
||||
|
||||
// PollingShardConsumer is responsible for polling data records from a (specified) shard.
|
||||
// Note: PollingShardConsumer only deal with one shard.
|
||||
type PollingShardConsumer struct {
|
||||
commonShardConsumer
|
||||
streamName string
|
||||
stop *chan struct{}
|
||||
consumerID string
|
||||
mService metrics.MonitoringService
|
||||
currTime time.Time
|
||||
callsLeft int
|
||||
remBytes int
|
||||
lastCheckTime time.Time
|
||||
bytesRead int
|
||||
streamName string
|
||||
stop *chan struct{}
|
||||
consumerID string
|
||||
mService metrics.MonitoringService
|
||||
}
|
||||
|
||||
func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
|
||||
|
|
@ -99,12 +79,7 @@ func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
|
|||
// getRecords continuously poll one shard for data record
|
||||
// Precondition: it currently has the lease on the shard.
|
||||
func (sc *PollingShardConsumer) getRecords() error {
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer func() {
|
||||
// cancel renewLease()
|
||||
cancelFunc()
|
||||
sc.releaseLease(sc.shard.ID)
|
||||
}()
|
||||
defer sc.releaseLease(sc.shard.ID)
|
||||
|
||||
log := sc.kclConfig.Logger
|
||||
|
||||
|
|
@ -133,18 +108,24 @@ func (sc *PollingShardConsumer) getRecords() error {
|
|||
recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer)
|
||||
retriedErrors := 0
|
||||
|
||||
// define API call rate limit starting window
|
||||
sc.currTime = rateLimitTimeNow()
|
||||
sc.callsLeft = kinesisReadTPSLimit
|
||||
sc.bytesRead = 0
|
||||
sc.remBytes = MaxBytes
|
||||
|
||||
// starting async lease renewal thread
|
||||
leaseRenewalErrChan := make(chan error, 1)
|
||||
go func() {
|
||||
leaseRenewalErrChan <- sc.renewLease(ctx)
|
||||
}()
|
||||
for {
|
||||
if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) {
|
||||
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
|
||||
err = sc.checkpointer.GetLease(sc.shard, sc.consumerID)
|
||||
if err != nil {
|
||||
if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
|
||||
log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
|
||||
return nil
|
||||
}
|
||||
// log and return error
|
||||
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
|
||||
sc.shard.ID, sc.consumerID, err)
|
||||
return err
|
||||
}
|
||||
// log metric for renewed lease for worker
|
||||
sc.mService.LeaseRenewed(sc.shard.ID)
|
||||
}
|
||||
|
||||
getRecordsStartTime := time.Now()
|
||||
|
||||
log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator))
|
||||
|
|
@ -154,49 +135,14 @@ func (sc *PollingShardConsumer) getRecords() error {
|
|||
Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)),
|
||||
ShardIterator: shardIterator,
|
||||
}
|
||||
getResp, coolDownPeriod, err := sc.callGetRecordsAPI(getRecordsArgs)
|
||||
getResp, err := sc.callGetRecordsAPI(getRecordsArgs)
|
||||
if err != nil {
|
||||
//aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling
|
||||
var throughputExceededErr *types.ProvisionedThroughputExceededException
|
||||
var kmsThrottlingErr *types.KMSThrottlingException
|
||||
if errors.As(err, &throughputExceededErr) {
|
||||
retriedErrors++
|
||||
if retriedErrors > sc.kclConfig.MaxRetryCount {
|
||||
log.Errorf("message", "Throughput Exceeded Error: "+
|
||||
"reached max retry count getting records from shard",
|
||||
"shardId", sc.shard.ID,
|
||||
"retryCount", retriedErrors,
|
||||
"error", err)
|
||||
return err
|
||||
}
|
||||
// If there is insufficient provisioned throughput on the stream,
|
||||
// subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException.
|
||||
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
|
||||
sc.waitASecond(sc.currTime)
|
||||
continue
|
||||
}
|
||||
if err == localTPSExceededError {
|
||||
log.Infof("localTPSExceededError so sleep for a second")
|
||||
sc.waitASecond(sc.currTime)
|
||||
continue
|
||||
}
|
||||
if err == maxBytesExceededError {
|
||||
log.Infof("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod)
|
||||
time.Sleep(time.Duration(coolDownPeriod) * time.Second)
|
||||
continue
|
||||
}
|
||||
if errors.As(err, &kmsThrottlingErr) {
|
||||
if errors.As(err, &throughputExceededErr) || errors.As(err, &kmsThrottlingErr) {
|
||||
log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err)
|
||||
retriedErrors++
|
||||
// Greater than MaxRetryCount so we get the last retry
|
||||
if retriedErrors > sc.kclConfig.MaxRetryCount {
|
||||
log.Errorf("message", "KMS Throttling Error: "+
|
||||
"reached max retry count getting records from shard",
|
||||
"shardId", sc.shard.ID,
|
||||
"retryCount", retriedErrors,
|
||||
"error", err)
|
||||
return err
|
||||
}
|
||||
// exponential backoff
|
||||
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
|
||||
time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond)
|
||||
|
|
@ -231,104 +177,12 @@ func (sc *PollingShardConsumer) getRecords() error {
|
|||
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
|
||||
sc.recordProcessor.Shutdown(shutdownInput)
|
||||
return nil
|
||||
case leaseRenewalErr := <-leaseRenewalErrChan:
|
||||
return leaseRenewalErr
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sc *PollingShardConsumer) waitASecond(timePassed time.Time) {
|
||||
waitTime := time.Since(timePassed)
|
||||
if waitTime < time.Second {
|
||||
time.Sleep(time.Second - waitTime)
|
||||
}
|
||||
}
|
||||
|
||||
func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) {
|
||||
// Each shard can support up to a maximum total data read rate of 2 MB per second via GetRecords.
|
||||
// If a call to GetRecords returns 10 MB, subsequent calls made within the next 5 seconds throw an exception.
|
||||
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
|
||||
// check for overspending of byte budget from getRecords call
|
||||
currentTime := rateLimitTimeNow()
|
||||
secondsPassed := currentTime.Sub(sc.lastCheckTime).Seconds()
|
||||
sc.lastCheckTime = currentTime
|
||||
sc.remBytes += int(secondsPassed * MaxBytesPerSecond)
|
||||
|
||||
if sc.remBytes > MaxBytes {
|
||||
sc.remBytes = MaxBytes
|
||||
}
|
||||
if sc.remBytes < 1 {
|
||||
// Wait until cool down period has passed to prevent ProvisionedThroughputExceededException
|
||||
coolDown := sc.bytesRead / MaxBytesPerSecond
|
||||
if sc.bytesRead%MaxBytesPerSecond > 0 {
|
||||
coolDown++
|
||||
}
|
||||
return coolDown, maxBytesExceededError
|
||||
} else {
|
||||
sc.remBytes -= sc.bytesRead
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, int, error) {
|
||||
if sc.bytesRead != 0 {
|
||||
coolDownPeriod, err := sc.checkCoolOffPeriod()
|
||||
if err != nil {
|
||||
return nil, coolDownPeriod, err
|
||||
}
|
||||
}
|
||||
// every new second, we get a fresh set of calls
|
||||
if rateLimitTimeSince(sc.currTime) > time.Second {
|
||||
sc.callsLeft = kinesisReadTPSLimit
|
||||
sc.currTime = rateLimitTimeNow()
|
||||
}
|
||||
|
||||
if sc.callsLeft < 1 {
|
||||
return nil, 0, localTPSExceededError
|
||||
}
|
||||
func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
|
||||
getResp, err := sc.kc.GetRecords(context.TODO(), gri)
|
||||
sc.callsLeft--
|
||||
|
||||
if err != nil {
|
||||
return getResp, 0, err
|
||||
}
|
||||
|
||||
// Calculate size of records from read transaction
|
||||
sc.bytesRead = 0
|
||||
for _, record := range getResp.Records {
|
||||
sc.bytesRead += len(record.Data)
|
||||
}
|
||||
if sc.lastCheckTime.IsZero() {
|
||||
sc.lastCheckTime = rateLimitTimeNow()
|
||||
}
|
||||
|
||||
return getResp, 0, err
|
||||
}
|
||||
|
||||
func (sc *PollingShardConsumer) renewLease(ctx context.Context) error {
|
||||
renewDuration := time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond
|
||||
for {
|
||||
timer := time.NewTimer(renewDuration)
|
||||
select {
|
||||
case <-timer.C:
|
||||
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
|
||||
err := sc.checkpointer.GetLease(sc.shard, sc.consumerID)
|
||||
if err != nil {
|
||||
// log and return error
|
||||
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
|
||||
sc.shard.ID, sc.consumerID, err)
|
||||
return err
|
||||
}
|
||||
// log metric for renewed lease for worker
|
||||
sc.mService.LeaseRenewed(sc.shard.ID)
|
||||
case <-ctx.Done():
|
||||
// clean up timer resources
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
log.Debugf("renewLease was canceled")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return getResp, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,9 +21,7 @@ package worker
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
||||
|
|
@ -31,10 +29,6 @@ import (
|
|||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
var (
|
||||
testGetRecordsError = errors.New("GetRecords Error")
|
||||
)
|
||||
|
||||
func TestCallGetRecordsAPI(t *testing.T) {
|
||||
// basic happy path
|
||||
m1 := MockKinesisSubscriberGetter{}
|
||||
|
|
@ -46,141 +40,10 @@ func TestCallGetRecordsAPI(t *testing.T) {
|
|||
gri := kinesis.GetRecordsInput{
|
||||
ShardIterator: aws.String("shard-iterator-01"),
|
||||
}
|
||||
out, _, err := psc.callGetRecordsAPI(&gri)
|
||||
out, err := psc.callGetRecordsAPI(&gri)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, &ret, out)
|
||||
m1.AssertExpectations(t)
|
||||
|
||||
// check that localTPSExceededError is thrown when trying more than 5 TPS
|
||||
m2 := MockKinesisSubscriberGetter{}
|
||||
psc2 := PollingShardConsumer{
|
||||
commonShardConsumer: commonShardConsumer{kc: &m2},
|
||||
callsLeft: 0,
|
||||
}
|
||||
rateLimitTimeSince = func(t time.Time) time.Duration {
|
||||
return 500 * time.Millisecond
|
||||
}
|
||||
out2, _, err2 := psc2.callGetRecordsAPI(&gri)
|
||||
assert.Nil(t, out2)
|
||||
assert.ErrorIs(t, err2, localTPSExceededError)
|
||||
m2.AssertExpectations(t)
|
||||
|
||||
// check that getRecords is called normally in bytesRead = 0 case
|
||||
m3 := MockKinesisSubscriberGetter{}
|
||||
ret3 := kinesis.GetRecordsOutput{}
|
||||
m3.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret3, nil)
|
||||
psc3 := PollingShardConsumer{
|
||||
commonShardConsumer: commonShardConsumer{kc: &m3},
|
||||
callsLeft: 2,
|
||||
bytesRead: 0,
|
||||
}
|
||||
rateLimitTimeSince = func(t time.Time) time.Duration {
|
||||
return 2 * time.Second
|
||||
}
|
||||
out3, checkSleepVal, err3 := psc3.callGetRecordsAPI(&gri)
|
||||
assert.Nil(t, err3)
|
||||
assert.Equal(t, checkSleepVal, 0)
|
||||
assert.Equal(t, &ret3, out3)
|
||||
m3.AssertExpectations(t)
|
||||
|
||||
// check that correct cool off period is taken for 10mb in 1 second
|
||||
testTime := time.Now()
|
||||
m4 := MockKinesisSubscriberGetter{}
|
||||
ret4 := kinesis.GetRecordsOutput{Records: nil}
|
||||
m4.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret4, nil)
|
||||
psc4 := PollingShardConsumer{
|
||||
commonShardConsumer: commonShardConsumer{kc: &m4},
|
||||
callsLeft: 2,
|
||||
bytesRead: MaxBytes,
|
||||
lastCheckTime: testTime,
|
||||
remBytes: MaxBytes,
|
||||
}
|
||||
rateLimitTimeSince = func(t time.Time) time.Duration {
|
||||
return 2 * time.Second
|
||||
}
|
||||
rateLimitTimeNow = func() time.Time {
|
||||
return testTime.Add(time.Second)
|
||||
}
|
||||
out4, checkSleepVal2, err4 := psc4.callGetRecordsAPI(&gri)
|
||||
assert.Nil(t, err4)
|
||||
assert.Equal(t, &ret4, out4)
|
||||
m4.AssertExpectations(t)
|
||||
if checkSleepVal2 != 0 {
|
||||
t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal2)
|
||||
}
|
||||
|
||||
// check that no cool off period is taken for 6mb in 3 seconds
|
||||
testTime2 := time.Now()
|
||||
m5 := MockKinesisSubscriberGetter{}
|
||||
ret5 := kinesis.GetRecordsOutput{}
|
||||
m5.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret5, nil)
|
||||
psc5 := PollingShardConsumer{
|
||||
commonShardConsumer: commonShardConsumer{kc: &m5},
|
||||
callsLeft: 2,
|
||||
bytesRead: MaxBytesPerSecond * 3,
|
||||
lastCheckTime: testTime2,
|
||||
remBytes: MaxBytes,
|
||||
}
|
||||
rateLimitTimeSince = func(t time.Time) time.Duration {
|
||||
return 3 * time.Second
|
||||
}
|
||||
rateLimitTimeNow = func() time.Time {
|
||||
return testTime2.Add(time.Second * 3)
|
||||
}
|
||||
out5, checkSleepVal3, err5 := psc5.callGetRecordsAPI(&gri)
|
||||
assert.Nil(t, err5)
|
||||
assert.Equal(t, checkSleepVal3, 0)
|
||||
assert.Equal(t, &ret5, out5)
|
||||
m5.AssertExpectations(t)
|
||||
|
||||
// check for correct cool off period with 8mb in .2 seconds with 6mb remaining
|
||||
testTime3 := time.Now()
|
||||
m6 := MockKinesisSubscriberGetter{}
|
||||
ret6 := kinesis.GetRecordsOutput{Records: nil}
|
||||
m6.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret6, nil)
|
||||
psc6 := PollingShardConsumer{
|
||||
commonShardConsumer: commonShardConsumer{kc: &m6},
|
||||
callsLeft: 2,
|
||||
bytesRead: MaxBytesPerSecond * 4,
|
||||
lastCheckTime: testTime3,
|
||||
remBytes: MaxBytesPerSecond * 3,
|
||||
}
|
||||
rateLimitTimeSince = func(t time.Time) time.Duration {
|
||||
return 3 * time.Second
|
||||
}
|
||||
rateLimitTimeNow = func() time.Time {
|
||||
return testTime3.Add(time.Second / 5)
|
||||
}
|
||||
out6, checkSleepVal4, err6 := psc6.callGetRecordsAPI(&gri)
|
||||
assert.Nil(t, err6)
|
||||
assert.Equal(t, &ret6, out6)
|
||||
m5.AssertExpectations(t)
|
||||
if checkSleepVal4 != 0 {
|
||||
t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal4)
|
||||
}
|
||||
|
||||
// case where getRecords throws error
|
||||
m7 := MockKinesisSubscriberGetter{}
|
||||
ret7 := kinesis.GetRecordsOutput{Records: nil}
|
||||
m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, testGetRecordsError)
|
||||
psc7 := PollingShardConsumer{
|
||||
commonShardConsumer: commonShardConsumer{kc: &m7},
|
||||
callsLeft: 2,
|
||||
bytesRead: 0,
|
||||
}
|
||||
rateLimitTimeSince = func(t time.Time) time.Duration {
|
||||
return 2 * time.Second
|
||||
}
|
||||
out7, checkSleepVal7, err7 := psc7.callGetRecordsAPI(&gri)
|
||||
assert.Equal(t, err7, testGetRecordsError)
|
||||
assert.Equal(t, checkSleepVal7, 0)
|
||||
assert.Equal(t, out7, &ret7)
|
||||
m7.AssertExpectations(t)
|
||||
|
||||
// restore original func
|
||||
rateLimitTimeNow = time.Now
|
||||
rateLimitTimeSince = time.Since
|
||||
|
||||
}
|
||||
|
||||
type MockKinesisSubscriberGetter struct {
|
||||
|
|
@ -200,156 +63,3 @@ func (m *MockKinesisSubscriberGetter) GetShardIterator(ctx context.Context, para
|
|||
func (m *MockKinesisSubscriberGetter) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestPollingShardConsumer_checkCoolOffPeriod(t *testing.T) {
|
||||
refTime := time.Now()
|
||||
type fields struct {
|
||||
lastCheckTime time.Time
|
||||
remBytes int
|
||||
bytesRead int
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
timeNow time.Time
|
||||
want int
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
"zero time max bytes to spend",
|
||||
fields{
|
||||
time.Time{},
|
||||
0,
|
||||
0,
|
||||
},
|
||||
refTime,
|
||||
0,
|
||||
false,
|
||||
},
|
||||
{
|
||||
"same second, bytes still left to spend",
|
||||
fields{
|
||||
refTime,
|
||||
MaxBytesPerSecond,
|
||||
MaxBytesPerSecond - 1,
|
||||
},
|
||||
refTime,
|
||||
0,
|
||||
false,
|
||||
},
|
||||
{
|
||||
"same second, not many but some bytes still left to spend",
|
||||
fields{
|
||||
refTime,
|
||||
8,
|
||||
MaxBytesPerSecond,
|
||||
},
|
||||
refTime,
|
||||
0,
|
||||
false,
|
||||
},
|
||||
{
|
||||
"same second, 1 byte still left to spend",
|
||||
fields{
|
||||
refTime,
|
||||
1,
|
||||
MaxBytesPerSecond,
|
||||
},
|
||||
refTime,
|
||||
0,
|
||||
false,
|
||||
},
|
||||
{
|
||||
"next second, bytes still left to spend",
|
||||
fields{
|
||||
refTime,
|
||||
42,
|
||||
1024,
|
||||
},
|
||||
refTime.Add(1 * time.Second),
|
||||
0,
|
||||
false,
|
||||
},
|
||||
{
|
||||
"same second, max bytes per second already spent",
|
||||
fields{
|
||||
refTime,
|
||||
0,
|
||||
MaxBytesPerSecond,
|
||||
},
|
||||
refTime,
|
||||
1,
|
||||
true,
|
||||
},
|
||||
{
|
||||
"same second, more than max bytes per second already spent",
|
||||
fields{
|
||||
refTime,
|
||||
0,
|
||||
MaxBytesPerSecond + 1,
|
||||
},
|
||||
refTime,
|
||||
2,
|
||||
true,
|
||||
},
|
||||
|
||||
// Kinesis prevents reading more than 10 MiB at once
|
||||
{
|
||||
"same second, 10 MiB read all at once",
|
||||
fields{
|
||||
refTime,
|
||||
0,
|
||||
10 * 1024 * 1024,
|
||||
},
|
||||
refTime,
|
||||
6,
|
||||
true,
|
||||
},
|
||||
|
||||
{
|
||||
"same second, 10 MB read all at once",
|
||||
fields{
|
||||
refTime,
|
||||
0,
|
||||
10 * 1000 * 1000,
|
||||
},
|
||||
refTime,
|
||||
5,
|
||||
true,
|
||||
},
|
||||
{
|
||||
"5 seconds ago, 10 MB read all at once",
|
||||
fields{
|
||||
refTime,
|
||||
0,
|
||||
10 * 1000 * 1000,
|
||||
},
|
||||
refTime.Add(5 * time.Second),
|
||||
0,
|
||||
false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
sc := &PollingShardConsumer{
|
||||
lastCheckTime: tt.fields.lastCheckTime,
|
||||
remBytes: tt.fields.remBytes,
|
||||
bytesRead: tt.fields.bytesRead,
|
||||
}
|
||||
rateLimitTimeNow = func() time.Time {
|
||||
return tt.timeNow
|
||||
}
|
||||
got, err := sc.checkCoolOffPeriod()
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("PollingShardConsumer.checkCoolOffPeriod() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if got != tt.want {
|
||||
t.Errorf("PollingShardConsumer.checkCoolOffPeriod() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// restore original time.Now
|
||||
rateLimitTimeNow = time.Now
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ package worker
|
|||
|
||||
import (
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
|
||||
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
|
||||
kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
|
||||
par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"
|
||||
|
|
|
|||
|
|
@ -160,15 +160,11 @@ func (w *Worker) initialize() error {
|
|||
log.Infof("Creating Kinesis client")
|
||||
|
||||
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
||||
if service == kinesis.ServiceID && len(w.kclConfig.KinesisEndpoint) > 0 {
|
||||
return aws.Endpoint{
|
||||
PartitionID: "aws",
|
||||
URL: w.kclConfig.KinesisEndpoint,
|
||||
SigningRegion: w.regionName,
|
||||
}, nil
|
||||
}
|
||||
// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
|
||||
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
|
||||
return aws.Endpoint{
|
||||
PartitionID: "aws",
|
||||
URL: w.kclConfig.KinesisEndpoint,
|
||||
SigningRegion: w.regionName,
|
||||
}, nil
|
||||
})
|
||||
|
||||
cfg, err := awsConfig.LoadDefaultConfig(
|
||||
|
|
@ -276,14 +272,6 @@ func (w *Worker) eventLoop() {
|
|||
rnd, _ := rand.Int(rand.Reader, big.NewInt(int64(w.kclConfig.ShardSyncIntervalMillis)))
|
||||
shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + int(rnd.Int64())
|
||||
|
||||
select {
|
||||
case <-*w.stop:
|
||||
log.Infof("Shutting down...")
|
||||
return
|
||||
case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
|
||||
log.Debugf("Waited %d ms to sync shards...", shardSyncSleep)
|
||||
}
|
||||
|
||||
err := w.syncShard()
|
||||
if err != nil {
|
||||
log.Errorf("Error syncing shards: %+v, Retrying in %d ms...", err, shardSyncSleep)
|
||||
|
|
@ -375,6 +363,14 @@ func (w *Worker) eventLoop() {
|
|||
log.Warnf("Error in rebalance: %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-*w.stop:
|
||||
log.Infof("Shutting down...")
|
||||
return
|
||||
case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
|
||||
log.Debugf("Waited %d ms to sync shards...", shardSyncSleep)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
41
go.mod
41
go.mod
|
|
@ -1,18 +1,18 @@
|
|||
module github.com/vmware/vmware-go-kcl-v2
|
||||
|
||||
go 1.21
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/aws/aws-sdk-go-v2 v1.25.2
|
||||
github.com/aws/aws-sdk-go-v2/config v1.27.5
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.5
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1
|
||||
github.com/aws/aws-sdk-go-v2 v1.11.2
|
||||
github.com/aws/aws-sdk-go-v2/config v1.11.1
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.6.5
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0
|
||||
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/prometheus/client_golang v1.11.1
|
||||
github.com/prometheus/client_golang v1.11.0
|
||||
github.com/prometheus/common v0.32.1
|
||||
github.com/rs/zerolog v1.26.1
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
|
|
@ -23,18 +23,17 @@ require (
|
|||
|
||||
require (
|
||||
github.com/BurntSushi/toml v0.4.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 // indirect
|
||||
github.com/aws/smithy-go v1.20.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 // indirect
|
||||
github.com/aws/smithy-go v1.9.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
|
|
@ -46,7 +45,7 @@ require (
|
|||
github.com/stretchr/objx v0.5.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/multierr v1.7.0 // indirect
|
||||
golang.org/x/sys v0.1.0 // indirect
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
|
|
|||
81
go.sum
81
go.sum
|
|
@ -41,44 +41,42 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
|
|||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
|
||||
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
|
||||
github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
|
||||
github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1/go.mod h1:sxpLb+nZk7tIfCWChfd+h4QwHNUR57d8hA1cleTkjJo=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.27.5 h1:brBPsyRFQn97M1ZhQ9tLXkO7Zytiar0NS06FGmEJBdg=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.27.5/go.mod h1:I53uvsfddRRTG5YcC4n5Z3aOD1BU8hYCoIG7iEJG4wM=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.5 h1:yn3zSvIKC2NZIs40cY3kckcy9Zma96PrRR07N54PCvY=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.5/go.mod h1:8JcKPAGZVnDWuR5lusAwmrSDtZnDIAnpQWaDC9RFt2g=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 h1:AK0J8iYBFeUk2Ax7O8YpLtFsfhdOByh2QIkHmigpRYk=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2/go.mod h1:iRlGzMix0SExQEviAyptRWRGdYNo3+ufW/lCzvKVTUc=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1 h1:mQySuI87thHtcbZvEDjwUROGWikU6fqgpHklCBXpJU4=
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1/go.mod h1:Z1ThUUTuCO9PArtiQsTmBGBv+38NGj+795Zl0n1jgiM=
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2 h1:n+nT52A+Ik+ut1D8IV4EP1qfyUdP9Jq60uYfnlJwSWc=
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2/go.mod h1:BzzW6QegtSMnC1BhD+lagiUDSRYjRTOhXAb1mLfEaMg=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 h1:EyBZibRTVAs6ECHZOw5/wlylS9OcTzwyjeQMudmREjE=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1/go.mod h1:JKpmtYhhPs7D97NL/ltqz7yCkERFW5dOlHyVl66ZYF8=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3 h1:/MpYoYvgshlGMFmSyfzGWf6HKoEo/DrKBoHxXR3vh+U=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3/go.mod h1:1Pf5vPqk8t9pdYB3dmUMRE/0m8u0IHHg8ESSiutJd0I=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3 h1:x0N5ftQzgcfRpCpTiyZC40pvNUJYhzf4UgCsAyO6/P8=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3/go.mod h1:Ru7vg1iQ7cR4i7SZ/JTLYN9kaXtbL69UdgG0OQWQxW0=
|
||||
github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs=
|
||||
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.11.1 h1:KXSjb7ZMLRtjxClFptukTYibiOqJS9NwBO+9WD3UMto=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.11.1/go.mod h1:VvfkzUhVtntSg1JfGFMSKS0CyiTZd3NqBxK5af4zsME=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.6.5 h1:ZrsO2js2v4T95rsCIWoAb/ck5+U1kwkizGdZHY+ni3s=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.6.5/go.mod h1:HWSOnsnqVMbLcWUmom6AN1cqhcLzLJ62AObW28CbYbU=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 h1:KiN5TPOLrEjbGCvdTQR4t0U4T87vVwALZ5Bg3jpMqPY=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2/go.mod h1:dF2F6tXEOgmW5X1ZFO/EPtWrcm7XkW07KNcJUGNtt4s=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 h1:IQup8Q6lorXeiA/rK72PeToWoWK8h7VAPgHNWdSrtgE=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2/go.mod h1:VITe/MdW6EMXPb0o0txu/fsonXbMHUU2OC2Qp7ivU4o=
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 h1:BcSBoss+CeyRS4TgZKAcR6kcZ0Sb2P+DHs8r8aMlTpQ=
|
||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0/go.mod h1:eAgmZ4hIzTsTOlAA7yvGJz+RywxZo3KWtGt7J+jAUxU=
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 h1:te+nIFwPf5Bi/cZvd9g/+EF0gkJT3c0J/5+NMx0NBZg=
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0/go.mod h1:ELltfl9ri0n4sZ/VjPZBgemNMd9mYIpCAuZhc7NP7l4=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 h1:lPLbw4Gn59uoKqvOfSnkJr54XWk5Ak1NK20ZEiSWb3U=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0/go.mod h1:80NaCIH9YU3rzTTs/J/ECATjXuRqzo/wB6ukO6MZ0XY=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 h1:utEGkfdQ4L6YW/ietH7111ZYglLJvS+sLriHJ1NBJEQ=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.20.1/go.mod h1:RsYqzYr2F2oPDdpy+PdhephuZxTfjHQe7SOBcZGoAU8=
|
||||
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 h1:9/GylMS45hGGFCcMrUZDVayQE1jYSIN6da9jo7RAYIw=
|
||||
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1/go.mod h1:YjAPFn4kGFqKC54VsHs5fn5B6d+PCY2tziEa3U/GB5Y=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 h1:0YjXuWdYHvsm0HnT4vO8XpwG1D+i2roxSCBoN6deJ7M=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2/go.mod h1:jI+FWmYkSMn+4APWmZiZTgt0oM0TrvymD51FMqCnWgA=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 h1:s47dGRX/fBy9s/Zculav/cyqRhkMKsE/5hjg6rWAH6E=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0/go.mod h1:B1x58TfECuYHFX/bga902rUvMqQu9C/v2XiCi2GZZXE=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 h1:E4fxAg/UE8a6yiLZYv8/EP0uXKPPRImiMau4ift6S/g=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGXyn22c1USYxA/2KyvoeDY0=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA=
|
||||
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
|
||||
github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
|
||||
github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
|
||||
github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58=
|
||||
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
|
||||
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8=
|
||||
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
|
||||
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
||||
|
|
@ -155,9 +153,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
|
||||
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
|
||||
|
|
@ -214,9 +211,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
|||
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
|
||||
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
|
||||
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
|
||||
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
|
|
@ -400,8 +396,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
|
||||
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
|
|
@ -457,6 +453,7 @@ golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
|
|||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
|
||||
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
|
||||
|
|
|
|||
Loading…
Reference in a new issue