Compare commits

..

No commits in common. "main" and "jc/testable-getRecords" have entirely different histories.

12 changed files with 111 additions and 604 deletions

View file

@ -79,9 +79,6 @@ type Checkpointer interface {
// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
RemoveLeaseOwner(string) error
// GetLeaseOwner to get current owner of lease for shard
GetLeaseOwner(string) (string, error)
// ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods)
ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)

View file

@ -51,10 +51,6 @@ const (
NumMaxRetries = 10
)
var (
NoLeaseOwnerErr = errors.New("no LeaseOwner in checkpoints table")
)
// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
type DynamoCheckpoint struct {
log logger.Logger
@ -133,7 +129,7 @@ func (checkpointer *DynamoCheckpoint) Init() error {
// GetLease attempts to gain a lock on the given shard
func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssignTo string) error {
newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC()
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339Nano)
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339)
currentCheckpoint, err := checkpointer.getItem(shard.ID)
if err != nil {
return err
@ -165,7 +161,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
assignedTo := assignedVar.(*types.AttributeValueMemberS).Value
leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value
currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout)
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout)
if err != nil {
return err
}
@ -250,7 +246,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
// CheckpointSequence writes a checkpoint at the designated sequence ID
func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error {
leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339Nano)
leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339)
marshalledCheckpoint := map[string]types.AttributeValue{
LeaseKeyKey: &types.AttributeValueMemberS{
Value: shard.ID,
@ -294,7 +290,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
// Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming
if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" {
currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout.(*types.AttributeValueMemberS).Value)
currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value)
if err != nil {
return err
}
@ -340,23 +336,6 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
return err
}
// GetLeaseOwner returns current lease owner of given shard in checkpoints table
func (checkpointer *DynamoCheckpoint) GetLeaseOwner(shardID string) (string, error) {
currentCheckpoint, err := checkpointer.getItem(shardID)
if err != nil {
return "", err
}
assignedVar, assignedToOk := currentCheckpoint[LeaseOwnerKey]
if !assignedToOk {
return "", NoLeaseOwnerErr
}
return assignedVar.(*types.AttributeValueMemberS).Value, nil
}
// ListActiveWorkers returns a map of workers and their shards
func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) {
err := checkpointer.syncLeases(shardStatus)
@ -391,7 +370,7 @@ func (checkpointer *DynamoCheckpoint) ClaimShard(shard *par.ShardStatus, claimID
if err != nil && err != ErrSequenceIDNotFound {
return err
}
leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339Nano)
leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339)
conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)`
expressionAttributeValues := map[string]types.AttributeValue{
@ -483,6 +462,10 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha
}
}
if err != nil {
log.Debugf("Error performing SyncLeases. Error: %+v ", err)
return err
}
log.Debugf("Lease sync completed. Next lease sync will occur in %s", time.Duration(checkpointer.kclConfig.LeaseSyncingTimeIntervalMillis)*time.Millisecond)
return nil
}

View file

@ -69,9 +69,6 @@ const (
// DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner.
DefaultLeaseRefreshPeriodMillis = 5000
// DefaultLeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
DefaultLeaseRefreshWaitTime = 2500
// DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call.
DefaultMaxRecords = 10000
@ -139,9 +136,6 @@ const (
// DefaultLeaseSyncingIntervalMillis Number of milliseconds to wait before syncing with lease table (dynamodDB)
DefaultLeaseSyncingIntervalMillis = 60000
// DefaultMaxRetryCount The default maximum number of retries in case of error
DefaultMaxRetryCount = 5
)
type (
@ -219,9 +213,6 @@ type (
// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
LeaseRefreshPeriodMillis int
// LeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
LeaseRefreshWaitTime int
// MaxRecords Max records to read per Kinesis getRecords() call
MaxRecords int
@ -292,9 +283,6 @@ type (
// LeaseSyncingTimeInterval The number of milliseconds to wait before syncing with lease table (dynamoDB)
LeaseSyncingTimeIntervalMillis int
// MaxRetryCount The maximum number of retries in case of error
MaxRetryCount int
}
)

View file

@ -102,8 +102,6 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis,
LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis,
LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis,
LeaseRefreshWaitTime: DefaultLeaseRefreshWaitTime,
MaxRetryCount: DefaultMaxRetryCount,
Logger: logger.GetDefaultLogger(),
}
}
@ -150,12 +148,6 @@ func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefres
return c
}
func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration {
checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime)
c.LeaseRefreshWaitTime = leaseRefreshWaitTime
return c
}
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration {
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis)
c.ShardSyncIntervalMillis = shardSyncIntervalMillis
@ -219,13 +211,6 @@ func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *Kinesi
return c
}
// WithMaxRetryCount sets the max retry count in case of error.
func (c *KinesisClientLibConfiguration) WithMaxRetryCount(maxRetryCount int) *KinesisClientLibConfiguration {
checkIsValuePositive("maxRetryCount", maxRetryCount)
c.MaxRetryCount = maxRetryCount
return c
}
// WithMonitoringService sets the monitoring service to use to publish metrics.
func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.MonitoringService) *KinesisClientLibConfiguration {
// Nil case is handled downward (at worker creation) so no need to do it here.

View file

@ -332,10 +332,6 @@ func (cw *MonitoringService) RecordProcessRecordsTime(shard string, time float64
m.processRecordsTime = append(m.processRecordsTime, time)
}
func (cw *MonitoringService) DeleteMetricMillisBehindLatest(shard string) {
// not implemented
}
func (cw *MonitoringService) getOrCreatePerShardMetrics(shard string) *cloudWatchMetrics {
var i interface{}
var ok bool

View file

@ -66,7 +66,7 @@ func (sc *commonShardConsumer) releaseLease(shard string) {
// Release the lease by wiping out the lease owner for the shard
// Note: we don't need to do anything in case of error here and shard lease will eventually be expired.
if err := sc.checkpointer.RemoveLeaseOwner(sc.shard.ID); err != nil {
log.Debugf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err)
log.Errorf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err)
}
// reporting lease lose metrics
@ -173,6 +173,7 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
input.CacheEntryTime = &getRecordsStartTime
input.CacheExitTime = &processRecordsStartTime
sc.recordProcessor.ProcessRecords(input)
processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds()
sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming))
}

View file

@ -32,7 +32,6 @@ package worker
import (
"context"
"errors"
log "github.com/sirupsen/logrus"
"math"
"time"
@ -45,33 +44,14 @@ import (
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
)
const (
kinesisReadTPSLimit = 5
MaxBytes = 10000000
MaxBytesPerSecond = 2000000
BytesToMbConversion = 1000000
)
var (
rateLimitTimeNow = time.Now
rateLimitTimeSince = time.Since
localTPSExceededError = errors.New("Error GetRecords TPS Exceeded")
maxBytesExceededError = errors.New("Error GetRecords Max Bytes For Call Period Exceeded")
)
// PollingShardConsumer is responsible for polling data records from a (specified) shard.
// Note: PollingShardConsumer only deal with one shard.
type PollingShardConsumer struct {
commonShardConsumer
streamName string
stop *chan struct{}
consumerID string
mService metrics.MonitoringService
currTime time.Time
callsLeft int
remBytes int
lastCheckTime time.Time
bytesRead int
streamName string
stop *chan struct{}
consumerID string
mService metrics.MonitoringService
}
func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
@ -99,12 +79,7 @@ func (sc *PollingShardConsumer) getShardIterator() (*string, error) {
// getRecords continuously poll one shard for data record
// Precondition: it currently has the lease on the shard.
func (sc *PollingShardConsumer) getRecords() error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer func() {
// cancel renewLease()
cancelFunc()
sc.releaseLease(sc.shard.ID)
}()
defer sc.releaseLease(sc.shard.ID)
log := sc.kclConfig.Logger
@ -133,18 +108,24 @@ func (sc *PollingShardConsumer) getRecords() error {
recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer)
retriedErrors := 0
// define API call rate limit starting window
sc.currTime = rateLimitTimeNow()
sc.callsLeft = kinesisReadTPSLimit
sc.bytesRead = 0
sc.remBytes = MaxBytes
// starting async lease renewal thread
leaseRenewalErrChan := make(chan error, 1)
go func() {
leaseRenewalErrChan <- sc.renewLease(ctx)
}()
for {
if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) {
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
err = sc.checkpointer.GetLease(sc.shard, sc.consumerID)
if err != nil {
if errors.As(err, &chk.ErrLeaseNotAcquired{}) {
log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
return nil
}
// log and return error
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
sc.shard.ID, sc.consumerID, err)
return err
}
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
}
getRecordsStartTime := time.Now()
log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator))
@ -154,49 +135,14 @@ func (sc *PollingShardConsumer) getRecords() error {
Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)),
ShardIterator: shardIterator,
}
getResp, coolDownPeriod, err := sc.callGetRecordsAPI(getRecordsArgs)
getResp, err := sc.callGetRecordsAPI(getRecordsArgs)
if err != nil {
//aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling
var throughputExceededErr *types.ProvisionedThroughputExceededException
var kmsThrottlingErr *types.KMSThrottlingException
if errors.As(err, &throughputExceededErr) {
retriedErrors++
if retriedErrors > sc.kclConfig.MaxRetryCount {
log.Errorf("message", "Throughput Exceeded Error: "+
"reached max retry count getting records from shard",
"shardId", sc.shard.ID,
"retryCount", retriedErrors,
"error", err)
return err
}
// If there is insufficient provisioned throughput on the stream,
// subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException.
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
sc.waitASecond(sc.currTime)
continue
}
if err == localTPSExceededError {
log.Infof("localTPSExceededError so sleep for a second")
sc.waitASecond(sc.currTime)
continue
}
if err == maxBytesExceededError {
log.Infof("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod)
time.Sleep(time.Duration(coolDownPeriod) * time.Second)
continue
}
if errors.As(err, &kmsThrottlingErr) {
if errors.As(err, &throughputExceededErr) || errors.As(err, &kmsThrottlingErr) {
log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err)
retriedErrors++
// Greater than MaxRetryCount so we get the last retry
if retriedErrors > sc.kclConfig.MaxRetryCount {
log.Errorf("message", "KMS Throttling Error: "+
"reached max retry count getting records from shard",
"shardId", sc.shard.ID,
"retryCount", retriedErrors,
"error", err)
return err
}
// exponential backoff
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond)
@ -231,104 +177,12 @@ func (sc *PollingShardConsumer) getRecords() error {
shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer}
sc.recordProcessor.Shutdown(shutdownInput)
return nil
case leaseRenewalErr := <-leaseRenewalErrChan:
return leaseRenewalErr
default:
}
}
}
func (sc *PollingShardConsumer) waitASecond(timePassed time.Time) {
waitTime := time.Since(timePassed)
if waitTime < time.Second {
time.Sleep(time.Second - waitTime)
}
}
func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) {
// Each shard can support up to a maximum total data read rate of 2 MB per second via GetRecords.
// If a call to GetRecords returns 10 MB, subsequent calls made within the next 5 seconds throw an exception.
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
// check for overspending of byte budget from getRecords call
currentTime := rateLimitTimeNow()
secondsPassed := currentTime.Sub(sc.lastCheckTime).Seconds()
sc.lastCheckTime = currentTime
sc.remBytes += int(secondsPassed * MaxBytesPerSecond)
if sc.remBytes > MaxBytes {
sc.remBytes = MaxBytes
}
if sc.remBytes < 1 {
// Wait until cool down period has passed to prevent ProvisionedThroughputExceededException
coolDown := sc.bytesRead / MaxBytesPerSecond
if sc.bytesRead%MaxBytesPerSecond > 0 {
coolDown++
}
return coolDown, maxBytesExceededError
} else {
sc.remBytes -= sc.bytesRead
}
return 0, nil
}
func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, int, error) {
if sc.bytesRead != 0 {
coolDownPeriod, err := sc.checkCoolOffPeriod()
if err != nil {
return nil, coolDownPeriod, err
}
}
// every new second, we get a fresh set of calls
if rateLimitTimeSince(sc.currTime) > time.Second {
sc.callsLeft = kinesisReadTPSLimit
sc.currTime = rateLimitTimeNow()
}
if sc.callsLeft < 1 {
return nil, 0, localTPSExceededError
}
func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) {
getResp, err := sc.kc.GetRecords(context.TODO(), gri)
sc.callsLeft--
if err != nil {
return getResp, 0, err
}
// Calculate size of records from read transaction
sc.bytesRead = 0
for _, record := range getResp.Records {
sc.bytesRead += len(record.Data)
}
if sc.lastCheckTime.IsZero() {
sc.lastCheckTime = rateLimitTimeNow()
}
return getResp, 0, err
}
func (sc *PollingShardConsumer) renewLease(ctx context.Context) error {
renewDuration := time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond
for {
timer := time.NewTimer(renewDuration)
select {
case <-timer.C:
log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID)
err := sc.checkpointer.GetLease(sc.shard, sc.consumerID)
if err != nil {
// log and return error
log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v",
sc.shard.ID, sc.consumerID, err)
return err
}
// log metric for renewed lease for worker
sc.mService.LeaseRenewed(sc.shard.ID)
case <-ctx.Done():
// clean up timer resources
if !timer.Stop() {
<-timer.C
}
log.Debugf("renewLease was canceled")
return nil
}
}
return getResp, err
}

View file

@ -21,9 +21,7 @@ package worker
import (
"context"
"errors"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
@ -31,10 +29,6 @@ import (
"github.com/stretchr/testify/mock"
)
var (
testGetRecordsError = errors.New("GetRecords Error")
)
func TestCallGetRecordsAPI(t *testing.T) {
// basic happy path
m1 := MockKinesisSubscriberGetter{}
@ -46,141 +40,10 @@ func TestCallGetRecordsAPI(t *testing.T) {
gri := kinesis.GetRecordsInput{
ShardIterator: aws.String("shard-iterator-01"),
}
out, _, err := psc.callGetRecordsAPI(&gri)
out, err := psc.callGetRecordsAPI(&gri)
assert.Nil(t, err)
assert.Equal(t, &ret, out)
m1.AssertExpectations(t)
// check that localTPSExceededError is thrown when trying more than 5 TPS
m2 := MockKinesisSubscriberGetter{}
psc2 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m2},
callsLeft: 0,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 500 * time.Millisecond
}
out2, _, err2 := psc2.callGetRecordsAPI(&gri)
assert.Nil(t, out2)
assert.ErrorIs(t, err2, localTPSExceededError)
m2.AssertExpectations(t)
// check that getRecords is called normally in bytesRead = 0 case
m3 := MockKinesisSubscriberGetter{}
ret3 := kinesis.GetRecordsOutput{}
m3.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret3, nil)
psc3 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m3},
callsLeft: 2,
bytesRead: 0,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 2 * time.Second
}
out3, checkSleepVal, err3 := psc3.callGetRecordsAPI(&gri)
assert.Nil(t, err3)
assert.Equal(t, checkSleepVal, 0)
assert.Equal(t, &ret3, out3)
m3.AssertExpectations(t)
// check that correct cool off period is taken for 10mb in 1 second
testTime := time.Now()
m4 := MockKinesisSubscriberGetter{}
ret4 := kinesis.GetRecordsOutput{Records: nil}
m4.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret4, nil)
psc4 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m4},
callsLeft: 2,
bytesRead: MaxBytes,
lastCheckTime: testTime,
remBytes: MaxBytes,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 2 * time.Second
}
rateLimitTimeNow = func() time.Time {
return testTime.Add(time.Second)
}
out4, checkSleepVal2, err4 := psc4.callGetRecordsAPI(&gri)
assert.Nil(t, err4)
assert.Equal(t, &ret4, out4)
m4.AssertExpectations(t)
if checkSleepVal2 != 0 {
t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal2)
}
// check that no cool off period is taken for 6mb in 3 seconds
testTime2 := time.Now()
m5 := MockKinesisSubscriberGetter{}
ret5 := kinesis.GetRecordsOutput{}
m5.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret5, nil)
psc5 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m5},
callsLeft: 2,
bytesRead: MaxBytesPerSecond * 3,
lastCheckTime: testTime2,
remBytes: MaxBytes,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 3 * time.Second
}
rateLimitTimeNow = func() time.Time {
return testTime2.Add(time.Second * 3)
}
out5, checkSleepVal3, err5 := psc5.callGetRecordsAPI(&gri)
assert.Nil(t, err5)
assert.Equal(t, checkSleepVal3, 0)
assert.Equal(t, &ret5, out5)
m5.AssertExpectations(t)
// check for correct cool off period with 8mb in .2 seconds with 6mb remaining
testTime3 := time.Now()
m6 := MockKinesisSubscriberGetter{}
ret6 := kinesis.GetRecordsOutput{Records: nil}
m6.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret6, nil)
psc6 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m6},
callsLeft: 2,
bytesRead: MaxBytesPerSecond * 4,
lastCheckTime: testTime3,
remBytes: MaxBytesPerSecond * 3,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 3 * time.Second
}
rateLimitTimeNow = func() time.Time {
return testTime3.Add(time.Second / 5)
}
out6, checkSleepVal4, err6 := psc6.callGetRecordsAPI(&gri)
assert.Nil(t, err6)
assert.Equal(t, &ret6, out6)
m5.AssertExpectations(t)
if checkSleepVal4 != 0 {
t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal4)
}
// case where getRecords throws error
m7 := MockKinesisSubscriberGetter{}
ret7 := kinesis.GetRecordsOutput{Records: nil}
m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, testGetRecordsError)
psc7 := PollingShardConsumer{
commonShardConsumer: commonShardConsumer{kc: &m7},
callsLeft: 2,
bytesRead: 0,
}
rateLimitTimeSince = func(t time.Time) time.Duration {
return 2 * time.Second
}
out7, checkSleepVal7, err7 := psc7.callGetRecordsAPI(&gri)
assert.Equal(t, err7, testGetRecordsError)
assert.Equal(t, checkSleepVal7, 0)
assert.Equal(t, out7, &ret7)
m7.AssertExpectations(t)
// restore original func
rateLimitTimeNow = time.Now
rateLimitTimeSince = time.Since
}
type MockKinesisSubscriberGetter struct {
@ -200,156 +63,3 @@ func (m *MockKinesisSubscriberGetter) GetShardIterator(ctx context.Context, para
func (m *MockKinesisSubscriberGetter) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) {
return nil, nil
}
func TestPollingShardConsumer_checkCoolOffPeriod(t *testing.T) {
refTime := time.Now()
type fields struct {
lastCheckTime time.Time
remBytes int
bytesRead int
}
tests := []struct {
name string
fields fields
timeNow time.Time
want int
wantErr bool
}{
{
"zero time max bytes to spend",
fields{
time.Time{},
0,
0,
},
refTime,
0,
false,
},
{
"same second, bytes still left to spend",
fields{
refTime,
MaxBytesPerSecond,
MaxBytesPerSecond - 1,
},
refTime,
0,
false,
},
{
"same second, not many but some bytes still left to spend",
fields{
refTime,
8,
MaxBytesPerSecond,
},
refTime,
0,
false,
},
{
"same second, 1 byte still left to spend",
fields{
refTime,
1,
MaxBytesPerSecond,
},
refTime,
0,
false,
},
{
"next second, bytes still left to spend",
fields{
refTime,
42,
1024,
},
refTime.Add(1 * time.Second),
0,
false,
},
{
"same second, max bytes per second already spent",
fields{
refTime,
0,
MaxBytesPerSecond,
},
refTime,
1,
true,
},
{
"same second, more than max bytes per second already spent",
fields{
refTime,
0,
MaxBytesPerSecond + 1,
},
refTime,
2,
true,
},
// Kinesis prevents reading more than 10 MiB at once
{
"same second, 10 MiB read all at once",
fields{
refTime,
0,
10 * 1024 * 1024,
},
refTime,
6,
true,
},
{
"same second, 10 MB read all at once",
fields{
refTime,
0,
10 * 1000 * 1000,
},
refTime,
5,
true,
},
{
"5 seconds ago, 10 MB read all at once",
fields{
refTime,
0,
10 * 1000 * 1000,
},
refTime.Add(5 * time.Second),
0,
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sc := &PollingShardConsumer{
lastCheckTime: tt.fields.lastCheckTime,
remBytes: tt.fields.remBytes,
bytesRead: tt.fields.bytesRead,
}
rateLimitTimeNow = func() time.Time {
return tt.timeNow
}
got, err := sc.checkCoolOffPeriod()
if (err != nil) != tt.wantErr {
t.Errorf("PollingShardConsumer.checkCoolOffPeriod() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("PollingShardConsumer.checkCoolOffPeriod() = %v, want %v", got, tt.want)
}
})
}
// restore original time.Now
rateLimitTimeNow = time.Now
}

View file

@ -22,6 +22,7 @@ package worker
import (
"github.com/aws/aws-sdk-go-v2/aws"
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"

View file

@ -160,15 +160,11 @@ func (w *Worker) initialize() error {
log.Infof("Creating Kinesis client")
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
if service == kinesis.ServiceID && len(w.kclConfig.KinesisEndpoint) > 0 {
return aws.Endpoint{
PartitionID: "aws",
URL: w.kclConfig.KinesisEndpoint,
SigningRegion: w.regionName,
}, nil
}
// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
return aws.Endpoint{
PartitionID: "aws",
URL: w.kclConfig.KinesisEndpoint,
SigningRegion: w.regionName,
}, nil
})
cfg, err := awsConfig.LoadDefaultConfig(
@ -276,14 +272,6 @@ func (w *Worker) eventLoop() {
rnd, _ := rand.Int(rand.Reader, big.NewInt(int64(w.kclConfig.ShardSyncIntervalMillis)))
shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + int(rnd.Int64())
select {
case <-*w.stop:
log.Infof("Shutting down...")
return
case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
log.Debugf("Waited %d ms to sync shards...", shardSyncSleep)
}
err := w.syncShard()
if err != nil {
log.Errorf("Error syncing shards: %+v, Retrying in %d ms...", err, shardSyncSleep)
@ -375,6 +363,14 @@ func (w *Worker) eventLoop() {
log.Warnf("Error in rebalance: %+v", err)
}
}
select {
case <-*w.stop:
log.Infof("Shutting down...")
return
case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond):
log.Debugf("Waited %d ms to sync shards...", shardSyncSleep)
}
}
}

41
go.mod
View file

@ -1,18 +1,18 @@
module github.com/vmware/vmware-go-kcl-v2
go 1.21
go 1.17
require (
github.com/aws/aws-sdk-go-v2 v1.25.2
github.com/aws/aws-sdk-go-v2/config v1.27.5
github.com/aws/aws-sdk-go-v2/credentials v1.17.5
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1
github.com/aws/aws-sdk-go-v2 v1.11.2
github.com/aws/aws-sdk-go-v2/config v1.11.1
github.com/aws/aws-sdk-go-v2/credentials v1.6.5
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.32.1
github.com/rs/zerolog v1.26.1
github.com/sirupsen/logrus v1.8.1
@ -23,18 +23,17 @@ require (
require (
github.com/BurntSushi/toml v0.4.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 // indirect
github.com/aws/smithy-go v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 // indirect
github.com/aws/smithy-go v1.9.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@ -46,7 +45,7 @@ require (
github.com/stretchr/objx v0.5.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

81
go.sum
View file

@ -41,44 +41,42 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1/go.mod h1:sxpLb+nZk7tIfCWChfd+h4QwHNUR57d8hA1cleTkjJo=
github.com/aws/aws-sdk-go-v2/config v1.27.5 h1:brBPsyRFQn97M1ZhQ9tLXkO7Zytiar0NS06FGmEJBdg=
github.com/aws/aws-sdk-go-v2/config v1.27.5/go.mod h1:I53uvsfddRRTG5YcC4n5Z3aOD1BU8hYCoIG7iEJG4wM=
github.com/aws/aws-sdk-go-v2/credentials v1.17.5 h1:yn3zSvIKC2NZIs40cY3kckcy9Zma96PrRR07N54PCvY=
github.com/aws/aws-sdk-go-v2/credentials v1.17.5/go.mod h1:8JcKPAGZVnDWuR5lusAwmrSDtZnDIAnpQWaDC9RFt2g=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2 h1:AK0J8iYBFeUk2Ax7O8YpLtFsfhdOByh2QIkHmigpRYk=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.2/go.mod h1:iRlGzMix0SExQEviAyptRWRGdYNo3+ufW/lCzvKVTUc=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1 h1:mQySuI87thHtcbZvEDjwUROGWikU6fqgpHklCBXpJU4=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.36.1/go.mod h1:Z1ThUUTuCO9PArtiQsTmBGBv+38NGj+795Zl0n1jgiM=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2 h1:n+nT52A+Ik+ut1D8IV4EP1qfyUdP9Jq60uYfnlJwSWc=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.30.2/go.mod h1:BzzW6QegtSMnC1BhD+lagiUDSRYjRTOhXAb1mLfEaMg=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 h1:EyBZibRTVAs6ECHZOw5/wlylS9OcTzwyjeQMudmREjE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1/go.mod h1:JKpmtYhhPs7D97NL/ltqz7yCkERFW5dOlHyVl66ZYF8=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3 h1:/MpYoYvgshlGMFmSyfzGWf6HKoEo/DrKBoHxXR3vh+U=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.3/go.mod h1:1Pf5vPqk8t9pdYB3dmUMRE/0m8u0IHHg8ESSiutJd0I=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3 h1:x0N5ftQzgcfRpCpTiyZC40pvNUJYhzf4UgCsAyO6/P8=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.3/go.mod h1:Ru7vg1iQ7cR4i7SZ/JTLYN9kaXtbL69UdgG0OQWQxW0=
github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs=
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM=
github.com/aws/aws-sdk-go-v2/config v1.11.1 h1:KXSjb7ZMLRtjxClFptukTYibiOqJS9NwBO+9WD3UMto=
github.com/aws/aws-sdk-go-v2/config v1.11.1/go.mod h1:VvfkzUhVtntSg1JfGFMSKS0CyiTZd3NqBxK5af4zsME=
github.com/aws/aws-sdk-go-v2/credentials v1.6.5 h1:ZrsO2js2v4T95rsCIWoAb/ck5+U1kwkizGdZHY+ni3s=
github.com/aws/aws-sdk-go-v2/credentials v1.6.5/go.mod h1:HWSOnsnqVMbLcWUmom6AN1cqhcLzLJ62AObW28CbYbU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 h1:KiN5TPOLrEjbGCvdTQR4t0U4T87vVwALZ5Bg3jpMqPY=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2/go.mod h1:dF2F6tXEOgmW5X1ZFO/EPtWrcm7XkW07KNcJUGNtt4s=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 h1:IQup8Q6lorXeiA/rK72PeToWoWK8h7VAPgHNWdSrtgE=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2/go.mod h1:VITe/MdW6EMXPb0o0txu/fsonXbMHUU2OC2Qp7ivU4o=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 h1:BcSBoss+CeyRS4TgZKAcR6kcZ0Sb2P+DHs8r8aMlTpQ=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0/go.mod h1:eAgmZ4hIzTsTOlAA7yvGJz+RywxZo3KWtGt7J+jAUxU=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 h1:te+nIFwPf5Bi/cZvd9g/+EF0gkJT3c0J/5+NMx0NBZg=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0/go.mod h1:ELltfl9ri0n4sZ/VjPZBgemNMd9mYIpCAuZhc7NP7l4=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 h1:lPLbw4Gn59uoKqvOfSnkJr54XWk5Ak1NK20ZEiSWb3U=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0/go.mod h1:80NaCIH9YU3rzTTs/J/ECATjXuRqzo/wB6ukO6MZ0XY=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY=
github.com/aws/aws-sdk-go-v2/service/sso v1.20.1 h1:utEGkfdQ4L6YW/ietH7111ZYglLJvS+sLriHJ1NBJEQ=
github.com/aws/aws-sdk-go-v2/service/sso v1.20.1/go.mod h1:RsYqzYr2F2oPDdpy+PdhephuZxTfjHQe7SOBcZGoAU8=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1 h1:9/GylMS45hGGFCcMrUZDVayQE1jYSIN6da9jo7RAYIw=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.1/go.mod h1:YjAPFn4kGFqKC54VsHs5fn5B6d+PCY2tziEa3U/GB5Y=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2 h1:0YjXuWdYHvsm0HnT4vO8XpwG1D+i2roxSCBoN6deJ7M=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.2/go.mod h1:jI+FWmYkSMn+4APWmZiZTgt0oM0TrvymD51FMqCnWgA=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 h1:s47dGRX/fBy9s/Zculav/cyqRhkMKsE/5hjg6rWAH6E=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0/go.mod h1:B1x58TfECuYHFX/bga902rUvMqQu9C/v2XiCi2GZZXE=
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 h1:E4fxAg/UE8a6yiLZYv8/EP0uXKPPRImiMau4ift6S/g=
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGXyn22c1USYxA/2KyvoeDY0=
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc=
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
@ -155,9 +153,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@ -214,9 +211,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -400,8 +396,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -457,6 +453,7 @@ golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=