since deaggregation package was upgraded to sdk v2 makes sense to use it.
https://github.com/awslabs/kinesis-aggregation/pull/143#issuecomment-953308464 Signed-off-by: Fabiano Graças <fabiano.gracas@faro.com>
This commit is contained in:
parent
562f0f0e6b
commit
f284ef978d
7 changed files with 8 additions and 515 deletions
|
|
@ -27,13 +27,13 @@ import (
|
||||||
"github.com/aws/aws-sdk-go-v2/aws"
|
"github.com/aws/aws-sdk-go-v2/aws"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||||
|
deagg "github.com/awslabs/kinesis-aggregation/go/v2/deaggregator"
|
||||||
|
|
||||||
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
|
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
|
||||||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
|
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
|
||||||
kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
|
kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
|
||||||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
|
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
|
||||||
par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"
|
par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"
|
||||||
deagg "github.com/vmware/vmware-go-kcl-v2/internal/deaggregator"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type shardConsumer interface {
|
type shardConsumer interface {
|
||||||
|
|
|
||||||
1
go.mod
1
go.mod
|
|
@ -9,6 +9,7 @@ require (
|
||||||
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0
|
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0
|
||||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0
|
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0
|
||||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0
|
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0
|
||||||
|
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407
|
||||||
github.com/golang/protobuf v1.5.2
|
github.com/golang/protobuf v1.5.2
|
||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/prometheus/client_golang v1.11.0
|
github.com/prometheus/client_golang v1.11.0
|
||||||
|
|
|
||||||
5
go.sum
5
go.sum
|
|
@ -40,6 +40,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
|
||||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
|
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
|
||||||
|
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
|
||||||
github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs=
|
github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs=
|
||||||
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
|
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
|
||||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI=
|
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI=
|
||||||
|
|
@ -66,14 +67,18 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+I
|
||||||
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA=
|
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA=
|
||||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg=
|
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg=
|
||||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I=
|
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I=
|
||||||
|
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
|
||||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 h1:s47dGRX/fBy9s/Zculav/cyqRhkMKsE/5hjg6rWAH6E=
|
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 h1:s47dGRX/fBy9s/Zculav/cyqRhkMKsE/5hjg6rWAH6E=
|
||||||
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0/go.mod h1:B1x58TfECuYHFX/bga902rUvMqQu9C/v2XiCi2GZZXE=
|
github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0/go.mod h1:B1x58TfECuYHFX/bga902rUvMqQu9C/v2XiCi2GZZXE=
|
||||||
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 h1:E4fxAg/UE8a6yiLZYv8/EP0uXKPPRImiMau4ift6S/g=
|
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 h1:E4fxAg/UE8a6yiLZYv8/EP0uXKPPRImiMau4ift6S/g=
|
||||||
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGXyn22c1USYxA/2KyvoeDY0=
|
github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGXyn22c1USYxA/2KyvoeDY0=
|
||||||
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc=
|
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc=
|
||||||
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA=
|
github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA=
|
||||||
|
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
|
||||||
github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58=
|
github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58=
|
||||||
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
|
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
|
||||||
|
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8=
|
||||||
|
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
|
||||||
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
||||||
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||||
|
|
|
||||||
|
|
@ -1,96 +0,0 @@
|
||||||
// Package deaggregator
|
|
||||||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
|
||||||
package deaggregator
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/md5"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
|
||||||
"github.com/golang/protobuf/proto"
|
|
||||||
|
|
||||||
rec "github.com/vmware/vmware-go-kcl-v2/internal/records"
|
|
||||||
)
|
|
||||||
|
|
||||||
// KplMagicHeader Magic File Header for a KPL Aggregated Record
|
|
||||||
var KplMagicHeader = fmt.Sprintf("%q", []byte("\xf3\x89\x9a\xc2"))
|
|
||||||
|
|
||||||
const (
|
|
||||||
KplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking.
|
|
||||||
DigestSize = 16 // MD5 Message size for protobuf.
|
|
||||||
)
|
|
||||||
|
|
||||||
// DeaggregateRecords takes an array of Kinesis records and expands any Protobuf
|
|
||||||
// records within that array, returning an array of all records
|
|
||||||
func DeaggregateRecords(records []types.Record) ([]types.Record, error) {
|
|
||||||
var isAggregated bool
|
|
||||||
allRecords := make([]types.Record, 0)
|
|
||||||
|
|
||||||
for _, record := range records {
|
|
||||||
isAggregated = true
|
|
||||||
|
|
||||||
var dataMagic string
|
|
||||||
var decodedDataNoMagic []byte
|
|
||||||
// Check if record is long enough to have magic file header
|
|
||||||
if len(record.Data) >= KplMagicLen {
|
|
||||||
dataMagic = fmt.Sprintf("%q", record.Data[:KplMagicLen])
|
|
||||||
decodedDataNoMagic = record.Data[KplMagicLen:]
|
|
||||||
} else {
|
|
||||||
isAggregated = false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if record has KPL Aggregate Record Magic Header and data length
|
|
||||||
// is correct size
|
|
||||||
if KplMagicHeader != dataMagic || len(decodedDataNoMagic) <= DigestSize {
|
|
||||||
isAggregated = false
|
|
||||||
}
|
|
||||||
|
|
||||||
if isAggregated {
|
|
||||||
messageDigest := fmt.Sprintf("%x", decodedDataNoMagic[len(decodedDataNoMagic)-DigestSize:])
|
|
||||||
messageData := decodedDataNoMagic[:len(decodedDataNoMagic)-DigestSize]
|
|
||||||
|
|
||||||
calculatedDigest := fmt.Sprintf("%x", md5.Sum(messageData))
|
|
||||||
|
|
||||||
// Check protobuf MD5 hash matches MD5 sum of record
|
|
||||||
if messageDigest != calculatedDigest {
|
|
||||||
isAggregated = false
|
|
||||||
} else {
|
|
||||||
aggRecord := &rec.AggregatedRecord{}
|
|
||||||
err := proto.Unmarshal(messageData, aggRecord)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
partitionKeys := aggRecord.PartitionKeyTable
|
|
||||||
|
|
||||||
for _, aggrec := range aggRecord.Records {
|
|
||||||
newRecord := createUserRecord(partitionKeys, aggrec, record)
|
|
||||||
allRecords = append(allRecords, newRecord)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !isAggregated {
|
|
||||||
allRecords = append(allRecords, record)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return allRecords, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// createUserRecord takes in the partitionKeys of the aggregated record, the individual
|
|
||||||
// deaggregated record, and the original aggregated record builds a kinesis.Record and
|
|
||||||
// returns it
|
|
||||||
func createUserRecord(partitionKeys []string, aggRec *rec.Record, record types.Record) types.Record {
|
|
||||||
partitionKey := partitionKeys[*aggRec.PartitionKeyIndex]
|
|
||||||
|
|
||||||
return types.Record{
|
|
||||||
ApproximateArrivalTimestamp: record.ApproximateArrivalTimestamp,
|
|
||||||
Data: aggRec.Data,
|
|
||||||
EncryptionType: record.EncryptionType,
|
|
||||||
PartitionKey: &partitionKey,
|
|
||||||
SequenceNumber: record.SequenceNumber,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,202 +0,0 @@
|
||||||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
|
||||||
package deaggregator_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/md5"
|
|
||||||
"fmt"
|
|
||||||
"math/rand"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
|
||||||
"github.com/golang/protobuf/proto"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
|
|
||||||
deagg "github.com/vmware/vmware-go-kcl-v2/internal/deaggregator"
|
|
||||||
rec "github.com/vmware/vmware-go-kcl-v2/internal/records"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Generate an aggregate record in the correct AWS-specified format
|
|
||||||
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
|
|
||||||
func generateAggregateRecord(numRecords int) []byte {
|
|
||||||
|
|
||||||
aggr := &rec.AggregatedRecord{}
|
|
||||||
// Start with the magic header
|
|
||||||
aggRecord := []byte("\xf3\x89\x9a\xc2")
|
|
||||||
partKeyTable := make([]string, 0)
|
|
||||||
|
|
||||||
// Create proto record with numRecords length
|
|
||||||
for i := 0; i < numRecords; i++ {
|
|
||||||
var partKey uint64
|
|
||||||
var hashKey uint64
|
|
||||||
partKey = uint64(i)
|
|
||||||
hashKey = uint64(i) * uint64(10)
|
|
||||||
r := &rec.Record{
|
|
||||||
PartitionKeyIndex: &partKey,
|
|
||||||
ExplicitHashKeyIndex: &hashKey,
|
|
||||||
Data: []byte("Some test data string"),
|
|
||||||
Tags: make([]*rec.Tag, 0),
|
|
||||||
}
|
|
||||||
|
|
||||||
aggr.Records = append(aggr.Records, r)
|
|
||||||
partKeyVal := "test" + fmt.Sprint(i)
|
|
||||||
partKeyTable = append(partKeyTable, partKeyVal)
|
|
||||||
}
|
|
||||||
|
|
||||||
aggr.PartitionKeyTable = partKeyTable
|
|
||||||
// Marshal to protobuf record, create md5 sum from proto record
|
|
||||||
// and append both to aggRecord with magic header
|
|
||||||
data, _ := proto.Marshal(aggr)
|
|
||||||
md5Hash := md5.Sum(data)
|
|
||||||
aggRecord = append(aggRecord, data...)
|
|
||||||
aggRecord = append(aggRecord, md5Hash[:]...)
|
|
||||||
return aggRecord
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate a generic kinesis.Record using whatever []byte
|
|
||||||
// is passed in as the data (can be normal []byte or proto record)
|
|
||||||
func generateKinesisRecord(data []byte) types.Record {
|
|
||||||
currentTime := time.Now()
|
|
||||||
encryptionType := types.EncryptionTypeNone
|
|
||||||
partitionKey := "1234"
|
|
||||||
sequenceNumber := "21269319989900637946712965403778482371"
|
|
||||||
return types.Record{
|
|
||||||
ApproximateArrivalTimestamp: ¤tTime,
|
|
||||||
Data: data,
|
|
||||||
EncryptionType: encryptionType,
|
|
||||||
PartitionKey: &partitionKey,
|
|
||||||
SequenceNumber: &sequenceNumber,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This tests to make sure that the data is at least larger than the length
|
|
||||||
// of the magic header to do some array slicing with index out of bounds
|
|
||||||
func TestSmallLengthReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
|
|
||||||
var err error
|
|
||||||
var kr types.Record
|
|
||||||
|
|
||||||
krs := make([]types.Record, 0, 1)
|
|
||||||
|
|
||||||
smallByte := []byte("No")
|
|
||||||
kr = generateKinesisRecord(smallByte)
|
|
||||||
krs = append(krs, kr)
|
|
||||||
dars, err := deagg.DeaggregateRecords(krs)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Small byte test, since this is not a deaggregated record, should return 1
|
|
||||||
// record in the array.
|
|
||||||
assert.Equal(t, 1, len(dars), "Small Byte test should return length of 1.")
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function tests to make sure that the data starts with the correct magic header
|
|
||||||
// according to KPL aggregate documentation.
|
|
||||||
func TestNonMatchingMagicHeaderReturnsSingleRecord(t *testing.T) {
|
|
||||||
var err error
|
|
||||||
var kr types.Record
|
|
||||||
|
|
||||||
krs := make([]types.Record, 0, 1)
|
|
||||||
|
|
||||||
min := 1
|
|
||||||
max := 10
|
|
||||||
n := rand.Intn(max-min) + min
|
|
||||||
aggData := generateAggregateRecord(n)
|
|
||||||
mismatchAggData := aggData[1:]
|
|
||||||
kr = generateKinesisRecord(mismatchAggData)
|
|
||||||
|
|
||||||
krs = append(krs, kr)
|
|
||||||
|
|
||||||
dars, err := deagg.DeaggregateRecords(krs)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// A byte record with a magic header that does not match 0xF3 0x89 0x9A 0xC2
|
|
||||||
// should return a single record.
|
|
||||||
assert.Equal(t, 1, len(dars), "Mismatch magic header test should return length of 1.")
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function tests that the DeaggregateRecords function returns the correct number of
|
|
||||||
// deaggregated records from a single aggregated record.
|
|
||||||
func TestVariableLengthRecordsReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
|
|
||||||
var err error
|
|
||||||
var kr types.Record
|
|
||||||
|
|
||||||
krs := make([]types.Record, 0, 1)
|
|
||||||
|
|
||||||
min := 1
|
|
||||||
max := 10
|
|
||||||
n := rand.Intn(max-min) + min
|
|
||||||
aggData := generateAggregateRecord(n)
|
|
||||||
kr = generateKinesisRecord(aggData)
|
|
||||||
krs = append(krs, kr)
|
|
||||||
|
|
||||||
dars, err := deagg.DeaggregateRecords(krs)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Variable Length Aggregate Record test has aggregaterd records and should return
|
|
||||||
// n length.
|
|
||||||
assertMsg := fmt.Sprintf("Variable Length Aggregate Record should return length %v.", len(dars))
|
|
||||||
assert.Equal(t, n, len(dars), assertMsg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function tests the length of the message after magic file header. If length is less than
|
|
||||||
// the digest size (16 bytes), it is not an aggregated record.
|
|
||||||
func TestRecordAfterMagicHeaderWithLengthLessThanDigestSizeReturnsSingleRecord(t *testing.T) {
|
|
||||||
var err error
|
|
||||||
var kr types.Record
|
|
||||||
|
|
||||||
krs := make([]types.Record, 0, 1)
|
|
||||||
|
|
||||||
min := 1
|
|
||||||
max := 10
|
|
||||||
n := rand.Intn(max-min) + min
|
|
||||||
aggData := generateAggregateRecord(n)
|
|
||||||
// Change size of proto message to 15
|
|
||||||
reducedAggData := aggData[:19]
|
|
||||||
kr = generateKinesisRecord(reducedAggData)
|
|
||||||
|
|
||||||
krs = append(krs, kr)
|
|
||||||
|
|
||||||
dars, err := deagg.DeaggregateRecords(krs)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// A byte record with length less than 16 after the magic header should return
|
|
||||||
// a single record from DeaggregateRecords
|
|
||||||
assert.Equal(t, 1, len(dars), "Digest size test should return length of 1.")
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function tests the MD5 Sum at the end of the record by comparing MD5 sum
|
|
||||||
// at end of proto record with MD5 Sum of Proto message. If they do not match,
|
|
||||||
// it is not an aggregated record.
|
|
||||||
func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) {
|
|
||||||
var err error
|
|
||||||
var kr types.Record
|
|
||||||
|
|
||||||
krs := make([]types.Record, 0, 1)
|
|
||||||
|
|
||||||
min := 1
|
|
||||||
max := 10
|
|
||||||
n := rand.Intn(max-min) + min
|
|
||||||
aggData := generateAggregateRecord(n)
|
|
||||||
// Remove last byte from array to mismatch the MD5 sums
|
|
||||||
mismatchAggData := aggData[:len(aggData)-1]
|
|
||||||
kr = generateKinesisRecord(mismatchAggData)
|
|
||||||
|
|
||||||
krs = append(krs, kr)
|
|
||||||
|
|
||||||
dars, err := deagg.DeaggregateRecords(krs)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// A byte record with an MD5 sum that does not match with the md5.Sum(record)
|
|
||||||
// will be marked as a non-aggregate record and return a single record
|
|
||||||
assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.")
|
|
||||||
}
|
|
||||||
|
|
@ -1,215 +0,0 @@
|
||||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
|
||||||
// source: records.proto
|
|
||||||
|
|
||||||
package records
|
|
||||||
|
|
||||||
import (
|
|
||||||
fmt "fmt"
|
|
||||||
math "math"
|
|
||||||
|
|
||||||
proto "github.com/golang/protobuf/proto"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Reference imports to suppress errors if they are not otherwise used.
|
|
||||||
var _ = proto.Marshal
|
|
||||||
var _ = fmt.Errorf
|
|
||||||
var _ = math.Inf
|
|
||||||
|
|
||||||
// This is a compile-time assertion to ensure that this generated file
|
|
||||||
// is compatible with the proto package it is being compiled against.
|
|
||||||
// A compilation error at this line likely means your copy of the
|
|
||||||
// proto package needs to be updated.
|
|
||||||
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
|
|
||||||
|
|
||||||
type AggregatedRecord struct {
|
|
||||||
PartitionKeyTable []string `protobuf:"bytes,1,rep,name=partition_key_table,json=partitionKeyTable" json:"partition_key_table,omitempty"`
|
|
||||||
ExplicitHashKeyTable []string `protobuf:"bytes,2,rep,name=explicit_hash_key_table,json=explicitHashKeyTable" json:"explicit_hash_key_table,omitempty"`
|
|
||||||
Records []*Record `protobuf:"bytes,3,rep,name=records" json:"records,omitempty"`
|
|
||||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
|
||||||
XXX_unrecognized []byte `json:"-"`
|
|
||||||
XXX_sizecache int32 `json:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *AggregatedRecord) Reset() { *m = AggregatedRecord{} }
|
|
||||||
func (m *AggregatedRecord) String() string { return proto.CompactTextString(m) }
|
|
||||||
func (*AggregatedRecord) ProtoMessage() {}
|
|
||||||
func (*AggregatedRecord) Descriptor() ([]byte, []int) {
|
|
||||||
return fileDescriptor_6ae0159314830e16, []int{0}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *AggregatedRecord) XXX_Unmarshal(b []byte) error {
|
|
||||||
return xxx_messageInfo_AggregatedRecord.Unmarshal(m, b)
|
|
||||||
}
|
|
||||||
func (m *AggregatedRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
|
||||||
return xxx_messageInfo_AggregatedRecord.Marshal(b, m, deterministic)
|
|
||||||
}
|
|
||||||
func (m *AggregatedRecord) XXX_Merge(src proto.Message) {
|
|
||||||
xxx_messageInfo_AggregatedRecord.Merge(m, src)
|
|
||||||
}
|
|
||||||
func (m *AggregatedRecord) XXX_Size() int {
|
|
||||||
return xxx_messageInfo_AggregatedRecord.Size(m)
|
|
||||||
}
|
|
||||||
func (m *AggregatedRecord) XXX_DiscardUnknown() {
|
|
||||||
xxx_messageInfo_AggregatedRecord.DiscardUnknown(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
var xxx_messageInfo_AggregatedRecord proto.InternalMessageInfo
|
|
||||||
|
|
||||||
func (m *AggregatedRecord) GetPartitionKeyTable() []string {
|
|
||||||
if m != nil {
|
|
||||||
return m.PartitionKeyTable
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *AggregatedRecord) GetExplicitHashKeyTable() []string {
|
|
||||||
if m != nil {
|
|
||||||
return m.ExplicitHashKeyTable
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *AggregatedRecord) GetRecords() []*Record {
|
|
||||||
if m != nil {
|
|
||||||
return m.Records
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type Tag struct {
|
|
||||||
Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
|
|
||||||
Value *string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
|
|
||||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
|
||||||
XXX_unrecognized []byte `json:"-"`
|
|
||||||
XXX_sizecache int32 `json:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Tag) Reset() { *m = Tag{} }
|
|
||||||
func (m *Tag) String() string { return proto.CompactTextString(m) }
|
|
||||||
func (*Tag) ProtoMessage() {}
|
|
||||||
func (*Tag) Descriptor() ([]byte, []int) {
|
|
||||||
return fileDescriptor_6ae0159314830e16, []int{1}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Tag) XXX_Unmarshal(b []byte) error {
|
|
||||||
return xxx_messageInfo_Tag.Unmarshal(m, b)
|
|
||||||
}
|
|
||||||
func (m *Tag) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
|
||||||
return xxx_messageInfo_Tag.Marshal(b, m, deterministic)
|
|
||||||
}
|
|
||||||
func (m *Tag) XXX_Merge(src proto.Message) {
|
|
||||||
xxx_messageInfo_Tag.Merge(m, src)
|
|
||||||
}
|
|
||||||
func (m *Tag) XXX_Size() int {
|
|
||||||
return xxx_messageInfo_Tag.Size(m)
|
|
||||||
}
|
|
||||||
func (m *Tag) XXX_DiscardUnknown() {
|
|
||||||
xxx_messageInfo_Tag.DiscardUnknown(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
var xxx_messageInfo_Tag proto.InternalMessageInfo
|
|
||||||
|
|
||||||
func (m *Tag) GetKey() string {
|
|
||||||
if m != nil && m.Key != nil {
|
|
||||||
return *m.Key
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Tag) GetValue() string {
|
|
||||||
if m != nil && m.Value != nil {
|
|
||||||
return *m.Value
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
type Record struct {
|
|
||||||
PartitionKeyIndex *uint64 `protobuf:"varint,1,req,name=partition_key_index,json=partitionKeyIndex" json:"partition_key_index,omitempty"`
|
|
||||||
ExplicitHashKeyIndex *uint64 `protobuf:"varint,2,opt,name=explicit_hash_key_index,json=explicitHashKeyIndex" json:"explicit_hash_key_index,omitempty"`
|
|
||||||
Data []byte `protobuf:"bytes,3,req,name=data" json:"data,omitempty"`
|
|
||||||
Tags []*Tag `protobuf:"bytes,4,rep,name=tags" json:"tags,omitempty"`
|
|
||||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
|
||||||
XXX_unrecognized []byte `json:"-"`
|
|
||||||
XXX_sizecache int32 `json:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Record) Reset() { *m = Record{} }
|
|
||||||
func (m *Record) String() string { return proto.CompactTextString(m) }
|
|
||||||
func (*Record) ProtoMessage() {}
|
|
||||||
func (*Record) Descriptor() ([]byte, []int) {
|
|
||||||
return fileDescriptor_6ae0159314830e16, []int{2}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Record) XXX_Unmarshal(b []byte) error {
|
|
||||||
return xxx_messageInfo_Record.Unmarshal(m, b)
|
|
||||||
}
|
|
||||||
func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
|
||||||
return xxx_messageInfo_Record.Marshal(b, m, deterministic)
|
|
||||||
}
|
|
||||||
func (m *Record) XXX_Merge(src proto.Message) {
|
|
||||||
xxx_messageInfo_Record.Merge(m, src)
|
|
||||||
}
|
|
||||||
func (m *Record) XXX_Size() int {
|
|
||||||
return xxx_messageInfo_Record.Size(m)
|
|
||||||
}
|
|
||||||
func (m *Record) XXX_DiscardUnknown() {
|
|
||||||
xxx_messageInfo_Record.DiscardUnknown(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
var xxx_messageInfo_Record proto.InternalMessageInfo
|
|
||||||
|
|
||||||
func (m *Record) GetPartitionKeyIndex() uint64 {
|
|
||||||
if m != nil && m.PartitionKeyIndex != nil {
|
|
||||||
return *m.PartitionKeyIndex
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Record) GetExplicitHashKeyIndex() uint64 {
|
|
||||||
if m != nil && m.ExplicitHashKeyIndex != nil {
|
|
||||||
return *m.ExplicitHashKeyIndex
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Record) GetData() []byte {
|
|
||||||
if m != nil {
|
|
||||||
return m.Data
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Record) GetTags() []*Tag {
|
|
||||||
if m != nil {
|
|
||||||
return m.Tags
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
proto.RegisterType((*AggregatedRecord)(nil), "AggregatedRecord")
|
|
||||||
proto.RegisterType((*Tag)(nil), "Tag")
|
|
||||||
proto.RegisterType((*Record)(nil), "Record")
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() { proto.RegisterFile("records.proto", fileDescriptor_6ae0159314830e16) }
|
|
||||||
|
|
||||||
var fileDescriptor_6ae0159314830e16 = []byte{
|
|
||||||
// 245 bytes of a gzipped FileDescriptorProto
|
|
||||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x8f, 0xc1, 0x4a, 0xc4, 0x30,
|
|
||||||
0x10, 0x86, 0xc9, 0x26, 0xba, 0x74, 0x54, 0x58, 0xe3, 0x82, 0x39, 0xd6, 0x9e, 0x72, 0xb1, 0x07,
|
|
||||||
0xc1, 0x07, 0xf0, 0xa6, 0x78, 0x0b, 0xbd, 0x97, 0x71, 0x3b, 0xa4, 0x61, 0xcb, 0xb6, 0xa4, 0x51,
|
|
||||||
0xb6, 0xef, 0xa2, 0xef, 0x2a, 0x49, 0xdd, 0x45, 0x51, 0x6f, 0x93, 0xf9, 0xf9, 0x32, 0xff, 0x07,
|
|
||||||
0x17, 0x9e, 0x36, 0xbd, 0x6f, 0xc6, 0x72, 0xf0, 0x7d, 0xe8, 0x8b, 0x77, 0x06, 0xab, 0x07, 0x6b,
|
|
||||||
0x3d, 0x59, 0x0c, 0xd4, 0x98, 0x94, 0xc9, 0x12, 0xae, 0x06, 0xf4, 0xc1, 0x05, 0xd7, 0xef, 0xea,
|
|
||||||
0x2d, 0x4d, 0x75, 0xc0, 0x97, 0x8e, 0x14, 0xcb, 0xb9, 0xce, 0xcc, 0xe5, 0x31, 0x7a, 0xa6, 0xa9,
|
|
||||||
0x8a, 0x81, 0xbc, 0x87, 0x6b, 0xda, 0x0f, 0x9d, 0xdb, 0xb8, 0x50, 0xb7, 0x38, 0xb6, 0xdf, 0x98,
|
|
||||||
0x45, 0x62, 0xd6, 0x87, 0xf8, 0x11, 0xc7, 0xf6, 0x88, 0xdd, 0xc0, 0xf2, 0xab, 0x8c, 0xe2, 0x39,
|
|
||||||
0xd7, 0x67, 0x77, 0xcb, 0x72, 0x2e, 0x60, 0x0e, 0xfb, 0xe2, 0x16, 0x78, 0x85, 0x56, 0xae, 0x80,
|
|
||||||
0x6f, 0x69, 0x52, 0x2c, 0x5f, 0xe8, 0xcc, 0xc4, 0x51, 0xae, 0xe1, 0xe4, 0x0d, 0xbb, 0xd7, 0x78,
|
|
||||||
0x80, 0xe9, 0xcc, 0xcc, 0x8f, 0xe2, 0x83, 0xc1, 0xe9, 0x7f, 0x0e, 0x6e, 0xd7, 0xd0, 0x3e, 0x7d,
|
|
||||||
0x21, 0x7e, 0x3a, 0x3c, 0xc5, 0xe0, 0x6f, 0x87, 0x99, 0x89, 0x27, 0xc4, 0x2f, 0x87, 0x19, 0x93,
|
|
||||||
0x20, 0x1a, 0x0c, 0xa8, 0x78, 0xbe, 0xd0, 0xe7, 0x26, 0xcd, 0x52, 0x81, 0x08, 0x68, 0x47, 0x25,
|
|
||||||
0x92, 0x94, 0x28, 0x2b, 0xb4, 0x26, 0x6d, 0x3e, 0x03, 0x00, 0x00, 0xff, 0xff, 0x87, 0x3e, 0x63,
|
|
||||||
0x69, 0x7d, 0x01, 0x00, 0x00,
|
|
||||||
}
|
|
||||||
|
|
@ -32,10 +32,10 @@ import (
|
||||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||||
|
rec "github.com/awslabs/kinesis-aggregation/go/v2/records"
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
|
|
||||||
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/utils"
|
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/utils"
|
||||||
rec "github.com/vmware/vmware-go-kcl-v2/internal/records"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
|
const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue