Compare commits

..

1 commit

Author SHA1 Message Date
Harlow Ward
51628b09fa Leverage upstream Ksis deagregator
The v2 deagregator has been merged so we can use it directly
https://github.com/awslabs/kinesis-aggregation/tree/master/go/v2/deaggregator

Fixes #145
2022-07-06 08:43:13 -07:00
13 changed files with 158 additions and 863 deletions

View file

@ -119,7 +119,7 @@ The uniq identifier for a consumer is `[appName, streamName, shardID]`
Note: The default storage is in-memory (no-op). Which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is re-started.
The consumer accepts a `WithStore` option to set the storage layer:
The consumer accpets a `WithStore` option to set the storage layer:
```go
c, err := consumer.New(*stream, consumer.WithStore(db))
@ -132,7 +132,7 @@ To persist scan progress choose one of the following storage layers:
#### Redis
The Redis checkpoint requires App Name, and Stream Name:
The Redis checkpoint requries App Name, and Stream Name:
```go
import store "github.com/harlow/kinesis-consumer/store/redis"
@ -292,7 +292,7 @@ c, err := consumer.New(
### Logging
Logging supports the basic built-in logging library or use third party external one, so long as
Logging supports the basic built-in logging library or use thrid party external one, so long as
it implements the Logger interface.
For example, to use the builtin logging package, we wrap it with myLogger structure.

View file

@ -2,20 +2,18 @@ package consumer
import (
"context"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)
// NewAllGroup returns an initialized AllGroup for consuming
// NewAllGroup returns an intitialized AllGroup for consuming
// all shards on a stream
func NewAllGroup(ksis kinesisClient, store Store, streamName string, logger Logger) *AllGroup {
return &AllGroup{
ksis: ksis,
shards: make(map[string]types.Shard),
shardsClosed: make(map[string]chan struct{}),
streamName: streamName,
logger: logger,
Store: store,
@ -33,17 +31,16 @@ type AllGroup struct {
shardMu sync.Mutex
shards map[string]types.Shard
shardsClosed map[string]chan struct{}
}
// Start is a blocking operation which will loop and attempt to find new
// shards on a regular cadence.
func (g *AllGroup) Start(ctx context.Context, shardC chan types.Shard) error {
func (g *AllGroup) Start(ctx context.Context, shardc chan types.Shard) {
// Note: while ticker is a rather naive approach to this problem,
// it actually simplifies a few things. I.e. If we miss a new shard
// while AWS is resharding, we'll pick it up max 30 seconds later.
// it actually simplifies a few things. i.e. If we miss a new shard
// while AWS is resharding we'll pick it up max 30 seconds later.
// It might be worth refactoring this flow to allow the consumer
// It might be worth refactoring this flow to allow the consumer to
// to notify the broker when a shard is closed. However, shards don't
// necessarily close at the same time, so we could potentially get a
// thundering heard of notifications from the consumer.
@ -51,50 +48,21 @@ func (g *AllGroup) Start(ctx context.Context, shardC chan types.Shard) error {
var ticker = time.NewTicker(30 * time.Second)
for {
if err := g.findNewShards(ctx, shardC); err != nil {
ticker.Stop()
return err
}
g.findNewShards(ctx, shardc)
select {
case <-ctx.Done():
ticker.Stop()
return nil
return
case <-ticker.C:
}
}
}
func (g *AllGroup) CloseShard(_ context.Context, shardID string) error {
g.shardMu.Lock()
defer g.shardMu.Unlock()
c, ok := g.shardsClosed[shardID]
if !ok {
return fmt.Errorf("closing unknown shard ID %q", shardID)
}
close(c)
return nil
}
func waitForCloseChannel(ctx context.Context, c <-chan struct{}) bool {
if c == nil {
// no channel means we haven't seen this shard in listShards, so it
// probably fell off the TRIM_HORIZON, and we can assume it's fully processed.
return true
}
select {
case <-ctx.Done():
return false
case <-c:
// the channel has been processed and closed by the consumer (CloseShard has been called)
return true
}
}
// findNewShards pulls the list of shards from the Kinesis API
// and uses a local cache to determine if we are already processing
// a particular shard.
func (g *AllGroup) findNewShards(ctx context.Context, shardC chan types.Shard) error {
func (g *AllGroup) findNewShards(ctx context.Context, shardc chan types.Shard) {
g.shardMu.Lock()
defer g.shardMu.Unlock()
@ -103,42 +71,14 @@ func (g *AllGroup) findNewShards(ctx context.Context, shardC chan types.Shard) e
shards, err := listShards(ctx, g.ksis, g.streamName)
if err != nil {
g.logger.Log("[GROUP] error:", err)
return err
return
}
// We do two `for` loops, since we have to set up all the `shardClosed`
// channels before we start using any of them. It's highly probable
// that Kinesis provides us the shards in dependency order (parents
// before children), but it doesn't appear to be a guarantee.
newShards := make(map[string]types.Shard)
for _, shard := range shards {
if _, ok := g.shards[*shard.ShardId]; ok {
continue
}
g.shards[*shard.ShardId] = shard
g.shardsClosed[*shard.ShardId] = make(chan struct{})
newShards[*shard.ShardId] = shard
shardc <- shard
}
// only new shards need to be checked for parent dependencies
for _, shard := range newShards {
shard := shard // Shadow shard, since we use it in goroutine
var parent1, parent2 <-chan struct{}
if shard.ParentShardId != nil {
parent1 = g.shardsClosed[*shard.ParentShardId]
}
if shard.AdjacentParentShardId != nil {
parent2 = g.shardsClosed[*shard.AdjacentParentShardId]
}
go func() {
// Asynchronously wait for all parents of this shard to be processed
// before providing it out to our client. Kinesis guarantees that a
// given partition key's data will be provided to clients in-order,
// but when splits or joins happen, we need to process all parents prior
// to processing children or that ordering guarantee is not maintained.
if waitForCloseChannel(ctx, parent1) && waitForCloseChannel(ctx, parent2) {
shardC <- shard
}
}()
}
return nil
}

View file

@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"sync"
"time"
@ -13,7 +13,7 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/harlow/kinesis-consumer/internal/deaggregator"
"github.com/awslabs/kinesis-aggregation/go/v2/deaggregator"
)
// Record wraps the record returned from the Kinesis library and
@ -38,7 +38,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
store: &noopStore{},
counter: &noopCounter{},
logger: &noopLogger{
logger: log.New(io.Discard, "", log.LstdFlags),
logger: log.New(ioutil.Discard, "", log.LstdFlags),
},
scanInterval: 250 * time.Millisecond,
maxRecords: 10000,
@ -90,7 +90,7 @@ type Consumer struct {
type ScanFunc func(*Record) error
// ErrSkipCheckpoint is used as a return value from ScanFunc to indicate that
// the current checkpoint should be skipped. It is not returned
// the current checkpoint should be skipped skipped. It is not returned
// as an error by any function.
var ErrSkipCheckpoint = errors.New("skip checkpoint")
@ -102,60 +102,40 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
defer cancel()
var (
errC = make(chan error, 1)
shardC = make(chan types.Shard, 1)
errc = make(chan error, 1)
shardc = make(chan types.Shard, 1)
)
go func() {
err := c.group.Start(ctx, shardC)
if err != nil {
errC <- fmt.Errorf("error starting scan: %w", err)
cancel()
}
c.group.Start(ctx, shardc)
<-ctx.Done()
close(shardC)
close(shardc)
}()
wg := new(sync.WaitGroup)
// process each of the shards
s := newShardsInProcess()
for shard := range shardC {
shardId := aws.ToString(shard.ShardId)
if s.doesShardExist(shardId) {
// safetynet: if shard already in process by another goroutine, just skipping the request
continue
}
for shard := range shardc {
wg.Add(1)
go func(shardID string) {
s.addShard(shardID)
defer func() {
s.deleteShard(shardID)
}()
defer wg.Done()
var err error
if err = c.ScanShard(ctx, shardID, fn); err != nil {
err = fmt.Errorf("shard %s error: %w", shardID, err)
} else if closeable, ok := c.group.(CloseableGroup); !ok {
// group doesn't allow closure, skip calling CloseShard
} else if err = closeable.CloseShard(ctx, shardID); err != nil {
err = fmt.Errorf("shard closed CloseableGroup error: %w", err)
}
if err != nil {
if err := c.ScanShard(ctx, shardID, fn); err != nil {
select {
case errC <- fmt.Errorf("shard %s error: %w", shardID, err):
case errc <- fmt.Errorf("shard %s error: %w", shardID, err):
// first error to occur
cancel()
default:
// error has already occurred
}
}
}(shardId)
}(aws.ToString(shard.ShardId))
}
go func() {
wg.Wait()
close(errC)
close(errc)
}()
return <-errC
return <-errc
}
// ScanShard loops over records on a specific shard, calls the callback func
@ -219,13 +199,15 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
return nil
default:
err := fn(&Record{r, shardID, resp.MillisBehindLatest})
if err != nil && !errors.Is(err, ErrSkipCheckpoint) {
if err != nil && err != ErrSkipCheckpoint {
return err
}
if err != ErrSkipCheckpoint {
if err := c.group.SetCheckpoint(c.streamName, shardID, *r.SequenceNumber); err != nil {
return err
}
}
c.counter.Add("records", 1)
lastSeqNum = *r.SequenceNumber
@ -236,7 +218,9 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
c.logger.Log("[CONSUMER] shard closed:", shardID)
if c.shardClosedHandler != nil {
if err := c.shardClosedHandler(c.streamName, shardID); err != nil {
err := c.shardClosedHandler(c.streamName, shardID)
if err != nil {
return fmt.Errorf("shard closed handler error: %w", err)
}
}
@ -288,21 +272,18 @@ func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, se
params.ShardIteratorType = types.ShardIteratorTypeAtTimestamp
params.Timestamp = c.initialTimestamp
} else {
params.ShardIteratorType = c.initialShardIteratorType
params.ShardIteratorType = types.ShardIteratorType(c.initialShardIteratorType)
}
res, err := c.client.GetShardIterator(ctx, params)
if err != nil {
return nil, err
}
return res.ShardIterator, nil
return res.ShardIterator, err
}
func isRetriableError(err error) bool {
if oe := (*types.ExpiredIteratorException)(nil); errors.As(err, &oe) {
switch err.(type) {
case *types.ExpiredIteratorException:
return true
}
if oe := (*types.ProvisionedThroughputExceededException)(nil); errors.As(err, &oe) {
case *types.ProvisionedThroughputExceededException:
return true
}
return false
@ -311,24 +292,3 @@ func isRetriableError(err error) bool {
func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
return nextShardIterator == nil || currentShardIterator == nextShardIterator
}
type shards struct {
shardsInProcess sync.Map
}
func newShardsInProcess() *shards {
return &shards{}
}
func (s *shards) addShard(shardId string) {
s.shardsInProcess.Store(shardId, struct{}{})
}
func (s *shards) doesShardExist(shardId string) bool {
_, ok := s.shardsInProcess.Load(shardId)
return ok
}
func (s *shards) deleteShard(shardId string) {
s.shardsInProcess.Delete(shardId)
}

View file

@ -2,12 +2,9 @@ package consumer
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
@ -27,15 +24,6 @@ var records = []types.Record{
},
}
// Implement logger to wrap testing.T.Log.
type testLogger struct {
t *testing.T
}
func (t *testLogger) Log(args ...interface{}) {
t.t.Log(args...)
}
func TestNew(t *testing.T) {
if _, err := New("myStreamName"); err != nil {
t.Fatalf("new consumer error: %v", err)
@ -72,7 +60,6 @@ func TestScan(t *testing.T) {
WithClient(client),
WithCounter(ctr),
WithStore(cp),
WithLogger(&testLogger{t}),
)
if err != nil {
t.Fatalf("new consumer error: %v", err)
@ -111,71 +98,6 @@ func TestScan(t *testing.T) {
}
}
func TestScan_ListShardsError(t *testing.T) {
mockError := errors.New("mock list shards error")
client := &kinesisClientMock{
listShardsMock: func(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error) {
return nil, mockError
},
}
// use cancel func to signal shutdown
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
var res string
var fn = func(r *Record) error {
res += string(r.Data)
cancel() // simulate cancellation while processing first record
return nil
}
c, err := New("myStreamName", WithClient(client))
if err != nil {
t.Fatalf("new consumer error: %v", err)
}
err = c.Scan(ctx, fn)
if !errors.Is(err, mockError) {
t.Errorf("expected an error from listShards, but instead got %v", err)
}
}
func TestScan_GetShardIteratorError(t *testing.T) {
mockError := errors.New("mock get shard iterator error")
client := &kinesisClientMock{
listShardsMock: func(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error) {
return &kinesis.ListShardsOutput{
Shards: []types.Shard{
{ShardId: aws.String("myShard")},
},
}, nil
},
getShardIteratorMock: func(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) {
return nil, mockError
},
}
// use cancel func to signal shutdown
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
var res string
var fn = func(r *Record) error {
res += string(r.Data)
cancel() // simulate cancellation while processing first record
return nil
}
c, err := New("myStreamName", WithClient(client))
if err != nil {
t.Fatalf("new consumer error: %v", err)
}
err = c.Scan(ctx, fn)
if !errors.Is(err, mockError) {
t.Errorf("expected an error from getShardIterator, but instead got %v", err)
}
}
func TestScanShard(t *testing.T) {
var client = &kinesisClientMock{
getShardIteratorMock: func(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) {
@ -200,7 +122,6 @@ func TestScanShard(t *testing.T) {
WithClient(client),
WithCounter(ctr),
WithStore(cp),
WithLogger(&testLogger{t}),
)
if err != nil {
t.Fatalf("new consumer error: %v", err)
@ -380,8 +301,7 @@ func TestScanShard_ShardIsClosed_WithShardClosedHandler(t *testing.T) {
WithClient(client),
WithShardClosedHandler(func(streamName, shardID string) error {
return fmt.Errorf("closed shard error")
}),
WithLogger(&testLogger{t}))
}))
if err != nil {
t.Fatalf("new consumer error: %v", err)
}
@ -415,7 +335,7 @@ func TestScanShard_GetRecordsError(t *testing.T) {
return nil
}
c, err := New("myStreamName", WithClient(client), WithLogger(&testLogger{t}))
c, err := New("myStreamName", WithClient(client))
if err != nil {
t.Fatalf("new consumer error: %v", err)
}
@ -464,201 +384,3 @@ func (fc *fakeCounter) Add(streamName string, count int64) {
fc.counter += count
}
func TestScan_PreviousParentsBeforeTrimHorizon(t *testing.T) {
client := &kinesisClientMock{
getShardIteratorMock: func(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) {
return &kinesis.GetShardIteratorOutput{
ShardIterator: aws.String("49578481031144599192696750682534686652010819674221576194"),
}, nil
},
getRecordsMock: func(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) {
return &kinesis.GetRecordsOutput{
NextShardIterator: nil,
Records: records,
}, nil
},
listShardsMock: func(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error) {
return &kinesis.ListShardsOutput{
Shards: []types.Shard{
{
ShardId: aws.String("myShard"),
ParentShardId: aws.String("myOldParent"),
AdjacentParentShardId: aws.String("myOldAdjacentParent"),
},
},
}, nil
},
}
var (
cp = store.New()
ctr = &fakeCounter{}
)
c, err := New("myStreamName",
WithClient(client),
WithCounter(ctr),
WithStore(cp),
WithLogger(&testLogger{t}),
)
if err != nil {
t.Fatalf("new consumer error: %v", err)
}
var (
ctx, cancel = context.WithCancel(context.Background())
res string
)
var fn = func(r *Record) error {
res += string(r.Data)
if string(r.Data) == "lastData" {
cancel()
}
return nil
}
if err := c.Scan(ctx, fn); err != nil {
t.Errorf("scan returned unexpected error %v", err)
}
if res != "firstDatalastData" {
t.Errorf("callback error expected %s, got %s", "firstDatalastData", res)
}
if val := ctr.Get(); val != 2 {
t.Errorf("counter error expected %d, got %d", 2, val)
}
val, err := cp.GetCheckpoint("myStreamName", "myShard")
if err != nil && val != "lastSeqNum" {
t.Errorf("checkout error expected %s, got %s", "lastSeqNum", val)
}
}
func TestScan_ParentChildOrdering(t *testing.T) {
// We create a set of shards where shard1 split into (shard2,shard3), then (shard2,shard3) merged into shard4.
client := &kinesisClientMock{
getShardIteratorMock: func(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) {
return &kinesis.GetShardIteratorOutput{
ShardIterator: aws.String(*params.ShardId + "iter"),
}, nil
},
getRecordsMock: func(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) {
switch *params.ShardIterator {
case "shard1iter":
return &kinesis.GetRecordsOutput{
NextShardIterator: nil,
Records: []types.Record{
{
Data: []byte("shard1data"),
SequenceNumber: aws.String("shard1num"),
},
},
}, nil
case "shard2iter":
return &kinesis.GetRecordsOutput{
NextShardIterator: nil,
Records: []types.Record{},
}, nil
case "shard3iter":
return &kinesis.GetRecordsOutput{
NextShardIterator: nil,
Records: []types.Record{
{
Data: []byte("shard3data"),
SequenceNumber: aws.String("shard3num"),
},
},
}, nil
case "shard4iter":
return &kinesis.GetRecordsOutput{
NextShardIterator: nil,
Records: []types.Record{
{
Data: []byte("shard4data"),
SequenceNumber: aws.String("shard4num"),
},
},
}, nil
default:
panic("got unexpected iterator")
}
},
listShardsMock: func(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error) {
// Intentionally misorder these to test resiliance to ordering issues from ListShards.
return &kinesis.ListShardsOutput{
Shards: []types.Shard{
{
ShardId: aws.String("shard3"),
ParentShardId: aws.String("shard1"),
},
{
ShardId: aws.String("shard1"),
ParentShardId: aws.String("shard0"), // not otherwise referenced, parent ordering should ignore this
},
{
ShardId: aws.String("shard4"),
ParentShardId: aws.String("shard2"),
AdjacentParentShardId: aws.String("shard3"),
},
{
ShardId: aws.String("shard2"),
ParentShardId: aws.String("shard1"),
},
},
}, nil
},
}
var (
cp = store.New()
ctr = &fakeCounter{}
)
c, err := New("myStreamName",
WithClient(client),
WithCounter(ctr),
WithStore(cp),
WithLogger(&testLogger{t}),
)
if err != nil {
t.Fatalf("new consumer error: %v", err)
}
var (
ctx, cancel = context.WithCancel(context.Background())
res string
)
rand.Seed(time.Now().UnixNano())
var fn = func(r *Record) error {
res += string(r.Data)
time.Sleep(time.Duration(rand.Int()%100) * time.Millisecond)
if string(r.Data) == "shard4data" {
cancel()
}
return nil
}
if err := c.Scan(ctx, fn); err != nil {
t.Errorf("scan returned unexpected error %v", err)
}
if want := "shard1datashard3datashard4data"; res != want {
t.Errorf("callback error expected %s, got %s", want, res)
}
if val := ctr.Get(); val != 3 {
t.Errorf("counter error expected %d, got %d", 2, val)
}
val, err := cp.GetCheckpoint("myStreamName", "shard4data")
if err != nil && val != "shard4num" {
t.Errorf("checkout error expected %s, got %s", "shard4num", val)
}
}

7
go.mod
View file

@ -11,15 +11,14 @@ require (
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.2.0
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.5.0
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f
github.com/go-redis/redis/v9 v9.0.0-rc.2
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5
github.com/go-redis/redis/v8 v8.0.0-beta.6
github.com/go-sql-driver/mysql v1.5.0
github.com/golang/protobuf v1.5.2
github.com/gomodule/redigo v2.0.0+incompatible // indirect
github.com/lib/pq v1.7.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.1
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e // indirect
google.golang.org/protobuf v1.27.1 // indirect
)
go 1.13

217
go.sum
View file

@ -1,5 +1,13 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1MRDJM=
github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 h1:qELHH0AWCvf98Yf+CNIJx9vOZOfHFDDzgDRYsnNk/vs=
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
@ -9,7 +17,6 @@ github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY
github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo=
github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE=
github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys=
github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v1.8.1/go.mod h1:xEFuWz+3TYdlPRuo+CqATbeDWIWyaT5uAPwPaWtgse0=
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
@ -47,33 +54,42 @@ github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAm
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f h1:Pf0BjJDga7C98f0vhw+Ip5EaiE07S3lTKpIYPNS0nMo=
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 h1:lxW5Q6K2IisyF5tlr6Ts0W4POGWQZco05MJjFmoeIHs=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-rendezvous v0.0.0-20200624174652-8d2f3be8b2d9 h1:h2Ul3Ym2iVZWMQGYmulVUJ4LSkBm1erp9mUkPwtMoLg=
github.com/dgryski/go-rendezvous v0.0.0-20200624174652-8d2f3be8b2d9/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0=
github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY=
github.com/go-redis/redis/v8 v8.0.0-beta.6 h1:QeXAkG9L5cWJA+eJTBvhkftE7dwpJ0gbMYeBE2NxXS4=
github.com/go-redis/redis/v8 v8.0.0-beta.6/go.mod h1:g79Vpae8JMzg5qjk8BiwU9tK+HmU3iDVyS4UAJLFycI=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@ -85,19 +101,20 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
@ -105,6 +122,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -117,52 +135,31 @@ github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU=
github.com/onsi/ginkgo/v2 v2.1.6/go.mod h1:MEH45j8TBi6u9BMogfbp0stKC5cdGjumZj5Y7AG4VIk=
github.com/onsi/ginkgo/v2 v2.3.0/go.mod h1:Eew0uilEqZmIEZr8JrvYlvOM7Rr6xzTmMV8AyFNU9d0=
github.com/onsi/ginkgo/v2 v2.4.0/go.mod h1:iHkDK1fKGcBoEHT5W7YBq4RFWaQulw+caOMkAt4OrFo=
github.com/onsi/ginkgo/v2 v2.5.0 h1:TRtrvv2vdQqzkwrQ1ke6vtXf7IK34RBUJafIy1wMwls=
github.com/onsi/ginkgo/v2 v2.5.0/go.mod h1:Luc4sArBICYCS8THh8v3i3i5CuSZO+RaQRaJoeNwomw=
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/onsi/gomega v1.20.1/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo=
github.com/onsi/gomega v1.21.1/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc=
github.com/onsi/gomega v1.22.1/go.mod h1:x6n7VNe4hw0vkyYUM4mjIXx3JbLiPaBPNgB7PRQ1tuM=
github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg=
github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E=
github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmvt1jM=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
github.com/smartystreets/gunit v1.0.0/go.mod h1:qwPWnhz6pn0NnRBP++URONOVyNkPyr4SauJk4cUOwJs=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0=
github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
@ -170,85 +167,71 @@ github.com/tj/go-buffer v1.0.1/go.mod h1:iyiJpfFcR2B9sXu7KvjbT9fpM4mOelRSDTbntVj
github.com/tj/go-elastic v0.0.0-20171221160941-36157cbbebc2/go.mod h1:WjeM0Oo1eNAjXGDx2yma7uG2XoyRZTq1uv3M/o7imD0=
github.com/tj/go-kinesis v0.0.0-20171128231115-08b17f58cb1b/go.mod h1:/yhzCV0xPfx6jb1bBgRFjl5lytqVqZXEaeqWP8lTEao=
github.com/tj/go-spin v1.1.0/go.mod h1:Mg1mzmePZm4dva8Qz60H2lHwmJ2loum4VIrLgVnKwh4=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e h1:oIpIX9VKxSCFrfjsKpluGbNPBGq9iNnT9crH781j9wY=
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
go.opentelemetry.io/otel v0.7.0 h1:u43jukpwqR8EsyeJOMgrsUgZwVI1e1eVw7yuzRkD1l0=
go.opentelemetry.io/otel v0.7.0/go.mod h1:aZMyHG5TqDOXEgH2tyLiXSUKly1jT3yqE9PmrzIeCdo=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc=
golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU=
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03 h1:4HYDjxeNXAOTv3o1N2tjo8UUSlhQgAD52FVkwxnWgM8=
google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.30.0 h1:M5a8xTlYTxwMn5ZFkwhRabsygDY5G8TYLyQDBxJNAxE=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@ -257,22 +240,22 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c h1:grhR+C34yXImVGp7EzNk+DTIk+323eIUWOmEevy6bDo=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View file

@ -8,14 +8,7 @@ import (
// Group interface used to manage which shard to process
type Group interface {
Start(ctx context.Context, shardc chan types.Shard) error
Start(ctx context.Context, shardc chan types.Shard)
GetCheckpoint(streamName, shardID string) (string, error)
SetCheckpoint(streamName, shardID, sequenceNumber string) error
}
type CloseableGroup interface {
Group
// Allows shard processors to tell the group when the shard has been
// fully processed. Should be called only once per shardID.
CloseShard(ctx context.Context, shardID string) error
}

View file

@ -1,6 +0,0 @@
# Temporary Deaggregator
Upgrading to aws-sdk-go-v2 was blocked on a PR to introduce a new Deaggregator:
https://github.com/awslabs/kinesis-aggregation/pull/143/files
Once that PR is merged I'll remove this code and pull in the `awslabs/kinesis-aggregation` repo.

View file

@ -1,94 +0,0 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package deaggregator
import (
"crypto/md5"
"fmt"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/golang/protobuf/proto"
rec "github.com/awslabs/kinesis-aggregation/go/records"
)
// Magic File Header for a KPL Aggregated Record
var KplMagicHeader = fmt.Sprintf("%q", []byte("\xf3\x89\x9a\xc2"))
const (
KplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking.
DigestSize = 16 // MD5 Message size for protobuf.
)
// DeaggregateRecords takes an array of Kinesis records and expands any Protobuf
// records within that array, returning an array of all records
func DeaggregateRecords(records []*types.Record) ([]*types.Record, error) {
var isAggregated bool
allRecords := make([]*types.Record, 0)
for _, record := range records {
isAggregated = true
var dataMagic string
var decodedDataNoMagic []byte
// Check if record is long enough to have magic file header
if len(record.Data) >= KplMagicLen {
dataMagic = fmt.Sprintf("%q", record.Data[:KplMagicLen])
decodedDataNoMagic = record.Data[KplMagicLen:]
} else {
isAggregated = false
}
// Check if record has KPL Aggregate Record Magic Header and data length
// is correct size
if KplMagicHeader != dataMagic || len(decodedDataNoMagic) <= DigestSize {
isAggregated = false
}
if isAggregated {
messageDigest := fmt.Sprintf("%x", decodedDataNoMagic[len(decodedDataNoMagic)-DigestSize:])
messageData := decodedDataNoMagic[:len(decodedDataNoMagic)-DigestSize]
calculatedDigest := fmt.Sprintf("%x", md5.Sum(messageData))
// Check protobuf MD5 hash matches MD5 sum of record
if messageDigest != calculatedDigest {
isAggregated = false
} else {
aggRecord := &rec.AggregatedRecord{}
err := proto.Unmarshal(messageData, aggRecord)
if err != nil {
return nil, err
}
partitionKeys := aggRecord.PartitionKeyTable
for _, aggrec := range aggRecord.Records {
newRecord := createUserRecord(partitionKeys, aggrec, record)
allRecords = append(allRecords, newRecord)
}
}
}
if !isAggregated {
allRecords = append(allRecords, record)
}
}
return allRecords, nil
}
// createUserRecord takes in the partitionKeys of the aggregated record, the individual
// deaggregated record, and the original aggregated record builds a kinesis.Record and
// returns it
func createUserRecord(partitionKeys []string, aggRec *rec.Record, record *types.Record) *types.Record {
partitionKey := partitionKeys[*aggRec.PartitionKeyIndex]
return &types.Record{
ApproximateArrivalTimestamp: record.ApproximateArrivalTimestamp,
Data: aggRec.Data,
EncryptionType: record.EncryptionType,
PartitionKey: &partitionKey,
SequenceNumber: record.SequenceNumber,
}
}

View file

@ -1,202 +0,0 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package deaggregator_test
import (
"crypto/md5"
"fmt"
"math/rand"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
rec "github.com/awslabs/kinesis-aggregation/go/records"
deagg "github.com/harlow/kinesis-consumer/internal/deaggregator"
)
// Generate an aggregate record in the correct AWS-specified format
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
func generateAggregateRecord(numRecords int) []byte {
aggr := &rec.AggregatedRecord{}
// Start with the magic header
aggRecord := []byte("\xf3\x89\x9a\xc2")
partKeyTable := make([]string, 0)
// Create proto record with numRecords length
for i := 0; i < numRecords; i++ {
var partKey uint64
var hashKey uint64
partKey = uint64(i)
hashKey = uint64(i) * uint64(10)
r := &rec.Record{
PartitionKeyIndex: &partKey,
ExplicitHashKeyIndex: &hashKey,
Data: []byte("Some test data string"),
Tags: make([]*rec.Tag, 0),
}
aggr.Records = append(aggr.Records, r)
partKeyVal := "test" + fmt.Sprint(i)
partKeyTable = append(partKeyTable, partKeyVal)
}
aggr.PartitionKeyTable = partKeyTable
// Marshal to protobuf record, create md5 sum from proto record
// and append both to aggRecord with magic header
data, _ := proto.Marshal(aggr)
md5Hash := md5.Sum(data)
aggRecord = append(aggRecord, data...)
aggRecord = append(aggRecord, md5Hash[:]...)
return aggRecord
}
// Generate a generic kinesis.Record using whatever []byte
// is passed in as the data (can be normal []byte or proto record)
func generateKinesisRecord(data []byte) *types.Record {
currentTime := time.Now()
encryptionType := types.EncryptionTypeNone
partitionKey := "1234"
sequenceNumber := "21269319989900637946712965403778482371"
return &types.Record{
ApproximateArrivalTimestamp: &currentTime,
Data: data,
EncryptionType: encryptionType,
PartitionKey: &partitionKey,
SequenceNumber: &sequenceNumber,
}
}
// This tests to make sure that the data is at least larger than the length
// of the magic header to do some array slicing with index out of bounds
func TestSmallLengthReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
var err error
var kr *types.Record
krs := make([]*types.Record, 0, 1)
smallByte := []byte("No")
kr = generateKinesisRecord(smallByte)
krs = append(krs, kr)
dars, err := deagg.DeaggregateRecords(krs)
if err != nil {
panic(err)
}
// Small byte test, since this is not a deaggregated record, should return 1
// record in the array.
assert.Equal(t, 1, len(dars), "Small Byte test should return length of 1.")
}
// This function tests to make sure that the data starts with the correct magic header
// according to KPL aggregate documentation.
func TestNonMatchingMagicHeaderReturnsSingleRecord(t *testing.T) {
var err error
var kr *types.Record
krs := make([]*types.Record, 0, 1)
min := 1
max := 10
n := rand.Intn(max-min) + min
aggData := generateAggregateRecord(n)
mismatchAggData := aggData[1:]
kr = generateKinesisRecord(mismatchAggData)
krs = append(krs, kr)
dars, err := deagg.DeaggregateRecords(krs)
if err != nil {
panic(err)
}
// A byte record with a magic header that does not match 0xF3 0x89 0x9A 0xC2
// should return a single record.
assert.Equal(t, 1, len(dars), "Mismatch magic header test should return length of 1.")
}
// This function tests that the DeaggregateRecords function returns the correct number of
// deaggregated records from a single aggregated record.
func TestVariableLengthRecordsReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
var err error
var kr *types.Record
krs := make([]*types.Record, 0, 1)
min := 1
max := 10
n := rand.Intn(max-min) + min
aggData := generateAggregateRecord(n)
kr = generateKinesisRecord(aggData)
krs = append(krs, kr)
dars, err := deagg.DeaggregateRecords(krs)
if err != nil {
panic(err)
}
// Variable Length Aggregate Record test has aggregaterd records and should return
// n length.
assertMsg := fmt.Sprintf("Variable Length Aggregate Record should return length %v.", len(dars))
assert.Equal(t, n, len(dars), assertMsg)
}
// This function tests the length of the message after magic file header. If length is less than
// the digest size (16 bytes), it is not an aggregated record.
func TestRecordAfterMagicHeaderWithLengthLessThanDigestSizeReturnsSingleRecord(t *testing.T) {
var err error
var kr *types.Record
krs := make([]*types.Record, 0, 1)
min := 1
max := 10
n := rand.Intn(max-min) + min
aggData := generateAggregateRecord(n)
// Change size of proto message to 15
reducedAggData := aggData[:19]
kr = generateKinesisRecord(reducedAggData)
krs = append(krs, kr)
dars, err := deagg.DeaggregateRecords(krs)
if err != nil {
panic(err)
}
// A byte record with length less than 16 after the magic header should return
// a single record from DeaggregateRecords
assert.Equal(t, 1, len(dars), "Digest size test should return length of 1.")
}
// This function tests the MD5 Sum at the end of the record by comparing MD5 sum
// at end of proto record with MD5 Sum of Proto message. If they do not match,
// it is not an aggregated record.
func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) {
var err error
var kr *types.Record
krs := make([]*types.Record, 0, 1)
min := 1
max := 10
n := rand.Intn(max-min) + min
aggData := generateAggregateRecord(n)
// Remove last byte from array to mismatch the MD5 sums
mismatchAggData := aggData[:len(aggData)-1]
kr = generateKinesisRecord(mismatchAggData)
krs = append(krs, kr)
dars, err := deagg.DeaggregateRecords(krs)
if err != nil {
panic(err)
}
// A byte record with an MD5 sum that does not match with the md5.Sum(record)
// will be marked as a non-aggregate record and return a single record
assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.")
}

View file

@ -1,6 +1,6 @@
package redis
import redis "github.com/go-redis/redis/v9"
import redis "github.com/go-redis/redis/v8"
// Option is used to override defaults when creating a new Redis checkpoint
type Option func(*Checkpoint)

View file

@ -5,7 +5,7 @@ import (
"fmt"
"os"
redis "github.com/go-redis/redis/v9"
redis "github.com/go-redis/redis/v8"
)
const localhost = "127.0.0.1:6379"

View file

@ -4,7 +4,7 @@ import (
"testing"
"github.com/alicebob/miniredis"
redis "github.com/go-redis/redis/v9"
redis "github.com/go-redis/redis/v8"
)
func Test_CheckpointOptions(t *testing.T) {