From f284ef978de90d351bfb4373a7a70a910de8fe6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabiano=20Grac=CC=A7as?= Date: Fri, 7 Jan 2022 02:27:18 +0100 Subject: [PATCH] since deaggregation package was upgraded to sdk v2 makes sense to use it. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit https://github.com/awslabs/kinesis-aggregation/pull/143#issuecomment-953308464 Signed-off-by: Fabiano Graças --- clientlibrary/worker/common-shard-consumer.go | 2 +- go.mod | 1 + go.sum | 5 + internal/deaggregator/deaggregator.go | 96 -------- internal/deaggregator/deaggregator_test.go | 202 ---------------- internal/records/records.pb.go | 215 ------------------ test/record_publisher_test.go | 2 +- 7 files changed, 8 insertions(+), 515 deletions(-) delete mode 100644 internal/deaggregator/deaggregator.go delete mode 100644 internal/deaggregator/deaggregator_test.go delete mode 100644 internal/records/records.pb.go diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index d384145..32b91b1 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -27,13 +27,13 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + deagg "github.com/awslabs/kinesis-aggregation/go/v2/deaggregator" chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics" par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition" - deagg "github.com/vmware/vmware-go-kcl-v2/internal/deaggregator" ) type shardConsumer interface { diff --git a/go.mod b/go.mod index 529c1a1..9a56ba8 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 + github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 github.com/prometheus/client_golang v1.11.0 diff --git a/go.sum b/go.sum index a366614..f03f679 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs= github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI= @@ -66,14 +67,18 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+I github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 h1:s47dGRX/fBy9s/Zculav/cyqRhkMKsE/5hjg6rWAH6E= github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0/go.mod h1:B1x58TfECuYHFX/bga902rUvMqQu9C/v2XiCi2GZZXE= github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 h1:E4fxAg/UE8a6yiLZYv8/EP0uXKPPRImiMau4ift6S/g= github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGXyn22c1USYxA/2KyvoeDY0= github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc= github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA= +github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58= github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= diff --git a/internal/deaggregator/deaggregator.go b/internal/deaggregator/deaggregator.go deleted file mode 100644 index 10e4dfb..0000000 --- a/internal/deaggregator/deaggregator.go +++ /dev/null @@ -1,96 +0,0 @@ -// Package deaggregator -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package deaggregator - -import ( - "crypto/md5" - "fmt" - - "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/golang/protobuf/proto" - - rec "github.com/vmware/vmware-go-kcl-v2/internal/records" -) - -// KplMagicHeader Magic File Header for a KPL Aggregated Record -var KplMagicHeader = fmt.Sprintf("%q", []byte("\xf3\x89\x9a\xc2")) - -const ( - KplMagicLen = 4 // Length of magic header for KPL Aggregate Record checking. - DigestSize = 16 // MD5 Message size for protobuf. -) - -// DeaggregateRecords takes an array of Kinesis records and expands any Protobuf -// records within that array, returning an array of all records -func DeaggregateRecords(records []types.Record) ([]types.Record, error) { - var isAggregated bool - allRecords := make([]types.Record, 0) - - for _, record := range records { - isAggregated = true - - var dataMagic string - var decodedDataNoMagic []byte - // Check if record is long enough to have magic file header - if len(record.Data) >= KplMagicLen { - dataMagic = fmt.Sprintf("%q", record.Data[:KplMagicLen]) - decodedDataNoMagic = record.Data[KplMagicLen:] - } else { - isAggregated = false - } - - // Check if record has KPL Aggregate Record Magic Header and data length - // is correct size - if KplMagicHeader != dataMagic || len(decodedDataNoMagic) <= DigestSize { - isAggregated = false - } - - if isAggregated { - messageDigest := fmt.Sprintf("%x", decodedDataNoMagic[len(decodedDataNoMagic)-DigestSize:]) - messageData := decodedDataNoMagic[:len(decodedDataNoMagic)-DigestSize] - - calculatedDigest := fmt.Sprintf("%x", md5.Sum(messageData)) - - // Check protobuf MD5 hash matches MD5 sum of record - if messageDigest != calculatedDigest { - isAggregated = false - } else { - aggRecord := &rec.AggregatedRecord{} - err := proto.Unmarshal(messageData, aggRecord) - - if err != nil { - return nil, err - } - - partitionKeys := aggRecord.PartitionKeyTable - - for _, aggrec := range aggRecord.Records { - newRecord := createUserRecord(partitionKeys, aggrec, record) - allRecords = append(allRecords, newRecord) - } - } - } - - if !isAggregated { - allRecords = append(allRecords, record) - } - } - - return allRecords, nil -} - -// createUserRecord takes in the partitionKeys of the aggregated record, the individual -// deaggregated record, and the original aggregated record builds a kinesis.Record and -// returns it -func createUserRecord(partitionKeys []string, aggRec *rec.Record, record types.Record) types.Record { - partitionKey := partitionKeys[*aggRec.PartitionKeyIndex] - - return types.Record{ - ApproximateArrivalTimestamp: record.ApproximateArrivalTimestamp, - Data: aggRec.Data, - EncryptionType: record.EncryptionType, - PartitionKey: &partitionKey, - SequenceNumber: record.SequenceNumber, - } -} diff --git a/internal/deaggregator/deaggregator_test.go b/internal/deaggregator/deaggregator_test.go deleted file mode 100644 index 3ba2793..0000000 --- a/internal/deaggregator/deaggregator_test.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package deaggregator_test - -import ( - "crypto/md5" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - - deagg "github.com/vmware/vmware-go-kcl-v2/internal/deaggregator" - rec "github.com/vmware/vmware-go-kcl-v2/internal/records" -) - -// Generate an aggregate record in the correct AWS-specified format -// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md -func generateAggregateRecord(numRecords int) []byte { - - aggr := &rec.AggregatedRecord{} - // Start with the magic header - aggRecord := []byte("\xf3\x89\x9a\xc2") - partKeyTable := make([]string, 0) - - // Create proto record with numRecords length - for i := 0; i < numRecords; i++ { - var partKey uint64 - var hashKey uint64 - partKey = uint64(i) - hashKey = uint64(i) * uint64(10) - r := &rec.Record{ - PartitionKeyIndex: &partKey, - ExplicitHashKeyIndex: &hashKey, - Data: []byte("Some test data string"), - Tags: make([]*rec.Tag, 0), - } - - aggr.Records = append(aggr.Records, r) - partKeyVal := "test" + fmt.Sprint(i) - partKeyTable = append(partKeyTable, partKeyVal) - } - - aggr.PartitionKeyTable = partKeyTable - // Marshal to protobuf record, create md5 sum from proto record - // and append both to aggRecord with magic header - data, _ := proto.Marshal(aggr) - md5Hash := md5.Sum(data) - aggRecord = append(aggRecord, data...) - aggRecord = append(aggRecord, md5Hash[:]...) - return aggRecord -} - -// Generate a generic kinesis.Record using whatever []byte -// is passed in as the data (can be normal []byte or proto record) -func generateKinesisRecord(data []byte) types.Record { - currentTime := time.Now() - encryptionType := types.EncryptionTypeNone - partitionKey := "1234" - sequenceNumber := "21269319989900637946712965403778482371" - return types.Record{ - ApproximateArrivalTimestamp: ¤tTime, - Data: data, - EncryptionType: encryptionType, - PartitionKey: &partitionKey, - SequenceNumber: &sequenceNumber, - } -} - -// This tests to make sure that the data is at least larger than the length -// of the magic header to do some array slicing with index out of bounds -func TestSmallLengthReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) { - var err error - var kr types.Record - - krs := make([]types.Record, 0, 1) - - smallByte := []byte("No") - kr = generateKinesisRecord(smallByte) - krs = append(krs, kr) - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // Small byte test, since this is not a deaggregated record, should return 1 - // record in the array. - assert.Equal(t, 1, len(dars), "Small Byte test should return length of 1.") -} - -// This function tests to make sure that the data starts with the correct magic header -// according to KPL aggregate documentation. -func TestNonMatchingMagicHeaderReturnsSingleRecord(t *testing.T) { - var err error - var kr types.Record - - krs := make([]types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - mismatchAggData := aggData[1:] - kr = generateKinesisRecord(mismatchAggData) - - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with a magic header that does not match 0xF3 0x89 0x9A 0xC2 - // should return a single record. - assert.Equal(t, 1, len(dars), "Mismatch magic header test should return length of 1.") -} - -// This function tests that the DeaggregateRecords function returns the correct number of -// deaggregated records from a single aggregated record. -func TestVariableLengthRecordsReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) { - var err error - var kr types.Record - - krs := make([]types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - kr = generateKinesisRecord(aggData) - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // Variable Length Aggregate Record test has aggregaterd records and should return - // n length. - assertMsg := fmt.Sprintf("Variable Length Aggregate Record should return length %v.", len(dars)) - assert.Equal(t, n, len(dars), assertMsg) -} - -// This function tests the length of the message after magic file header. If length is less than -// the digest size (16 bytes), it is not an aggregated record. -func TestRecordAfterMagicHeaderWithLengthLessThanDigestSizeReturnsSingleRecord(t *testing.T) { - var err error - var kr types.Record - - krs := make([]types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - // Change size of proto message to 15 - reducedAggData := aggData[:19] - kr = generateKinesisRecord(reducedAggData) - - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with length less than 16 after the magic header should return - // a single record from DeaggregateRecords - assert.Equal(t, 1, len(dars), "Digest size test should return length of 1.") -} - -// This function tests the MD5 Sum at the end of the record by comparing MD5 sum -// at end of proto record with MD5 Sum of Proto message. If they do not match, -// it is not an aggregated record. -func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) { - var err error - var kr types.Record - - krs := make([]types.Record, 0, 1) - - min := 1 - max := 10 - n := rand.Intn(max-min) + min - aggData := generateAggregateRecord(n) - // Remove last byte from array to mismatch the MD5 sums - mismatchAggData := aggData[:len(aggData)-1] - kr = generateKinesisRecord(mismatchAggData) - - krs = append(krs, kr) - - dars, err := deagg.DeaggregateRecords(krs) - if err != nil { - panic(err) - } - - // A byte record with an MD5 sum that does not match with the md5.Sum(record) - // will be marked as a non-aggregate record and return a single record - assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.") -} diff --git a/internal/records/records.pb.go b/internal/records/records.pb.go deleted file mode 100644 index 89abba5..0000000 --- a/internal/records/records.pb.go +++ /dev/null @@ -1,215 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: records.proto - -package records - -import ( - fmt "fmt" - math "math" - - proto "github.com/golang/protobuf/proto" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type AggregatedRecord struct { - PartitionKeyTable []string `protobuf:"bytes,1,rep,name=partition_key_table,json=partitionKeyTable" json:"partition_key_table,omitempty"` - ExplicitHashKeyTable []string `protobuf:"bytes,2,rep,name=explicit_hash_key_table,json=explicitHashKeyTable" json:"explicit_hash_key_table,omitempty"` - Records []*Record `protobuf:"bytes,3,rep,name=records" json:"records,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *AggregatedRecord) Reset() { *m = AggregatedRecord{} } -func (m *AggregatedRecord) String() string { return proto.CompactTextString(m) } -func (*AggregatedRecord) ProtoMessage() {} -func (*AggregatedRecord) Descriptor() ([]byte, []int) { - return fileDescriptor_6ae0159314830e16, []int{0} -} - -func (m *AggregatedRecord) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_AggregatedRecord.Unmarshal(m, b) -} -func (m *AggregatedRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_AggregatedRecord.Marshal(b, m, deterministic) -} -func (m *AggregatedRecord) XXX_Merge(src proto.Message) { - xxx_messageInfo_AggregatedRecord.Merge(m, src) -} -func (m *AggregatedRecord) XXX_Size() int { - return xxx_messageInfo_AggregatedRecord.Size(m) -} -func (m *AggregatedRecord) XXX_DiscardUnknown() { - xxx_messageInfo_AggregatedRecord.DiscardUnknown(m) -} - -var xxx_messageInfo_AggregatedRecord proto.InternalMessageInfo - -func (m *AggregatedRecord) GetPartitionKeyTable() []string { - if m != nil { - return m.PartitionKeyTable - } - return nil -} - -func (m *AggregatedRecord) GetExplicitHashKeyTable() []string { - if m != nil { - return m.ExplicitHashKeyTable - } - return nil -} - -func (m *AggregatedRecord) GetRecords() []*Record { - if m != nil { - return m.Records - } - return nil -} - -type Tag struct { - Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"` - Value *string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Tag) Reset() { *m = Tag{} } -func (m *Tag) String() string { return proto.CompactTextString(m) } -func (*Tag) ProtoMessage() {} -func (*Tag) Descriptor() ([]byte, []int) { - return fileDescriptor_6ae0159314830e16, []int{1} -} - -func (m *Tag) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Tag.Unmarshal(m, b) -} -func (m *Tag) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Tag.Marshal(b, m, deterministic) -} -func (m *Tag) XXX_Merge(src proto.Message) { - xxx_messageInfo_Tag.Merge(m, src) -} -func (m *Tag) XXX_Size() int { - return xxx_messageInfo_Tag.Size(m) -} -func (m *Tag) XXX_DiscardUnknown() { - xxx_messageInfo_Tag.DiscardUnknown(m) -} - -var xxx_messageInfo_Tag proto.InternalMessageInfo - -func (m *Tag) GetKey() string { - if m != nil && m.Key != nil { - return *m.Key - } - return "" -} - -func (m *Tag) GetValue() string { - if m != nil && m.Value != nil { - return *m.Value - } - return "" -} - -type Record struct { - PartitionKeyIndex *uint64 `protobuf:"varint,1,req,name=partition_key_index,json=partitionKeyIndex" json:"partition_key_index,omitempty"` - ExplicitHashKeyIndex *uint64 `protobuf:"varint,2,opt,name=explicit_hash_key_index,json=explicitHashKeyIndex" json:"explicit_hash_key_index,omitempty"` - Data []byte `protobuf:"bytes,3,req,name=data" json:"data,omitempty"` - Tags []*Tag `protobuf:"bytes,4,rep,name=tags" json:"tags,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Record) Reset() { *m = Record{} } -func (m *Record) String() string { return proto.CompactTextString(m) } -func (*Record) ProtoMessage() {} -func (*Record) Descriptor() ([]byte, []int) { - return fileDescriptor_6ae0159314830e16, []int{2} -} - -func (m *Record) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Record.Unmarshal(m, b) -} -func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Record.Marshal(b, m, deterministic) -} -func (m *Record) XXX_Merge(src proto.Message) { - xxx_messageInfo_Record.Merge(m, src) -} -func (m *Record) XXX_Size() int { - return xxx_messageInfo_Record.Size(m) -} -func (m *Record) XXX_DiscardUnknown() { - xxx_messageInfo_Record.DiscardUnknown(m) -} - -var xxx_messageInfo_Record proto.InternalMessageInfo - -func (m *Record) GetPartitionKeyIndex() uint64 { - if m != nil && m.PartitionKeyIndex != nil { - return *m.PartitionKeyIndex - } - return 0 -} - -func (m *Record) GetExplicitHashKeyIndex() uint64 { - if m != nil && m.ExplicitHashKeyIndex != nil { - return *m.ExplicitHashKeyIndex - } - return 0 -} - -func (m *Record) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func (m *Record) GetTags() []*Tag { - if m != nil { - return m.Tags - } - return nil -} - -func init() { - proto.RegisterType((*AggregatedRecord)(nil), "AggregatedRecord") - proto.RegisterType((*Tag)(nil), "Tag") - proto.RegisterType((*Record)(nil), "Record") -} - -func init() { proto.RegisterFile("records.proto", fileDescriptor_6ae0159314830e16) } - -var fileDescriptor_6ae0159314830e16 = []byte{ - // 245 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x8f, 0xc1, 0x4a, 0xc4, 0x30, - 0x10, 0x86, 0xc9, 0x26, 0xba, 0x74, 0x54, 0x58, 0xe3, 0x82, 0x39, 0xd6, 0x9e, 0x72, 0xb1, 0x07, - 0xc1, 0x07, 0xf0, 0xa6, 0x78, 0x0b, 0xbd, 0x97, 0x71, 0x3b, 0xa4, 0x61, 0xcb, 0xb6, 0xa4, 0x51, - 0xb6, 0xef, 0xa2, 0xef, 0x2a, 0x49, 0xdd, 0x45, 0x51, 0x6f, 0x93, 0xf9, 0xf9, 0x32, 0xff, 0x07, - 0x17, 0x9e, 0x36, 0xbd, 0x6f, 0xc6, 0x72, 0xf0, 0x7d, 0xe8, 0x8b, 0x77, 0x06, 0xab, 0x07, 0x6b, - 0x3d, 0x59, 0x0c, 0xd4, 0x98, 0x94, 0xc9, 0x12, 0xae, 0x06, 0xf4, 0xc1, 0x05, 0xd7, 0xef, 0xea, - 0x2d, 0x4d, 0x75, 0xc0, 0x97, 0x8e, 0x14, 0xcb, 0xb9, 0xce, 0xcc, 0xe5, 0x31, 0x7a, 0xa6, 0xa9, - 0x8a, 0x81, 0xbc, 0x87, 0x6b, 0xda, 0x0f, 0x9d, 0xdb, 0xb8, 0x50, 0xb7, 0x38, 0xb6, 0xdf, 0x98, - 0x45, 0x62, 0xd6, 0x87, 0xf8, 0x11, 0xc7, 0xf6, 0x88, 0xdd, 0xc0, 0xf2, 0xab, 0x8c, 0xe2, 0x39, - 0xd7, 0x67, 0x77, 0xcb, 0x72, 0x2e, 0x60, 0x0e, 0xfb, 0xe2, 0x16, 0x78, 0x85, 0x56, 0xae, 0x80, - 0x6f, 0x69, 0x52, 0x2c, 0x5f, 0xe8, 0xcc, 0xc4, 0x51, 0xae, 0xe1, 0xe4, 0x0d, 0xbb, 0xd7, 0x78, - 0x80, 0xe9, 0xcc, 0xcc, 0x8f, 0xe2, 0x83, 0xc1, 0xe9, 0x7f, 0x0e, 0x6e, 0xd7, 0xd0, 0x3e, 0x7d, - 0x21, 0x7e, 0x3a, 0x3c, 0xc5, 0xe0, 0x6f, 0x87, 0x99, 0x89, 0x27, 0xc4, 0x2f, 0x87, 0x19, 0x93, - 0x20, 0x1a, 0x0c, 0xa8, 0x78, 0xbe, 0xd0, 0xe7, 0x26, 0xcd, 0x52, 0x81, 0x08, 0x68, 0x47, 0x25, - 0x92, 0x94, 0x28, 0x2b, 0xb4, 0x26, 0x6d, 0x3e, 0x03, 0x00, 0x00, 0xff, 0xff, 0x87, 0x3e, 0x63, - 0x69, 0x7d, 0x01, 0x00, 0x00, -} diff --git a/test/record_publisher_test.go b/test/record_publisher_test.go index 9e02ef3..cfd142b 100644 --- a/test/record_publisher_test.go +++ b/test/record_publisher_test.go @@ -32,10 +32,10 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + rec "github.com/awslabs/kinesis-aggregation/go/v2/records" "github.com/golang/protobuf/proto" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/utils" - rec "github.com/vmware/vmware-go-kcl-v2/internal/records" ) const specstr = `{"name":"kube-qQyhk","networking":{"containerNetworkCidr":"10.2.0.0/16"},"orgName":"BVT-Org-cLQch","projectName":"project-tDSJd","serviceLevel":"DEVELOPER","size":{"count":1},"version":"1.8.1-4"}`