diff --git a/internal/deaggregator/deaggregator.go b/internal/deaggregator/deaggregator.go
new file mode 100644
index 0000000..6aa8905
--- /dev/null
+++ b/internal/deaggregator/deaggregator.go
@@ -0,0 +1,104 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+// Package deaggregator expands Kinesis Producer Library (KPL) aggregated
+// records into the individual user records they contain.
+package deaggregator
+
+import (
+	"bytes"
+	"crypto/md5"
+	"fmt"
+
+	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
+	"github.com/golang/protobuf/proto"
+
+	rec "github.com/vmware/vmware-go-kcl/internal/records"
+)
+
+// kplMagic is the raw four-byte prefix that marks a KPL aggregated record.
+const kplMagic = "\xf3\x89\x9a\xc2"
+
+// KplMagicHeader is the %q-quoted magic header of a KPL aggregated record.
+// It is kept for backward compatibility; deaggregation compares raw bytes.
+var KplMagicHeader = fmt.Sprintf("%q", []byte(kplMagic))
+
+const (
+	KplMagicLen = 4  // Length of magic header for KPL Aggregate Record checking.
+	DigestSize  = 16 // MD5 Message size for protobuf.
+)
+
+// DeaggregateRecords takes a slice of Kinesis records and expands any KPL
+// aggregated (protobuf) records within it, returning a slice of all records.
+//
+// A record is treated as aggregated only when it starts with the KPL magic
+// header, carries more than DigestSize bytes after the header, and its
+// trailing MD5 digest matches the protobuf payload. Every other record is
+// passed through unchanged. An error is returned when a payload that passed
+// those checks cannot be unmarshalled, or when one of its user records refers
+// to a partition key index outside the partition key table.
+func DeaggregateRecords(records []types.Record) ([]types.Record, error) {
+	allRecords := make([]types.Record, 0, len(records))
+
+	for _, record := range records {
+		payload, ok := aggregatedPayload(record.Data)
+		if !ok {
+			allRecords = append(allRecords, record)
+			continue
+		}
+
+		aggRecord := &rec.AggregatedRecord{}
+		if err := proto.Unmarshal(payload, aggRecord); err != nil {
+			return nil, fmt.Errorf("unmarshalling aggregated record: %w", err)
+		}
+
+		partitionKeys := aggRecord.PartitionKeyTable
+		for _, aggrec := range aggRecord.Records {
+			// Validate the index up front so malformed input yields an error
+			// instead of a nil-dereference or index-out-of-range panic.
+			if aggrec == nil || aggrec.PartitionKeyIndex == nil || *aggrec.PartitionKeyIndex >= uint64(len(partitionKeys)) {
+				return nil, fmt.Errorf("aggregated record has an invalid partition key index (table size %d)", len(partitionKeys))
+			}
+			allRecords = append(allRecords, createUserRecord(partitionKeys, aggrec, record))
+		}
+	}
+
+	return allRecords, nil
+}
+
+// aggregatedPayload reports whether data is framed as a KPL aggregated record
+// and, if so, returns the protobuf payload between the magic header and the
+// trailing MD5 digest.
+func aggregatedPayload(data []byte) ([]byte, bool) {
+	// The frame is magic header + non-empty payload + MD5 digest.
+	if len(data) <= KplMagicLen+DigestSize || string(data[:KplMagicLen]) != kplMagic {
+		return nil, false
+	}
+
+	body := data[KplMagicLen:]
+	payload, digest := body[:len(body)-DigestSize], body[len(body)-DigestSize:]
+
+	// The digest must match the MD5 sum of the payload.
+	sum := md5.Sum(payload)
+	if !bytes.Equal(sum[:], digest) {
+		return nil, false
+	}
+	return payload, true
+}
+
+// createUserRecord builds the Kinesis record for a single deaggregated user
+// record. The payload comes from aggRec and the partition key is looked up in
+// partitionKeys; the arrival timestamp, encryption type and sequence number
+// are copied from record, the aggregated record that contained it. The caller
+// must ensure aggRec.PartitionKeyIndex is set and within partitionKeys.
+func createUserRecord(partitionKeys []string, aggRec *rec.Record, record types.Record) types.Record {
+	partitionKey := partitionKeys[*aggRec.PartitionKeyIndex]
+
+	return types.Record{
+		ApproximateArrivalTimestamp: record.ApproximateArrivalTimestamp,
+		Data:                        aggRec.Data,
+		EncryptionType:              record.EncryptionType,
+		PartitionKey:                &partitionKey,
+		SequenceNumber:              record.SequenceNumber,
+	}
+}
diff --git a/internal/deaggregator/deaggregator_test.go b/internal/deaggregator/deaggregator_test.go
new file mode 100644
index 0000000..5bc8be6
--- /dev/null
+++ b/internal/deaggregator/deaggregator_test.go
@@ -0,0 +1,202 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package deaggregator_test
+
+import (
+	"crypto/md5"
+	"fmt"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
+	"github.com/golang/protobuf/proto"
+	"github.com/stretchr/testify/assert"
+
+	deagg "github.com/vmware/vmware-go-kcl/internal/deaggregator"
+	rec "github.com/vmware/vmware-go-kcl/internal/records"
+)
+
+// generateAggregateRecord builds an aggregate record with numRecords user records in the AWS-specified format
+// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
+func generateAggregateRecord(numRecords int) []byte {
+	aggr := &rec.AggregatedRecord{}
+	// Start with the magic header
+	aggRecord := []byte("\xf3\x89\x9a\xc2")
+	partKeyTable := make([]string, 0, numRecords)
+
+	// Create proto record with numRecords length
+	for i := 0; i < numRecords; i++ {
+		partKey := uint64(i)
+		hashKey := uint64(i) * 10
+		r := &rec.Record{
+			PartitionKeyIndex:    &partKey,
+			ExplicitHashKeyIndex: &hashKey,
+			Data:                 []byte("Some test data string"),
+			Tags:                 make([]*rec.Tag, 0),
+		}
+
+		aggr.Records = append(aggr.Records, r)
+		partKeyVal := "test" + fmt.Sprint(i)
+		partKeyTable = append(partKeyTable, partKeyVal)
+	}
+
+	aggr.PartitionKeyTable = partKeyTable
+	// Marshal to protobuf record, create md5 sum from proto record
+	// and append both to aggRecord with magic header
+	data, err := proto.Marshal(aggr)
+	if err != nil {
+		panic(err)
+	}
+	md5Hash := md5.Sum(data)
+	aggRecord = append(aggRecord, data...)
+	aggRecord = append(aggRecord, md5Hash[:]...)
+	return aggRecord
+}
+
+// generateKinesisRecord builds a generic Kinesis record using whatever []byte
+// is passed in as the data (can be normal []byte or proto record)
+func generateKinesisRecord(data []byte) types.Record {
+	currentTime := time.Now()
+	encryptionType := types.EncryptionTypeNone
+	partitionKey := "1234"
+	sequenceNumber := "21269319989900637946712965403778482371"
+	return types.Record{
+		ApproximateArrivalTimestamp: &currentTime,
+		Data:                        data,
+		EncryptionType:              encryptionType,
+		PartitionKey:                &partitionKey,
+		SequenceNumber:              &sequenceNumber,
+	}
+}
+
+// This tests that data shorter than the magic header is returned as a single
+// record instead of being sliced with an index out of bounds.
+func TestSmallLengthReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
+	var err error
+	var kr types.Record
+
+	krs := make([]types.Record, 0, 1)
+
+	smallByte := []byte("No")
+	kr = generateKinesisRecord(smallByte)
+	krs = append(krs, kr)
+	dars, err := deagg.DeaggregateRecords(krs)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Small byte test, since this is not an aggregated record, should return 1
+	// record in the array.
+	assert.Equal(t, 1, len(dars), "Small Byte test should return length of 1.")
+}
+
+// This function tests to make sure that the data starts with the correct magic header
+// according to KPL aggregate documentation.
+func TestNonMatchingMagicHeaderReturnsSingleRecord(t *testing.T) {
+	var err error
+	var kr types.Record
+
+	krs := make([]types.Record, 0, 1)
+
+	minRecords := 1
+	maxRecords := 10
+	n := rand.Intn(maxRecords-minRecords) + minRecords
+	aggData := generateAggregateRecord(n)
+	mismatchAggData := aggData[1:]
+	kr = generateKinesisRecord(mismatchAggData)
+
+	krs = append(krs, kr)
+
+	dars, err := deagg.DeaggregateRecords(krs)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// A byte record with a magic header that does not match 0xF3 0x89 0x9A 0xC2
+	// should return a single record.
+	assert.Equal(t, 1, len(dars), "Mismatch magic header test should return length of 1.")
+}
+
+// This function tests that the DeaggregateRecords function returns the correct number of
+// deaggregated records from a single aggregated record.
+func TestVariableLengthRecordsReturnsCorrectNumberOfDeaggregatedRecords(t *testing.T) {
+	var err error
+	var kr types.Record
+
+	krs := make([]types.Record, 0, 1)
+
+	minRecords := 1
+	maxRecords := 10
+	n := rand.Intn(maxRecords-minRecords) + minRecords
+	aggData := generateAggregateRecord(n)
+	kr = generateKinesisRecord(aggData)
+	krs = append(krs, kr)
+
+	dars, err := deagg.DeaggregateRecords(krs)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Variable Length Aggregate Record test has aggregated records and should return
+	// n length.
+	assertMsg := fmt.Sprintf("Variable Length Aggregate Record should return length %v.", n)
+	assert.Equal(t, n, len(dars), assertMsg)
+}
+
+// This function tests the length of the message after magic file header. If length is less than
+// the digest size (16 bytes), it is not an aggregated record.
+func TestRecordAfterMagicHeaderWithLengthLessThanDigestSizeReturnsSingleRecord(t *testing.T) {
+	var err error
+	var kr types.Record
+
+	krs := make([]types.Record, 0, 1)
+
+	minRecords := 1
+	maxRecords := 10
+	n := rand.Intn(maxRecords-minRecords) + minRecords
+	aggData := generateAggregateRecord(n)
+	// Change size of proto message to 15
+	reducedAggData := aggData[:19]
+	kr = generateKinesisRecord(reducedAggData)
+
+	krs = append(krs, kr)
+
+	dars, err := deagg.DeaggregateRecords(krs)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// A byte record with length less than 16 after the magic header should return
+	// a single record from DeaggregateRecords
+	assert.Equal(t, 1, len(dars), "Digest size test should return length of 1.")
+}
+
+// This function tests the MD5 Sum at the end of the record by comparing MD5 sum
+// at end of proto record with MD5 Sum of Proto message. If they do not match,
+// it is not an aggregated record.
+func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) {
+	var err error
+	var kr types.Record
+
+	krs := make([]types.Record, 0, 1)
+
+	minRecords := 1
+	maxRecords := 10
+	n := rand.Intn(maxRecords-minRecords) + minRecords
+	aggData := generateAggregateRecord(n)
+	// Remove last byte from array to mismatch the MD5 sums
+	mismatchAggData := aggData[:len(aggData)-1]
+	kr = generateKinesisRecord(mismatchAggData)
+
+	krs = append(krs, kr)
+
+	dars, err := deagg.DeaggregateRecords(krs)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// A byte record with an MD5 sum that does not match with the md5.Sum(record)
+	// will be marked as a non-aggregate record and return a single record
+	assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.")
+}
diff --git a/internal/records/records.pb.go b/internal/records/records.pb.go
new file mode 100644
index 0000000..689a1c8
--- /dev/null
+++ b/internal/records/records.pb.go
@@ -0,0 +1,215 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: records.proto
+
+package records
+
+import (
+	fmt "fmt"
+	math "math"
+
+	proto "github.com/golang/protobuf/proto"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+type AggregatedRecord struct { // Protobuf message: partition-key table, explicit-hash-key table and the contained records.
+	PartitionKeyTable    []string  `protobuf:"bytes,1,rep,name=partition_key_table,json=partitionKeyTable" json:"partition_key_table,omitempty"`
+	ExplicitHashKeyTable []string  `protobuf:"bytes,2,rep,name=explicit_hash_key_table,json=explicitHashKeyTable" json:"explicit_hash_key_table,omitempty"`
+	Records              []*Record `protobuf:"bytes,3,rep,name=records" json:"records,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}  `json:"-"`
+	XXX_unrecognized     []byte    `json:"-"`
+	XXX_sizecache        int32     `json:"-"`
+}
+
+func (m *AggregatedRecord) Reset()         { *m = AggregatedRecord{} }
+func (m *AggregatedRecord) String() string { return proto.CompactTextString(m) }
+func (*AggregatedRecord) ProtoMessage()    {}
+func (*AggregatedRecord) Descriptor() ([]byte, []int) {
+	return fileDescriptor_6ae0159314830e16, []int{0}
+}
+
+func (m *AggregatedRecord) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_AggregatedRecord.Unmarshal(m, b)
+}
+func (m *AggregatedRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_AggregatedRecord.Marshal(b, m, deterministic)
+}
+func (m *AggregatedRecord) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_AggregatedRecord.Merge(m, src)
+}
+func (m *AggregatedRecord) XXX_Size() int {
+	return xxx_messageInfo_AggregatedRecord.Size(m)
+}
+func (m *AggregatedRecord) XXX_DiscardUnknown() {
+	xxx_messageInfo_AggregatedRecord.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_AggregatedRecord proto.InternalMessageInfo
+
+func (m *AggregatedRecord) GetPartitionKeyTable() []string {
+	if m != nil {
+		return m.PartitionKeyTable
+	}
+	return nil
+}
+
+func (m *AggregatedRecord) GetExplicitHashKeyTable() []string {
+	if m != nil {
+		return m.ExplicitHashKeyTable
+	}
+	return nil
+}
+
+func (m *AggregatedRecord) GetRecords() []*Record {
+	if m != nil {
+		return m.Records
+	}
+	return nil
+}
+
+type Tag struct { // Protobuf message: a required key with an optional value.
+	Key                  *string  `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
+	Value                *string  `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *Tag) Reset()         { *m = Tag{} }
+func (m *Tag) String() string { return proto.CompactTextString(m) }
+func (*Tag) ProtoMessage()    {}
+func (*Tag) Descriptor() ([]byte, []int) {
+	return fileDescriptor_6ae0159314830e16, []int{1}
+}
+
+func (m *Tag) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_Tag.Unmarshal(m, b)
+}
+func (m *Tag) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_Tag.Marshal(b, m, deterministic)
+}
+func (m *Tag) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_Tag.Merge(m, src)
+}
+func (m *Tag) XXX_Size() int {
+	return xxx_messageInfo_Tag.Size(m)
+}
+func (m *Tag) XXX_DiscardUnknown() {
+	xxx_messageInfo_Tag.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Tag proto.InternalMessageInfo
+
+func (m *Tag) GetKey() string {
+	if m != nil && m.Key != nil {
+		return *m.Key
+	}
+	return ""
+}
+
+func (m *Tag) GetValue() string {
+	if m != nil && m.Value != nil {
+		return *m.Value
+	}
+	return ""
+}
+
+type Record struct { // Protobuf message for one record: key-table indexes, data bytes and tags.
+	PartitionKeyIndex    *uint64  `protobuf:"varint,1,req,name=partition_key_index,json=partitionKeyIndex" json:"partition_key_index,omitempty"`
+	ExplicitHashKeyIndex *uint64  `protobuf:"varint,2,opt,name=explicit_hash_key_index,json=explicitHashKeyIndex" json:"explicit_hash_key_index,omitempty"`
+	Data                 []byte   `protobuf:"bytes,3,req,name=data" json:"data,omitempty"`
+	Tags                 []*Tag   `protobuf:"bytes,4,rep,name=tags" json:"tags,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *Record) Reset()         { *m = Record{} }
+func (m *Record) String() string { return proto.CompactTextString(m) }
+func (*Record) ProtoMessage()    {}
+func (*Record) Descriptor() ([]byte, []int) {
+	return fileDescriptor_6ae0159314830e16, []int{2}
+}
+
+func (m *Record) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_Record.Unmarshal(m, b)
+}
+func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_Record.Marshal(b, m, deterministic)
+}
+func (m *Record) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_Record.Merge(m, src)
+}
+func (m *Record) XXX_Size() int {
+	return xxx_messageInfo_Record.Size(m)
+}
+func (m *Record) XXX_DiscardUnknown() {
+	xxx_messageInfo_Record.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Record proto.InternalMessageInfo
+
+func (m *Record) GetPartitionKeyIndex() uint64 {
+	if m != nil && m.PartitionKeyIndex != nil {
+		return *m.PartitionKeyIndex
+	}
+	return 0
+}
+
+func (m *Record) GetExplicitHashKeyIndex() uint64 {
+	if m != nil && m.ExplicitHashKeyIndex != nil {
+		return *m.ExplicitHashKeyIndex
+	}
+	return 0
+}
+
+func (m *Record) GetData() []byte {
+	if m != nil {
+		return m.Data
+	}
+	return nil
+}
+
+func (m *Record) GetTags() []*Tag {
+	if m != nil {
+		return m.Tags
+	}
+	return nil
+}
+
+func init() { // Registers the three message types with the proto package under their unqualified names.
+	proto.RegisterType((*AggregatedRecord)(nil), "AggregatedRecord")
+	proto.RegisterType((*Tag)(nil), "Tag")
+	proto.RegisterType((*Record)(nil), "Record")
+}
+
+func init() { proto.RegisterFile("records.proto", fileDescriptor_6ae0159314830e16) }
+
+var fileDescriptor_6ae0159314830e16 = []byte{ // Registered above as the descriptor for records.proto; generated data, do not hand-edit.
+	// 245 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x8f, 0xc1, 0x4a, 0xc4, 0x30,
+	0x10, 0x86, 0xc9, 0x26, 0xba, 0x74, 0x54, 0x58, 0xe3, 0x82, 0x39, 0xd6, 0x9e, 0x72, 0xb1, 0x07,
+	0xc1, 0x07, 0xf0, 0xa6, 0x78, 0x0b, 0xbd, 0x97, 0x71, 0x3b, 0xa4, 0x61, 0xcb, 0xb6, 0xa4, 0x51,
+	0xb6, 0xef, 0xa2, 0xef, 0x2a, 0x49, 0xdd, 0x45, 0x51, 0x6f, 0x93, 0xf9, 0xf9, 0x32, 0xff, 0x07,
+	0x17, 0x9e, 0x36, 0xbd, 0x6f, 0xc6, 0x72, 0xf0, 0x7d, 0xe8, 0x8b, 0x77, 0x06, 0xab, 0x07, 0x6b,
+	0x3d, 0x59, 0x0c, 0xd4, 0x98, 0x94, 0xc9, 0x12, 0xae, 0x06, 0xf4, 0xc1, 0x05, 0xd7, 0xef, 0xea,
+	0x2d, 0x4d, 0x75, 0xc0, 0x97, 0x8e, 0x14, 0xcb, 0xb9, 0xce, 0xcc, 0xe5, 0x31, 0x7a, 0xa6, 0xa9,
+	0x8a, 0x81, 0xbc, 0x87, 0x6b, 0xda, 0x0f, 0x9d, 0xdb, 0xb8, 0x50, 0xb7, 0x38, 0xb6, 0xdf, 0x98,
+	0x45, 0x62, 0xd6, 0x87, 0xf8, 0x11, 0xc7, 0xf6, 0x88, 0xdd, 0xc0, 0xf2, 0xab, 0x8c, 0xe2, 0x39,
+	0xd7, 0x67, 0x77, 0xcb, 0x72, 0x2e, 0x60, 0x0e, 0xfb, 0xe2, 0x16, 0x78, 0x85, 0x56, 0xae, 0x80,
+	0x6f, 0x69, 0x52, 0x2c, 0x5f, 0xe8, 0xcc, 0xc4, 0x51, 0xae, 0xe1, 0xe4, 0x0d, 0xbb, 0xd7, 0x78,
+	0x80, 0xe9, 0xcc, 0xcc, 0x8f, 0xe2, 0x83, 0xc1, 0xe9, 0x7f, 0x0e, 0x6e, 0xd7, 0xd0, 0x3e, 0x7d,
+	0x21, 0x7e, 0x3a, 0x3c, 0xc5, 0xe0, 0x6f, 0x87, 0x99, 0x89, 0x27, 0xc4, 0x2f, 0x87, 0x19, 0x93,
+	0x20, 0x1a, 0x0c, 0xa8, 0x78, 0xbe, 0xd0, 0xe7, 0x26, 0xcd, 0x52, 0x81, 0x08, 0x68, 0x47, 0x25,
+	0x92, 0x94, 0x28, 0x2b, 0xb4, 0x26, 0x6d, 0x3e, 0x03, 0x00, 0x00, 0xff, 0xff, 0x87, 0x3e, 0x63,
+	0x69, 0x7d, 0x01, 0x00, 0x00,
+}
\ No newline at end of file