From 799ccf2d40169819b48c8a8a9b1e1a3987e89559 Mon Sep 17 00:00:00 2001 From: James Regovich Date: Tue, 13 Oct 2020 22:41:18 -0500 Subject: [PATCH] Add support for aggregated records (#127) Add config option for aggregated records and deaggregation on records in ScanShard This PR adds an option to consume aggregated records. --- consumer.go | 14 +++++++++++++- go.mod | 1 + go.sum | 3 +++ options.go | 6 ++++++ 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index 763b0e2..9662a18 100644 --- a/consumer.go +++ b/consumer.go @@ -14,6 +14,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" + "github.com/awslabs/kinesis-aggregation/go/deaggregator" ) // Record wraps the record returned from the Kinesis library and @@ -78,6 +79,7 @@ type Consumer struct { store Store scanInterval time.Duration maxRecords int64 + isAggregated bool } // ScanFunc is the type of the function called for each message read @@ -180,7 +182,17 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e } } else { // loop over records, call callback func - for _, r := range resp.Records { + var records []*kinesis.Record + var err error + if c.isAggregated { + records, err = deaggregator.DeaggregateRecords(resp.Records) + if err != nil { + return err + } + } else { + records = resp.Records + } + for _, r := range records { select { case <-ctx.Done(): return nil diff --git a/go.mod b/go.mod index b838c66..69c654b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/alicebob/miniredis v2.5.0+incompatible github.com/apex/log v1.6.0 github.com/aws/aws-sdk-go v1.33.7 + github.com/awslabs/kinesis-aggregation/go v0.0.0-20200810181507-d352038274c0 github.com/go-redis/redis/v8 v8.0.0-beta.6 github.com/go-sql-driver/mysql v1.5.0 github.com/gomodule/redigo v2.0.0+incompatible // indirect diff --git a/go.sum b/go.sum index 7a52515..1a74f9d 100644 --- a/go.sum +++ b/go.sum @@ -17,9 +17,12 @@ github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo= github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE= github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys= +github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.33.7 h1:vOozL5hmWHHriRviVTQnUwz8l05RS0rehmEFymI+/x8= github.com/aws/aws-sdk-go v1.33.7/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/awslabs/kinesis-aggregation/go v0.0.0-20200810181507-d352038274c0 h1:D97PNkeea5i2Sbq844BdbULqI5pv7yQw4thPwqEX504= +github.com/awslabs/kinesis-aggregation/go v0.0.0-20200810181507-d352038274c0/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4= github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I= github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= diff --git a/options.go b/options.go index b8248aa..093547b 100644 --- a/options.go +++ b/options.go @@ -73,3 +73,9 @@ func WithMaxRecords(n int64) Option { c.maxRecords = n } } + +func WithAggregation(a bool) Option { + return func(c *Consumer) { + c.isAggregated = a + } +}