diff --git a/consumer.go b/consumer.go index 27ff421..813910d 100644 --- a/consumer.go +++ b/consumer.go @@ -14,7 +14,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/harlow/kinesis-consumer/internal/deaggregator" + "github.com/awslabs/kinesis-aggregation/go/v2/deaggregator" ) // Record wraps the record returned from the Kinesis library and diff --git a/go.mod b/go.mod index 328dc26..a7f9f88 100644 --- a/go.mod +++ b/go.mod @@ -12,13 +12,11 @@ require ( github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.13 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.31.1 github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.4 - github.com/awslabs/kinesis-aggregation/go v0.0.0-20230808105340-e631fe742486 + github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20230808105340-e631fe742486 github.com/go-sql-driver/mysql v1.8.1 - github.com/golang/protobuf v1.5.4 github.com/lib/pq v1.10.9 github.com/pkg/errors v0.9.1 github.com/redis/go-redis/v9 v9.5.1 - github.com/stretchr/testify v1.9.0 ) require ( @@ -38,12 +36,11 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect github.com/aws/smithy-go v1.20.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/gomodule/redigo v2.0.0+incompatible // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e // indirect - google.golang.org/protobuf v1.33.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + google.golang.org/protobuf v1.26.0 // indirect ) diff --git a/go.sum b/go.sum index e27ca96..2bfbadf 100644 --- a/go.sum +++ b/go.sum @@ -12,7 +12,7 @@ github.com/apex/logs 
v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDw github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE= github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys= github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= -github.com/aws/aws-sdk-go v1.34.0/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to= @@ -41,6 +41,7 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6 h1:6tayE github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6/go.mod h1:qVNb/9IOVsLCZh0x2lnagrBwQ9fxajUpXS7OZfIsKn0= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.4 h1:Oe8awBiS/iitcsRJB5+DHa3iCxoA0KwJJf0JNrYMINY= github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.4/go.mod h1:RCZCSFbieSgNG1RKegO26opXV4EXyef/vNBVJsUyHuw= github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 h1:vN8hEbpRnL7+Hopy9dzmRle1xmDc7o8tmY0klsr175w= @@ -49,10 +50,11 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2K github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 
h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU= github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= +github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20230808105340-e631fe742486 h1:fBy4wQzC3T5S6F1o1uTYeR8WF1MIL7GSsPYjzabOwtA= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20230808105340-e631fe742486/go.mod h1:CQGhQ8Rf1WF5Ke8XuUjcd4PRb+mFTjzKR/pm3EWKaQw= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20230808105340-e631fe742486 h1:266Pq6JfxdphziJ1LiqU68OJrKiTxyF8hbiceQWX3Cs= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20230808105340-e631fe742486/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= @@ -71,21 +73,22 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= -github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= -github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= @@ -93,10 +96,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= 
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= @@ -121,8 +122,8 @@ github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h github.com/smartystreets/gunit v1.0.0/go.mod h1:qwPWnhz6pn0NnRBP++URONOVyNkPyr4SauJk4cUOwJs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0= @@ -139,7 +140,6 @@ golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8U golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -151,10 +151,10 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= diff --git 
a/internal/deaggregator/README.md b/internal/deaggregator/README.md deleted file mode 100644 index ce474ad..0000000 --- a/internal/deaggregator/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# Temporary Deaggregator - -Upgrading to aws-sdk-go-v2 was blocked on a PR to introduce a new Deaggregator: -https://github.com/awslabs/kinesis-aggregation/pull/143/files - -Once that PR is merged I'll remove this code and pull in the `awslabs/kinesis-aggregation` repo. \ No newline at end of file diff --git a/internal/deaggregator/deaggregator.go b/internal/deaggregator/deaggregator.go deleted file mode 100644 index b2395f8..0000000 --- a/internal/deaggregator/deaggregator.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package deaggregator - -import ( - "crypto/md5" - "fmt" - - "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/golang/protobuf/proto" - - rec "github.com/awslabs/kinesis-aggregation/go/records" -) - -// KplMagicHeader Magic File Header for a KPL Aggregated Record -var KplMagicHeader = fmt.Sprintf("%q", []byte("\xf3\x89\x9a\xc2")) - -const ( - KplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking. - DigestSize = 16 // MD5 Message size for protobuf. 
-) - -// DisaggregatedRecords takes an array of Kinesis records and expands any Protobuf -// records within that array, returning an array of all records -func DisaggregatedRecords(records []*types.Record) ([]*types.Record, error) { - var isAggregated bool - allRecords := make([]*types.Record, 0) - for _, record := range records { - isAggregated = true - - var dataMagic string - var decodedDataNoMagic []byte - // Check if record is long enough to have magic file header - if len(record.Data) >= KplMagicLen { - dataMagic = fmt.Sprintf("%q", record.Data[:KplMagicLen]) - decodedDataNoMagic = record.Data[KplMagicLen:] - } else { - isAggregated = false - } - - // Check if record has KPL Aggregate Record Magic Header and data length - // is correct size - if KplMagicHeader != dataMagic || len(decodedDataNoMagic) <= DigestSize { - isAggregated = false - } - - if isAggregated { - messageDigest := fmt.Sprintf("%x", decodedDataNoMagic[len(decodedDataNoMagic)-DigestSize:]) - messageData := decodedDataNoMagic[:len(decodedDataNoMagic)-DigestSize] - - calculatedDigest := fmt.Sprintf("%x", md5.Sum(messageData)) - - // Check protobuf MD5 hash matches MD5 sum of record - if messageDigest != calculatedDigest { - isAggregated = false - } else { - aggRecord := &rec.AggregatedRecord{} - err := proto.Unmarshal(messageData, aggRecord) - - if err != nil { - return nil, err - } - - partitionKeys := aggRecord.PartitionKeyTable - - for _, aggrec := range aggRecord.Records { - newRecord := createUserRecord(partitionKeys, aggrec, record) - allRecords = append(allRecords, newRecord) - } - } - } - - if !isAggregated { - allRecords = append(allRecords, record) - } - } - - return allRecords, nil -} - -// createUserRecord takes in the partitionKeys of the aggregated record, the individual -// disaggregated record, and the original aggregated record builds a kinesis.Record and -// returns it -func createUserRecord(partitionKeys []string, aggRec *rec.Record, record *types.Record) *types.Record { - 
partitionKey := partitionKeys[*aggRec.PartitionKeyIndex] - - return &types.Record{ - ApproximateArrivalTimestamp: record.ApproximateArrivalTimestamp, - Data: aggRec.Data, - EncryptionType: record.EncryptionType, - PartitionKey: &partitionKey, - SequenceNumber: record.SequenceNumber, - } -} diff --git a/internal/deaggregator/deaggregator_test.go b/internal/deaggregator/deaggregator_test.go deleted file mode 100644 index 1f2a037..0000000 --- a/internal/deaggregator/deaggregator_test.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package deaggregator_test - -import ( - "crypto/md5" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - - rec "github.com/awslabs/kinesis-aggregation/go/records" - - deagg "github.com/harlow/kinesis-consumer/internal/deaggregator" -) - -// Generate an aggregate record in the correct AWS-specified format -// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md -func generateAggregateRecord(numRecords int) []byte { - - aggr := &rec.AggregatedRecord{} - // Start with the magic header - aggRecord := []byte("\xf3\x89\x9a\xc2") - partKeyTable := make([]string, 0) - - // Create proto record with numRecords length - for i := 0; i < numRecords; i++ { - var partKey uint64 - var hashKey uint64 - partKey = uint64(i) - hashKey = uint64(i) * uint64(10) - r := &rec.Record{ - PartitionKeyIndex: &partKey, - ExplicitHashKeyIndex: &hashKey, - Data: []byte("Some test data string"), - Tags: make([]*rec.Tag, 0), - } - - aggr.Records = append(aggr.Records, r) - partKeyVal := "test" + fmt.Sprint(i) - partKeyTable = append(partKeyTable, partKeyVal) - } - - aggr.PartitionKeyTable = partKeyTable - // Marshal to protobuf record, create md5 sum from proto record - // and append both to aggRecord with magic header - 
data, _ := proto.Marshal(aggr) - md5Hash := md5.Sum(data) - aggRecord = append(aggRecord, data...) - aggRecord = append(aggRecord, md5Hash[:]...) - return aggRecord -} - -// Generate a generic kinesis.Record using whatever []byte -// is passed in as the data (can be normal []byte or proto record) -func generateKinesisRecord(data []byte) *types.Record { - currentTime := time.Now() - encryptionType := types.EncryptionTypeNone - partitionKey := "1234" - sequenceNumber := "21269319989900637946712965403778482371" - return &types.Record{ - ApproximateArrivalTimestamp: &currentTime, - Data: data, - EncryptionType: encryptionType, - PartitionKey: &partitionKey, - SequenceNumber: &sequenceNumber, - } -} - -// This tests to make sure that the data is at least larger than the length -// of the magic header to do some array slicing with index out of bounds -func TestSmallLengthReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) { - var err error - var kr *types.Record - - krs := make([]*types.Record, 0, 1) - - smallByte := []byte("No") - kr = generateKinesisRecord(smallByte) - krs = append(krs, kr) - dars, err := deagg.DisaggregatedRecords(krs) - if err != nil { - panic(err) - } - - // Small byte test, since this is not a deaggregated record, should return 1 - // record in the array. - assert.Equal(t, 1, len(dars), "Small Byte test should return length of 1.") -} - -// This function tests to make sure that the data starts with the correct magic header -// according to KPL aggregate documentation. 
-func TestNonMatchingMagicHeaderReturnsSingleRecord(t *testing.T) { - var err error - var kr *types.Record - - krs := make([]*types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - mismatchAggData := aggData[1:] - kr = generateKinesisRecord(mismatchAggData) - - krs = append(krs, kr) - - dars, err := deagg.DisaggregatedRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with a magic header that does not match 0xF3 0x89 0x9A 0xC2 - // should return a single record. - assert.Equal(t, 1, len(dars), "Mismatch magic header test should return length of 1.") -} - -// This function tests that the DisaggregatedRecords function returns the correct number of -// deaggregated records from a single aggregated record. -func TestVariableLengthRecordsReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) { - var err error - var kr *types.Record - - krs := make([]*types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - kr = generateKinesisRecord(aggData) - krs = append(krs, kr) - - dars, err := deagg.DisaggregatedRecords(krs) - if err != nil { - panic(err) - } - - // Variable Length Aggregate Record test has aggregaterd records and should return - // n length. - assertMsg := fmt.Sprintf("Variable Length Aggregate Record should return length %v.", len(dars)) - assert.Equal(t, n, len(dars), assertMsg) -} - -// This function tests the length of the message after magic file header. If length is less than -// the digest size (16 bytes), it is not an aggregated record. 
-func TestRecordAfterMagicHeaderWithLengthLessThanDigestSizeReturnsSingleRecord(t *testing.T) { - var err error - var kr *types.Record - - krs := make([]*types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - // Change size of proto message to 15 - reducedAggData := aggData[:19] - kr = generateKinesisRecord(reducedAggData) - - krs = append(krs, kr) - - dars, err := deagg.DisaggregatedRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with length less than 16 after the magic header should return - // a single record from DisaggregatedRecords - assert.Equal(t, 1, len(dars), "Digest size test should return length of 1.") -} - -// This function tests the MD5 Sum at the end of the record by comparing MD5 sum -// at end of proto record with MD5 Sum of Proto message. If they do not match, -// it is not an aggregated record. -func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) { - var err error - var kr *types.Record - - krs := make([]*types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - // Remove last byte from array to mismatch the MD5 sums - mismatchAggData := aggData[:len(aggData)-1] - kr = generateKinesisRecord(mismatchAggData) - - krs = append(krs, kr) - - dars, err := deagg.DisaggregatedRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with an MD5 sum that does not match with the md5.Sum(record) - // will be marked as a non-aggregate record and return a single record - assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.") -}