diff --git a/consumer.go b/consumer.go index bff0b21..3608674 100644 --- a/consumer.go +++ b/consumer.go @@ -13,7 +13,7 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/harlow/kinesis-consumer/internal/deaggregator" + "github.com/awslabs/kinesis-aggregation/go/v2/deaggregator" ) // Record wraps the record returned from the Kinesis library and diff --git a/go.mod b/go.mod index 8020e77..b901f9c 100644 --- a/go.mod +++ b/go.mod @@ -7,20 +7,16 @@ require ( github.com/apex/log v1.6.0 github.com/aws/aws-sdk-go-v2 v1.11.2 github.com/aws/aws-sdk-go-v2/config v1.6.1 - github.com/aws/aws-sdk-go-v2/credentials v1.3.3 // indirect - github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.2.0 // indirect - github.com/aws/aws-sdk-go-v2/service/dynamodb v1.5.0 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.3.3 + github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.2.0 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.5.0 github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0 - github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f + github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 github.com/go-redis/redis/v8 v8.0.0-beta.6 github.com/go-sql-driver/mysql v1.5.0 - github.com/golang/protobuf v1.5.2 // indirect github.com/gomodule/redigo v2.0.0+incompatible // indirect - github.com/google/go-cmp v0.5.6 // indirect - github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/lib/pq v1.7.0 github.com/pkg/errors v0.9.1 - github.com/stretchr/testify v1.7.0 // indirect github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e // indirect google.golang.org/protobuf v1.27.1 // indirect ) diff --git a/go.sum b/go.sum index 93a4f52..9080bf7 100644 --- a/go.sum +++ b/go.sum @@ -17,14 +17,8 @@ github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo= github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE= github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys= -github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= -github.com/aws/aws-sdk-go v1.20.6 h1:kmy4Gvdlyez1fV4kw5RYxZzWKVyuHZHgPWeU/YvRsV4= github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= -github.com/aws/aws-sdk-go v1.40.27 h1:8fWW0CpmBZ8WWduNwl4vE9t07nMYFrhAsUHjPj81qUM= -github.com/aws/aws-sdk-go v1.40.27/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= -github.com/aws/aws-sdk-go-v2 v1.8.1 h1:GcFgQl7MsBygmeeqXyV1ivrTEmsVz/rdFJaTcltG9ag= github.com/aws/aws-sdk-go-v2 v1.8.1/go.mod h1:xEFuWz+3TYdlPRuo+CqATbeDWIWyaT5uAPwPaWtgse0= -github.com/aws/aws-sdk-go-v2 v1.9.0 h1:+S+dSqQCN3MSU5vJRu1HqHrq00cJn6heIMU7X9hcsoo= github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs= github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= @@ -50,24 +44,18 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.1.0 h1:QCPbs github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.1.0/go.mod h1:enkU5tq2HoXY+ZMiQprgF3Q83T3PbO77E83yXXzRZWE= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.3 h1:VxFCgxsqWe7OThOwJ5IpFX3xrObtuIH9Hg/NW7oot1Y= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.3/go.mod h1:7gcsONBmFoCcKrAqrm95trrMd2+C/ReYKP7Vfu8yHHA= -github.com/aws/aws-sdk-go-v2/service/kinesis v1.5.3 h1:ScnzrRDDZsARgwEXFBuE8cQ4rkm2yuaA72Gad4NNVK8= -github.com/aws/aws-sdk-go-v2/service/kinesis v1.5.3/go.mod h1:1C+FM8Sk3+UoI/svy0J9CS+e4PbJ/qsXxlE9k4/nQKI= github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0 h1:hb+NupVMUzINGUCfDs2+YqMkWKu47dBIQHpulM0XWh4= github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= github.com/aws/aws-sdk-go-v2/service/sso v1.3.3 h1:K2gCnGvAASpz+jqP9iyr+F/KNjmTYf8aWOtTQzhmZ5w= github.com/aws/aws-sdk-go-v2/service/sso v1.3.3/go.mod h1:Jgw5O+SK7MZ2Yi9Yvzb4PggAPYaFSliiQuWR0hNjexk= github.com/aws/aws-sdk-go-v2/service/sts v1.6.2 h1:l504GWCoQi1Pk68vSUFGLmDIEMzRfVGNgLakDK+Uj58= github.com/aws/aws-sdk-go-v2/service/sts v1.6.2/go.mod h1:RBhoMJB8yFToaCnbe0jNq5Dcdy0jp6LhHqg55rjClkM= -github.com/aws/smithy-go v1.7.0 h1:+cLHMRrDZvQ4wk+KuQ9yH6eEg6KZEJ9RI2IkDqnygCg= github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/aws/smithy-go v1.8.0 h1:AEwwwXQZtUwP5Mz506FeXXrKBe0jA8gVM+1gEcSRooc= github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58= github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20200810181507-d352038274c0 h1:D97PNkeea5i2Sbq844BdbULqI5pv7yQw4thPwqEX504= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20200810181507-d352038274c0/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f h1:Pf0BjJDga7C98f0vhw+Ip5EaiE07S3lTKpIYPNS0nMo= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 h1:lxW5Q6K2IisyF5tlr6Ts0W4POGWQZco05MJjFmoeIHs= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I= github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= @@ -107,7 +95,6 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= @@ -170,7 +157,6 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -209,7 +195,6 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -224,13 +209,9 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY= golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -256,7 +237,6 @@ google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/internal/deaggregator/README.md b/internal/deaggregator/README.md deleted file mode 100644 index ce474ad..0000000 --- a/internal/deaggregator/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# Temporary Deaggregator - -Upgrading to aws-sdk-go-v2 was blocked on a PR to introduce a new Deaggregator: -https://github.com/awslabs/kinesis-aggregation/pull/143/files - -Once that PR is merged I'll remove this code and pull in the `awslabs/kinesis-aggregation` repo. \ No newline at end of file diff --git a/internal/deaggregator/deaggregator.go b/internal/deaggregator/deaggregator.go deleted file mode 100644 index 94782f1..0000000 --- a/internal/deaggregator/deaggregator.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package deaggregator - -import ( - "crypto/md5" - "fmt" - - "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/golang/protobuf/proto" - - rec "github.com/awslabs/kinesis-aggregation/go/records" -) - -// Magic File Header for a KPL Aggregated Record -var KplMagicHeader = fmt.Sprintf("%q", []byte("\xf3\x89\x9a\xc2")) - -const ( - KplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking. - DigestSize = 16 // MD5 Message size for protobuf. -) - -// DeaggregateRecords takes an array of Kinesis records and expands any Protobuf -// records within that array, returning an array of all records -func DeaggregateRecords(records []*types.Record) ([]*types.Record, error) { - var isAggregated bool - allRecords := make([]*types.Record, 0) - for _, record := range records { - isAggregated = true - - var dataMagic string - var decodedDataNoMagic []byte - // Check if record is long enough to have magic file header - if len(record.Data) >= KplMagicLen { - dataMagic = fmt.Sprintf("%q", record.Data[:KplMagicLen]) - decodedDataNoMagic = record.Data[KplMagicLen:] - } else { - isAggregated = false - } - - // Check if record has KPL Aggregate Record Magic Header and data length - // is correct size - if KplMagicHeader != dataMagic || len(decodedDataNoMagic) <= DigestSize { - isAggregated = false - } - - if isAggregated { - messageDigest := fmt.Sprintf("%x", decodedDataNoMagic[len(decodedDataNoMagic)-DigestSize:]) - messageData := decodedDataNoMagic[:len(decodedDataNoMagic)-DigestSize] - - calculatedDigest := fmt.Sprintf("%x", md5.Sum(messageData)) - - // Check protobuf MD5 hash matches MD5 sum of record - if messageDigest != calculatedDigest { - isAggregated = false - } else { - aggRecord := &rec.AggregatedRecord{} - err := proto.Unmarshal(messageData, aggRecord) - - if err != nil { - return nil, err - } - - partitionKeys := aggRecord.PartitionKeyTable - - for _, aggrec := range aggRecord.Records { - newRecord := createUserRecord(partitionKeys, aggrec, record) - allRecords = append(allRecords, newRecord) - } - } - } - - if !isAggregated { - allRecords = append(allRecords, record) - } - } - - return allRecords, nil -} - -// createUserRecord takes in the partitionKeys of the aggregated record, the individual -// deaggregated record, and the original aggregated record builds a kinesis.Record and -// returns it -func createUserRecord(partitionKeys []string, aggRec *rec.Record, record *types.Record) *types.Record { - partitionKey := partitionKeys[*aggRec.PartitionKeyIndex] - - return &types.Record{ - ApproximateArrivalTimestamp: record.ApproximateArrivalTimestamp, - Data: aggRec.Data, - EncryptionType: record.EncryptionType, - PartitionKey: &partitionKey, - SequenceNumber: record.SequenceNumber, - } -} diff --git a/internal/deaggregator/deaggregator_test.go b/internal/deaggregator/deaggregator_test.go deleted file mode 100644 index d1c33f9..0000000 --- a/internal/deaggregator/deaggregator_test.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package deaggregator_test - -import ( - "crypto/md5" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - - rec "github.com/awslabs/kinesis-aggregation/go/records" - deagg "github.com/harlow/kinesis-consumer/internal/deaggregator" -) - -// Generate an aggregate record in the correct AWS-specified format -// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md -func generateAggregateRecord(numRecords int) []byte { - - aggr := &rec.AggregatedRecord{} - // Start with the magic header - aggRecord := []byte("\xf3\x89\x9a\xc2") - partKeyTable := make([]string, 0) - - // Create proto record with numRecords length - for i := 0; i < numRecords; i++ { - var partKey uint64 - var hashKey uint64 - partKey = uint64(i) - hashKey = uint64(i) * uint64(10) - r := &rec.Record{ - PartitionKeyIndex: &partKey, - ExplicitHashKeyIndex: &hashKey, - Data: []byte("Some test data string"), - Tags: make([]*rec.Tag, 0), - } - - aggr.Records = append(aggr.Records, r) - partKeyVal := "test" + fmt.Sprint(i) - partKeyTable = append(partKeyTable, partKeyVal) - } - - aggr.PartitionKeyTable = partKeyTable - // Marshal to protobuf record, create md5 sum from proto record - // and append both to aggRecord with magic header - data, _ := proto.Marshal(aggr) - md5Hash := md5.Sum(data) - aggRecord = append(aggRecord, data...) - aggRecord = append(aggRecord, md5Hash[:]...) - return aggRecord -} - -// Generate a generic kinesis.Record using whatever []byte -// is passed in as the data (can be normal []byte or proto record) -func generateKinesisRecord(data []byte) *types.Record { - currentTime := time.Now() - encryptionType := types.EncryptionTypeNone - partitionKey := "1234" - sequenceNumber := "21269319989900637946712965403778482371" - return &types.Record{ - ApproximateArrivalTimestamp: ¤tTime, - Data: data, - EncryptionType: encryptionType, - PartitionKey: &partitionKey, - SequenceNumber: &sequenceNumber, - } -} - -// This tests to make sure that the data is at least larger than the length -// of the magic header to do some array slicing with index out of bounds -func TestSmallLengthReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) { - var err error - var kr *types.Record - - krs := make([]*types.Record, 0, 1) - - smallByte := []byte("No") - kr = generateKinesisRecord(smallByte) - krs = append(krs, kr) - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // Small byte test, since this is not a deaggregated record, should return 1 - // record in the array. - assert.Equal(t, 1, len(dars), "Small Byte test should return length of 1.") -} - -// This function tests to make sure that the data starts with the correct magic header -// according to KPL aggregate documentation. -func TestNonMatchingMagicHeaderReturnsSingleRecord(t *testing.T) { - var err error - var kr *types.Record - - krs := make([]*types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - mismatchAggData := aggData[1:] - kr = generateKinesisRecord(mismatchAggData) - - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with a magic header that does not match 0xF3 0x89 0x9A 0xC2 - // should return a single record. - assert.Equal(t, 1, len(dars), "Mismatch magic header test should return length of 1.") -} - -// This function tests that the DeaggregateRecords function returns the correct number of -// deaggregated records from a single aggregated record. -func TestVariableLengthRecordsReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) { - var err error - var kr *types.Record - - krs := make([]*types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - kr = generateKinesisRecord(aggData) - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // Variable Length Aggregate Record test has aggregaterd records and should return - // n length. - assertMsg := fmt.Sprintf("Variable Length Aggregate Record should return length %v.", len(dars)) - assert.Equal(t, n, len(dars), assertMsg) -} - -// This function tests the length of the message after magic file header. If length is less than -// the digest size (16 bytes), it is not an aggregated record. -func TestRecordAfterMagicHeaderWithLengthLessThanDigestSizeReturnsSingleRecord(t *testing.T) { - var err error - var kr *types.Record - - krs := make([]*types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - // Change size of proto message to 15 - reducedAggData := aggData[:19] - kr = generateKinesisRecord(reducedAggData) - - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with length less than 16 after the magic header should return - // a single record from DeaggregateRecords - assert.Equal(t, 1, len(dars), "Digest size test should return length of 1.") -} - -// This function tests the MD5 Sum at the end of the record by comparing MD5 sum -// at end of proto record with MD5 Sum of Proto message. If they do not match, -// it is not an aggregated record. -func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) { - var err error - var kr *types.Record - - krs := make([]*types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - // Remove last byte from array to mismatch the MD5 sums - mismatchAggData := aggData[:len(aggData)-1] - kr = generateKinesisRecord(mismatchAggData) - - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with an MD5 sum that does not match with the md5.Sum(record) - // will be marked as a non-aggregate record and return a single record - assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.") -}