Add support for aggregated records (#127)
Add config option for aggregated records and deaggregation on records in ScanShard This PR adds an option to consume aggregated records.
This commit is contained in:
parent
e60d217333
commit
799ccf2d40
4 changed files with 23 additions and 1 deletions
14
consumer.go
14
consumer.go
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
||||
"github.com/awslabs/kinesis-aggregation/go/deaggregator"
|
||||
)
|
||||
|
||||
// Record wraps the record returned from the Kinesis library and
|
||||
|
|
@ -78,6 +79,7 @@ type Consumer struct {
|
|||
store Store
|
||||
scanInterval time.Duration
|
||||
maxRecords int64
|
||||
isAggregated bool
|
||||
}
|
||||
|
||||
// ScanFunc is the type of the function called for each message read
|
||||
|
|
@ -180,7 +182,17 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
|||
}
|
||||
} else {
|
||||
// loop over records, call callback func
|
||||
for _, r := range resp.Records {
|
||||
var records []*kinesis.Record
|
||||
var err error
|
||||
if c.isAggregated {
|
||||
records, err = deaggregator.DeaggregateRecords(resp.Records)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
records = resp.Records
|
||||
}
|
||||
for _, r := range records {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
|
|
|
|||
1
go.mod
1
go.mod
|
|
@ -6,6 +6,7 @@ require (
|
|||
github.com/alicebob/miniredis v2.5.0+incompatible
|
||||
github.com/apex/log v1.6.0
|
||||
github.com/aws/aws-sdk-go v1.33.7
|
||||
github.com/awslabs/kinesis-aggregation/go v0.0.0-20200810181507-d352038274c0
|
||||
github.com/go-redis/redis/v8 v8.0.0-beta.6
|
||||
github.com/go-sql-driver/mysql v1.5.0
|
||||
github.com/gomodule/redigo v2.0.0+incompatible // indirect
|
||||
|
|
|
|||
3
go.sum
3
go.sum
|
|
@ -17,9 +17,12 @@ github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY
|
|||
github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo=
|
||||
github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE=
|
||||
github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys=
|
||||
github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/aws/aws-sdk-go v1.33.7 h1:vOozL5hmWHHriRviVTQnUwz8l05RS0rehmEFymI+/x8=
|
||||
github.com/aws/aws-sdk-go v1.33.7/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
|
||||
github.com/awslabs/kinesis-aggregation/go v0.0.0-20200810181507-d352038274c0 h1:D97PNkeea5i2Sbq844BdbULqI5pv7yQw4thPwqEX504=
|
||||
github.com/awslabs/kinesis-aggregation/go v0.0.0-20200810181507-d352038274c0/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4=
|
||||
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I=
|
||||
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
|
||||
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
|
||||
|
|
|
|||
|
|
@ -73,3 +73,9 @@ func WithMaxRecords(n int64) Option {
|
|||
c.maxRecords = n
|
||||
}
|
||||
}
|
||||
|
||||
func WithAggregation(a bool) Option {
|
||||
return func(c *Consumer) {
|
||||
c.isAggregated = a
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue