#7 imports awslabs aggregator in favor of local one
This commit is contained in:
parent
1918e43f3e
commit
eaf4defe57
6 changed files with 20 additions and 326 deletions
|
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||
|
||||
"github.com/harlow/kinesis-consumer/internal/deaggregator"
|
||||
"github.com/awslabs/kinesis-aggregation/go/v2/deaggregator"
|
||||
)
|
||||
|
||||
// Record wraps the record returned from the Kinesis library and
|
||||
|
|
|
|||
11
go.mod
11
go.mod
|
|
@ -12,13 +12,11 @@ require (
|
|||
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.13
|
||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.31.1
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.4
|
||||
github.com/awslabs/kinesis-aggregation/go v0.0.0-20230808105340-e631fe742486
|
||||
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20230808105340-e631fe742486
|
||||
github.com/go-sql-driver/mysql v1.8.1
|
||||
github.com/golang/protobuf v1.5.4
|
||||
github.com/lib/pq v1.10.9
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/redis/go-redis/v9 v9.5.1
|
||||
github.com/stretchr/testify v1.9.0
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
@ -38,12 +36,11 @@ require (
|
|||
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
|
||||
github.com/aws/smithy-go v1.20.2 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/gomodule/redigo v2.0.0+incompatible // indirect
|
||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/stretchr/testify v1.9.0 // indirect
|
||||
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e // indirect
|
||||
google.golang.org/protobuf v1.33.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
google.golang.org/protobuf v1.26.0 // indirect
|
||||
)
|
||||
|
|
|
|||
30
go.sum
30
go.sum
|
|
@ -12,7 +12,7 @@ github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDw
|
|||
github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE=
|
||||
github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys=
|
||||
github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/aws/aws-sdk-go v1.34.0/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
|
||||
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
|
||||
github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA=
|
||||
github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to=
|
||||
|
|
@ -41,6 +41,7 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6 h1:6tayE
|
|||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6/go.mod h1:qVNb/9IOVsLCZh0x2lnagrBwQ9fxajUpXS7OZfIsKn0=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.4 h1:Oe8awBiS/iitcsRJB5+DHa3iCxoA0KwJJf0JNrYMINY=
|
||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.4/go.mod h1:RCZCSFbieSgNG1RKegO26opXV4EXyef/vNBVJsUyHuw=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 h1:vN8hEbpRnL7+Hopy9dzmRle1xmDc7o8tmY0klsr175w=
|
||||
|
|
@ -49,10 +50,11 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2K
|
|||
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw=
|
||||
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
|
||||
github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q=
|
||||
github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
|
||||
github.com/awslabs/kinesis-aggregation/go v0.0.0-20230808105340-e631fe742486 h1:fBy4wQzC3T5S6F1o1uTYeR8WF1MIL7GSsPYjzabOwtA=
|
||||
github.com/awslabs/kinesis-aggregation/go v0.0.0-20230808105340-e631fe742486/go.mod h1:CQGhQ8Rf1WF5Ke8XuUjcd4PRb+mFTjzKR/pm3EWKaQw=
|
||||
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20230808105340-e631fe742486 h1:266Pq6JfxdphziJ1LiqU68OJrKiTxyF8hbiceQWX3Cs=
|
||||
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20230808105340-e631fe742486/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
|
||||
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
|
|
@ -71,21 +73,22 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
|
|||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
|
||||
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
|
||||
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
|
||||
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
|
||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
|
||||
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
|
||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
|
||||
|
|
@ -93,10 +96,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
|
|||
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
|
||||
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
|
||||
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
|
|
@ -121,8 +122,8 @@ github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h
|
|||
github.com/smartystreets/gunit v1.0.0/go.mod h1:qwPWnhz6pn0NnRBP++URONOVyNkPyr4SauJk4cUOwJs=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0=
|
||||
|
|
@ -139,7 +140,6 @@ golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8U
|
|||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
|
@ -151,10 +151,10 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
|||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
|
||||
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
|
|
|
|||
|
|
@ -1,6 +0,0 @@
|
|||
# Temporary Deaggregator
|
||||
|
||||
Upgrading to aws-sdk-go-v2 was blocked on a PR to introduce a new Deaggregator:
|
||||
https://github.com/awslabs/kinesis-aggregation/pull/143/files
|
||||
|
||||
Once that PR is merged I'll remove this code and pull in the `awslabs/kinesis-aggregation` repo.
|
||||
|
|
@ -1,94 +0,0 @@
|
|||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
package deaggregator
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
rec "github.com/awslabs/kinesis-aggregation/go/records"
|
||||
)
|
||||
|
||||
// KplMagicHeader Magic File Header for a KPL Aggregated Record
|
||||
var KplMagicHeader = fmt.Sprintf("%q", []byte("\xf3\x89\x9a\xc2"))
|
||||
|
||||
const (
|
||||
KplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking.
|
||||
DigestSize = 16 // MD5 Message size for protobuf.
|
||||
)
|
||||
|
||||
// DisaggregatedRecords takes an array of Kinesis records and expands any Protobuf
|
||||
// records within that array, returning an array of all records
|
||||
func DisaggregatedRecords(records []*types.Record) ([]*types.Record, error) {
|
||||
var isAggregated bool
|
||||
allRecords := make([]*types.Record, 0)
|
||||
for _, record := range records {
|
||||
isAggregated = true
|
||||
|
||||
var dataMagic string
|
||||
var decodedDataNoMagic []byte
|
||||
// Check if record is long enough to have magic file header
|
||||
if len(record.Data) >= KplMagicLen {
|
||||
dataMagic = fmt.Sprintf("%q", record.Data[:KplMagicLen])
|
||||
decodedDataNoMagic = record.Data[KplMagicLen:]
|
||||
} else {
|
||||
isAggregated = false
|
||||
}
|
||||
|
||||
// Check if record has KPL Aggregate Record Magic Header and data length
|
||||
// is correct size
|
||||
if KplMagicHeader != dataMagic || len(decodedDataNoMagic) <= DigestSize {
|
||||
isAggregated = false
|
||||
}
|
||||
|
||||
if isAggregated {
|
||||
messageDigest := fmt.Sprintf("%x", decodedDataNoMagic[len(decodedDataNoMagic)-DigestSize:])
|
||||
messageData := decodedDataNoMagic[:len(decodedDataNoMagic)-DigestSize]
|
||||
|
||||
calculatedDigest := fmt.Sprintf("%x", md5.Sum(messageData))
|
||||
|
||||
// Check protobuf MD5 hash matches MD5 sum of record
|
||||
if messageDigest != calculatedDigest {
|
||||
isAggregated = false
|
||||
} else {
|
||||
aggRecord := &rec.AggregatedRecord{}
|
||||
err := proto.Unmarshal(messageData, aggRecord)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
partitionKeys := aggRecord.PartitionKeyTable
|
||||
|
||||
for _, aggrec := range aggRecord.Records {
|
||||
newRecord := createUserRecord(partitionKeys, aggrec, record)
|
||||
allRecords = append(allRecords, newRecord)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !isAggregated {
|
||||
allRecords = append(allRecords, record)
|
||||
}
|
||||
}
|
||||
|
||||
return allRecords, nil
|
||||
}
|
||||
|
||||
// createUserRecord takes in the partitionKeys of the aggregated record, the individual
|
||||
// disaggregated record, and the original aggregated record builds a kinesis.Record and
|
||||
// returns it
|
||||
func createUserRecord(partitionKeys []string, aggRec *rec.Record, record *types.Record) *types.Record {
|
||||
partitionKey := partitionKeys[*aggRec.PartitionKeyIndex]
|
||||
|
||||
return &types.Record{
|
||||
ApproximateArrivalTimestamp: record.ApproximateArrivalTimestamp,
|
||||
Data: aggRec.Data,
|
||||
EncryptionType: record.EncryptionType,
|
||||
PartitionKey: &partitionKey,
|
||||
SequenceNumber: record.SequenceNumber,
|
||||
}
|
||||
}
|
||||
|
|
@ -1,203 +0,0 @@
|
|||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
package deaggregator_test
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
rec "github.com/awslabs/kinesis-aggregation/go/records"
|
||||
|
||||
deagg "github.com/harlow/kinesis-consumer/internal/deaggregator"
|
||||
)
|
||||
|
||||
// Generate an aggregate record in the correct AWS-specified format
|
||||
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
|
||||
func generateAggregateRecord(numRecords int) []byte {
|
||||
|
||||
aggr := &rec.AggregatedRecord{}
|
||||
// Start with the magic header
|
||||
aggRecord := []byte("\xf3\x89\x9a\xc2")
|
||||
partKeyTable := make([]string, 0)
|
||||
|
||||
// Create proto record with numRecords length
|
||||
for i := 0; i < numRecords; i++ {
|
||||
var partKey uint64
|
||||
var hashKey uint64
|
||||
partKey = uint64(i)
|
||||
hashKey = uint64(i) * uint64(10)
|
||||
r := &rec.Record{
|
||||
PartitionKeyIndex: &partKey,
|
||||
ExplicitHashKeyIndex: &hashKey,
|
||||
Data: []byte("Some test data string"),
|
||||
Tags: make([]*rec.Tag, 0),
|
||||
}
|
||||
|
||||
aggr.Records = append(aggr.Records, r)
|
||||
partKeyVal := "test" + fmt.Sprint(i)
|
||||
partKeyTable = append(partKeyTable, partKeyVal)
|
||||
}
|
||||
|
||||
aggr.PartitionKeyTable = partKeyTable
|
||||
// Marshal to protobuf record, create md5 sum from proto record
|
||||
// and append both to aggRecord with magic header
|
||||
data, _ := proto.Marshal(aggr)
|
||||
md5Hash := md5.Sum(data)
|
||||
aggRecord = append(aggRecord, data...)
|
||||
aggRecord = append(aggRecord, md5Hash[:]...)
|
||||
return aggRecord
|
||||
}
|
||||
|
||||
// Generate a generic kinesis.Record using whatever []byte
|
||||
// is passed in as the data (can be normal []byte or proto record)
|
||||
func generateKinesisRecord(data []byte) *types.Record {
|
||||
currentTime := time.Now()
|
||||
encryptionType := types.EncryptionTypeNone
|
||||
partitionKey := "1234"
|
||||
sequenceNumber := "21269319989900637946712965403778482371"
|
||||
return &types.Record{
|
||||
ApproximateArrivalTimestamp: ¤tTime,
|
||||
Data: data,
|
||||
EncryptionType: encryptionType,
|
||||
PartitionKey: &partitionKey,
|
||||
SequenceNumber: &sequenceNumber,
|
||||
}
|
||||
}
|
||||
|
||||
// This tests to make sure that the data is at least larger than the length
|
||||
// of the magic header to do some array slicing with index out of bounds
|
||||
func TestSmallLengthReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
|
||||
var err error
|
||||
var kr *types.Record
|
||||
|
||||
krs := make([]*types.Record, 0, 1)
|
||||
|
||||
smallByte := []byte("No")
|
||||
kr = generateKinesisRecord(smallByte)
|
||||
krs = append(krs, kr)
|
||||
dars, err := deagg.DisaggregatedRecords(krs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Small byte test, since this is not a deaggregated record, should return 1
|
||||
// record in the array.
|
||||
assert.Equal(t, 1, len(dars), "Small Byte test should return length of 1.")
|
||||
}
|
||||
|
||||
// This function tests to make sure that the data starts with the correct magic header
|
||||
// according to KPL aggregate documentation.
|
||||
func TestNonMatchingMagicHeaderReturnsSingleRecord(t *testing.T) {
|
||||
var err error
|
||||
var kr *types.Record
|
||||
|
||||
krs := make([]*types.Record, 0, 1)
|
||||
|
||||
min := 1
|
||||
max := 10
|
||||
n := rand.Intn(max-min) + min
|
||||
aggData := generateAggregateRecord(n)
|
||||
mismatchAggData := aggData[1:]
|
||||
kr = generateKinesisRecord(mismatchAggData)
|
||||
|
||||
krs = append(krs, kr)
|
||||
|
||||
dars, err := deagg.DisaggregatedRecords(krs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// A byte record with a magic header that does not match 0xF3 0x89 0x9A 0xC2
|
||||
// should return a single record.
|
||||
assert.Equal(t, 1, len(dars), "Mismatch magic header test should return length of 1.")
|
||||
}
|
||||
|
||||
// This function tests that the DisaggregatedRecords function returns the correct number of
|
||||
// deaggregated records from a single aggregated record.
|
||||
func TestVariableLengthRecordsReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
|
||||
var err error
|
||||
var kr *types.Record
|
||||
|
||||
krs := make([]*types.Record, 0, 1)
|
||||
|
||||
min := 1
|
||||
max := 10
|
||||
n := rand.Intn(max-min) + min
|
||||
aggData := generateAggregateRecord(n)
|
||||
kr = generateKinesisRecord(aggData)
|
||||
krs = append(krs, kr)
|
||||
|
||||
dars, err := deagg.DisaggregatedRecords(krs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Variable Length Aggregate Record test has aggregaterd records and should return
|
||||
// n length.
|
||||
assertMsg := fmt.Sprintf("Variable Length Aggregate Record should return length %v.", len(dars))
|
||||
assert.Equal(t, n, len(dars), assertMsg)
|
||||
}
|
||||
|
||||
// This function tests the length of the message after magic file header. If length is less than
|
||||
// the digest size (16 bytes), it is not an aggregated record.
|
||||
func TestRecordAfterMagicHeaderWithLengthLessThanDigestSizeReturnsSingleRecord(t *testing.T) {
|
||||
var err error
|
||||
var kr *types.Record
|
||||
|
||||
krs := make([]*types.Record, 0, 1)
|
||||
|
||||
min := 1
|
||||
max := 10
|
||||
n := rand.Intn(max-min) + min
|
||||
aggData := generateAggregateRecord(n)
|
||||
// Change size of proto message to 15
|
||||
reducedAggData := aggData[:19]
|
||||
kr = generateKinesisRecord(reducedAggData)
|
||||
|
||||
krs = append(krs, kr)
|
||||
|
||||
dars, err := deagg.DisaggregatedRecords(krs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// A byte record with length less than 16 after the magic header should return
|
||||
// a single record from DisaggregatedRecords
|
||||
assert.Equal(t, 1, len(dars), "Digest size test should return length of 1.")
|
||||
}
|
||||
|
||||
// This function tests the MD5 Sum at the end of the record by comparing MD5 sum
|
||||
// at end of proto record with MD5 Sum of Proto message. If they do not match,
|
||||
// it is not an aggregated record.
|
||||
func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) {
|
||||
var err error
|
||||
var kr *types.Record
|
||||
|
||||
krs := make([]*types.Record, 0, 1)
|
||||
|
||||
min := 1
|
||||
max := 10
|
||||
n := rand.Intn(max-min) + min
|
||||
aggData := generateAggregateRecord(n)
|
||||
// Remove last byte from array to mismatch the MD5 sums
|
||||
mismatchAggData := aggData[:len(aggData)-1]
|
||||
kr = generateKinesisRecord(mismatchAggData)
|
||||
|
||||
krs = append(krs, kr)
|
||||
|
||||
dars, err := deagg.DisaggregatedRecords(krs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// A byte record with an MD5 sum that does not match with the md5.Sum(record)
|
||||
// will be marked as a non-aggregate record and return a single record
|
||||
assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.")
|
||||
}
|
||||
Loading…
Reference in a new issue