Add functionality for handling KPL aggregates and zlib

Adds a lowest-common-denominator function, KPLDeaggregate, for
handling records that might be KPL aggregated. Also adds a function,
DeaggregateAndSplitIfNecessary, to wrap the existing functionality of
SplitMessageIfNecessary with KPL deaggreation.

These functions are handy for non-KCL consumers, like Lambda
functions. KCL automatically applies deaggreation for you.

This change is backwards compatible - the previously exposed function
SplitMessageIfNecessary still does the same things.
This commit is contained in:
Taylor Sutton 2020-11-16 10:28:43 -05:00
parent f8e9c34641
commit 6e8a99c50d
4 changed files with 419 additions and 13 deletions

101
Gopkg.lock generated
View file

@ -12,6 +12,47 @@
pruneopts = "" pruneopts = ""
revision = "fb28ad3e4340c046323b7beba685a72fd12ecbe8" revision = "fb28ad3e4340c046323b7beba685a72fd12ecbe8"
[[projects]]
digest = "1:ebf4bf0cc2e8705b66f67fecee8837e26c8b67a11ede9dd1f18a2841b6de5fac"
name = "github.com/a8m/kinesis-producer"
packages = ["."]
pruneopts = ""
revision = "2b76ac68f594232c724b44643c6994768d5a5f35"
version = "v0.2.0"
[[projects]]
digest = "1:b597f05b10e1c62829891fc8110d4f8df9caaba70928eaaffeba182664ac190a"
name = "github.com/aws/aws-sdk-go"
packages = [
"aws",
"aws/awserr",
"aws/awsutil",
"aws/client",
"aws/client/metadata",
"aws/credentials",
"aws/endpoints",
"aws/request",
"aws/signer/v4",
"internal/context",
"internal/ini",
"internal/sdkio",
"internal/sdkmath",
"internal/sdkrand",
"internal/shareddefaults",
"internal/strings",
"internal/sync/singleflight",
"private/protocol",
"private/protocol/eventstream",
"private/protocol/eventstream/eventstreamapi",
"private/protocol/json/jsonutil",
"private/protocol/jsonrpc",
"private/protocol/rest",
"service/kinesis",
]
pruneopts = ""
revision = "416d69c754abe8f0b32f6dcd8837d086812583d6"
version = "v1.35.28"
[[projects]] [[projects]]
digest = "1:0deddd908b6b4b768cfc272c16ee61e7088a60f7fe2f06c547bd3d8e1f8b8e77" digest = "1:0deddd908b6b4b768cfc272c16ee61e7088a60f7fe2f06c547bd3d8e1f8b8e77"
name = "github.com/davecgh/go-spew" name = "github.com/davecgh/go-spew"
@ -20,6 +61,14 @@
revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73" revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73"
version = "v1.1.1" version = "v1.1.1"
[[projects]]
digest = "1:6532affeeaaccdc6919d5773516176b77de02b4af8cf9a7fac16bae77aa319c5"
name = "github.com/golang/protobuf"
packages = ["proto"]
pruneopts = ""
revision = "4846b58453b3708320bdb524f25cc5a1d9cda4d4"
version = "v1.4.3"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:c7a6f4f1e7baeabf4de954d2d7f82343252f9e66b40abf694687b1df6fbccd5e" digest = "1:c7a6f4f1e7baeabf4de954d2d7f82343252f9e66b40abf694687b1df6fbccd5e"
@ -28,6 +77,21 @@
pruneopts = "" pruneopts = ""
revision = "5fbaaf06d9e72f781d19c61e130ba63f4d6dab31" revision = "5fbaaf06d9e72f781d19c61e130ba63f4d6dab31"
[[projects]]
digest = "1:13fe471d0ed891e8544eddfeeb0471fd3c9f2015609a1c000aefdedf52a19d40"
name = "github.com/jmespath/go-jmespath"
packages = ["."]
pruneopts = ""
revision = "c2b33e84"
[[projects]]
digest = "1:ef0f9731bc6c3c59396adc50f36da36ca98c704812c449794f9326f7bc64b5f1"
name = "github.com/jpillora/backoff"
packages = ["."]
pruneopts = ""
revision = "8eab2debe79d12b7bd3d10653910df25fa9552ba"
version = "1.0.0"
[[projects]] [[projects]]
digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411"
name = "github.com/pmezard/go-difflib" name = "github.com/pmezard/go-difflib"
@ -79,6 +143,41 @@
pruneopts = "" pruneopts = ""
revision = "3af7569d3a1e776fc2a3c1cec133b43105ea9c2e" revision = "3af7569d3a1e776fc2a3c1cec133b43105ea9c2e"
[[projects]]
digest = "1:0d049ff01749ce1d6092c85cb90a02dfdb3b401939b13415b7e608f36bc8ecee"
name = "google.golang.org/protobuf"
packages = [
"encoding/prototext",
"encoding/protowire",
"internal/descfmt",
"internal/descopts",
"internal/detrand",
"internal/encoding/defval",
"internal/encoding/messageset",
"internal/encoding/tag",
"internal/encoding/text",
"internal/errors",
"internal/fieldsort",
"internal/filedesc",
"internal/filetype",
"internal/flags",
"internal/genid",
"internal/impl",
"internal/mapsort",
"internal/pragma",
"internal/set",
"internal/strs",
"internal/version",
"proto",
"reflect/protoreflect",
"reflect/protoregistry",
"runtime/protoiface",
"runtime/protoimpl",
]
pruneopts = ""
revision = "3f7a61f89bb6813f89d981d1870ed68da0b3c3f1"
version = "v1.25.0"
[[projects]] [[projects]]
digest = "1:0923ed679d96b9e53b8c43892e2094ebb5dccd0a66709b20c90f43c37ff71ff1" digest = "1:0923ed679d96b9e53b8c43892e2094ebb5dccd0a66709b20c90f43c37ff71ff1"
name = "gopkg.in/Clever/kayvee-go.v6" name = "gopkg.in/Clever/kayvee-go.v6"
@ -113,6 +212,8 @@
input-imports = [ input-imports = [
"github.com/Clever/syslogparser", "github.com/Clever/syslogparser",
"github.com/Clever/syslogparser/rfc3164", "github.com/Clever/syslogparser/rfc3164",
"github.com/a8m/kinesis-producer",
"github.com/golang/protobuf/proto",
"github.com/stretchr/testify/assert", "github.com/stretchr/testify/assert",
"github.com/stretchr/testify/require", "github.com/stretchr/testify/require",
"golang.org/x/time/rate", "golang.org/x/time/rate",

View file

@ -12,3 +12,11 @@
[[constraint]] [[constraint]]
name = "gopkg.in/Clever/kayvee-go.v6" name = "gopkg.in/Clever/kayvee-go.v6"
version = "6.24.0" version = "6.24.0"
[[constraint]]
name = "github.com/a8m/kinesis-producer"
version = "v0.2.0"
[[constraint]]
name = "github.com/golang/protobuf"
version = "v1.4.3"

View file

@ -1,39 +1,137 @@
// Package splitter provides functions for decoding various kinds of records that might come off of a kinesis stream.
// It is equipped to with the functions to unbundle KPL aggregates and CloudWatch log bundles,
// as well as apply appropriate decompression.
// KCL applications would be most interested in `SplitMessageIfNecessary` which can handle zlibbed records as well as
// CloudWatch bundles. KCL automatically unbundles KPL aggregates before passing the records to the consumer.
// Non-KCL applications (such as Lambdas consuming KPL-produced aggregates) should either use
// - KPLDeaggregate if the consumer purely wants to unbundle KPL aggregates, but will handle the raw records themselves.
// - DeaggregateAndSplitIfNecessary if the consumer wants to apply the same decompress and split logic as SplitMessageIfNecessary
// in addition to the KPL splitting.
package splitter package splitter
import ( import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"compress/zlib" "compress/zlib"
"crypto/md5"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"regexp" "regexp"
"strconv" "strconv"
"time" "time"
kpl "github.com/a8m/kinesis-producer"
"github.com/golang/protobuf/proto"
) )
// SplitMessageIfNecessary handles three types of records: // The Amazon Kinesis Producer Library (KPL) aggregates multiple logical user records into a single
// Amazon Kinesis record for efficient puts.
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
var kplMagicNumber = []byte{0xF3, 0x89, 0x9A, 0xC2}
// IsKPLAggregate checks a record for a KPL aggregate prefix.
// It is not necessary to call this before calling KPLDeaggregate.
func IsKPLAggregate(data []byte) bool {
return bytes.HasPrefix(data, kplMagicNumber)
}
// KPLDeaggregate takes a Kinesis record and converts it to one or more user records by applying KPL deaggregation.
// If the record begins with the 4-byte magic prefix that KPL uses, the single Kinesis record is split into its component user records.
// Otherwise, the return value is a singleton containing the original record.
func KPLDeaggregate(kinesisRecord []byte) ([][]byte, error) {
if !IsKPLAggregate(kinesisRecord) {
return [][]byte{kinesisRecord}, nil
}
src := kinesisRecord[len(kplMagicNumber) : len(kinesisRecord)-md5.Size]
checksum := kinesisRecord[len(kinesisRecord)-md5.Size:]
recordSum := md5.Sum(src)
for i, b := range checksum {
if b != recordSum[i] {
// false alarm - the header matched but the checksum doesn't, so it's not KPL
return [][]byte{kinesisRecord}, nil
}
}
dest := new(kpl.AggregatedRecord)
err := proto.Unmarshal(src, dest)
if err != nil {
return nil, fmt.Errorf("unmarshalling proto: %v", err)
}
var records [][]byte
for _, userRecord := range dest.GetRecords() {
records = append(records, userRecord.Data)
}
return records, nil
}
// DeaggregateAndSplitIfNecessary is a combination of KPLDeaggregate and SplitMessageIfNecessary
// First it tries to KPL-deaggregate. If unsuccessful, it calls SplitIfNecessary on the original record.
// If successful, it iterates over the individual user records and attempts to unzlib them.
// If a record inside an aggregate is in zlib format, the output will contain the unzlibbed version.
// If it is not zlibbed, the output will contain the record verbatim
// A similar result can be optained by calling KPLDeaggregate, then iterating over the results and callin SplitMessageIfNecessary.
// This function makes the assumption that after KPL-deaggregating, the results are not CloudWatch aggregates, so it doesn't need to check them for a gzip header.
// Also it lets us iterate over the user records one less time, since KPLDeaggregate loops over the records and we would need to loop again to unzlib.
func DeaggregateAndSplitIfNecessary(kinesisRecord []byte) ([][]byte, error) {
if !IsKPLAggregate(kinesisRecord) {
return SplitMessageIfNecessary(kinesisRecord)
}
src := kinesisRecord[len(kplMagicNumber) : len(kinesisRecord)-md5.Size]
checksum := kinesisRecord[len(kinesisRecord)-md5.Size:]
recordSum := md5.Sum(src)
for i, b := range checksum {
if b != recordSum[i] {
// false alarm - the header matched but the checksum doesn't, so it's not KPL
return [][]byte{kinesisRecord}, nil
}
}
dest := new(kpl.AggregatedRecord)
err := proto.Unmarshal(src, dest)
if err != nil {
return nil, fmt.Errorf("unmarshalling proto: %v", err)
}
var records [][]byte
for _, userRecord := range dest.GetRecords() {
record, err := unzlib(userRecord.Data)
if err != nil {
return nil, fmt.Errorf("unzlibbing record: %w", err)
}
records = append(records, record)
}
return records, nil
}
// SplitMessageIfNecessary recieves a user-record and returns a slice of one or more records.
// if the record is coming off of a kinesis stream and might be KPL aggregated, it needs to be deaggregated before calling this.
// This function handles three types of records:
// - records emitted from CWLogs Subscription (which are gzip compressed) // - records emitted from CWLogs Subscription (which are gzip compressed)
// - uncompressed records emitted from KPL // - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bit
// - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bi // - any other record (left unchanged)
func SplitMessageIfNecessary(record []byte) ([][]byte, error) { func SplitMessageIfNecessary(userRecord []byte) ([][]byte, error) {
if IsGzipped(record) { // First try the record as a CWLogs record
// Process a batch of messages from a CWLogs Subscription if IsGzipped(userRecord) {
return GetMessagesFromGzippedInput(record) return GetMessagesFromGzippedInput(userRecord)
} }
// Try to read it as a zlib-compressed record unzlibRecord, err := unzlib(userRecord)
// zlib.NewReader checks for a zlib header and returns an error if not found if err != nil {
zlibReader, err := zlib.NewReader(bytes.NewReader(record)) return nil, err
}
// Process a single message, from KPL
return [][]byte{unzlibRecord}, nil
}
func unzlib(input []byte) ([]byte, error) {
zlibReader, err := zlib.NewReader(bytes.NewReader(input))
if err == nil { if err == nil {
unzlibRecord, err := ioutil.ReadAll(zlibReader) unzlibRecord, err := ioutil.ReadAll(zlibReader)
if err != nil { if err != nil {
return nil, fmt.Errorf("reading zlib-compressed record: %v", err) return nil, fmt.Errorf("reading zlib-compressed record: %v", err)
} }
return [][]byte{unzlibRecord}, nil return unzlibRecord, nil
} }
// Process a single message, from KPL return input, nil
return [][]byte{record}, nil
} }

View file

@ -4,12 +4,15 @@ import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"compress/zlib" "compress/zlib"
"crypto/md5"
b64 "encoding/base64" b64 "encoding/base64"
"encoding/json" "encoding/json"
"testing" "testing"
"time" "time"
"github.com/Clever/amazon-kinesis-client-go/decode" "github.com/Clever/amazon-kinesis-client-go/decode"
kpl "github.com/a8m/kinesis-producer"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -286,6 +289,7 @@ func TestSplitGlue(t *testing.T) {
} }
} }
// If running this test directly with `go test`, it may fail unless you set the env var TZ=UTC
func TestSplitIfNecesary(t *testing.T) { func TestSplitIfNecesary(t *testing.T) {
// We provide three different inputs to batchedWriter.splitMessageIfNecessary // We provide three different inputs to batchedWriter.splitMessageIfNecessary
@ -349,3 +353,198 @@ func TestSplitIfNecesary(t *testing.T) {
[][]byte{expectedRecord}, [][]byte{expectedRecord},
) )
} }
func createKPLAggregate(input [][]byte, compress bool) []byte {
var partitionKeyIndex uint64 = 0
records := []*kpl.Record{}
for _, log := range input {
if compress {
var z bytes.Buffer
zbuf := zlib.NewWriter(&z)
zbuf.Write(log)
zbuf.Close()
log = z.Bytes()
}
records = append(records, &kpl.Record{
PartitionKeyIndex: &partitionKeyIndex,
Data: log,
})
}
logProto, err := proto.Marshal(&kpl.AggregatedRecord{
PartitionKeyTable: []string{"ecs_task_arn"},
Records: records,
})
if err != nil {
panic(err)
}
log := append(kplMagicNumber, logProto...)
logHash := md5.Sum(logProto)
return append(log, logHash[0:16]...)
}
// If running this test directly with `go test`, it may fail unless you set the env var TZ=UTC
func TestKPLDeaggregate(t *testing.T) {
type test struct {
description string
input []byte
output [][]byte
shouldError bool
}
tests := []test{
{
description: "non-aggregated record",
input: []byte("hello"),
output: [][]byte{[]byte("hello")},
shouldError: false,
},
{
description: "one kpl-aggregated record",
input: createKPLAggregate(
[][]byte{[]byte("hello")},
false,
),
output: [][]byte{[]byte("hello")},
shouldError: false,
},
{
description: "three kpl-aggregated record",
input: createKPLAggregate([][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
false,
),
output: [][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
shouldError: false,
},
}
for _, tt := range tests {
t.Run(tt.description, func(t *testing.T) {
out, err := KPLDeaggregate(tt.input)
if tt.shouldError {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, out, tt.output)
})
}
}
// If running this test directly with `go test`, it may fail unless you set the env var TZ=UTC
func TestDeaggregateAndSplit(t *testing.T) {
type test struct {
description string
input []byte
output [][]byte
shouldError bool
}
tests := []test{
{
description: "non-aggregated record",
input: []byte("hello"),
output: [][]byte{[]byte("hello")},
shouldError: false,
},
{
description: "one kpl-aggregated record",
input: createKPLAggregate(
[][]byte{[]byte("hello")},
false,
),
output: [][]byte{[]byte("hello")},
shouldError: false,
},
{
description: "three kpl-aggregated record",
input: createKPLAggregate([][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
false,
),
output: [][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
shouldError: false,
},
{
description: "one kpl-aggregated zlib record",
input: createKPLAggregate(
[][]byte{[]byte("hello")},
true,
),
output: [][]byte{[]byte("hello")},
shouldError: false,
},
{
description: "three kpl-aggregated zlib record",
input: createKPLAggregate([][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
true,
),
output: [][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
shouldError: false,
},
}
var g bytes.Buffer
gbuf := gzip.NewWriter(&g)
cwLogBatch := LogEventBatch{
MessageType: "test",
Owner: "test",
LogGroup: "test",
LogStream: "test",
SubscriptionFilters: []string{""},
LogEvents: []LogEvent{{
ID: "test",
Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)),
Message: "test",
}},
}
cwLogBatchJSON, _ := json.Marshal(cwLogBatch)
gbuf.Write(cwLogBatchJSON)
gbuf.Close()
gzipBatchInput := g.Bytes()
tests = append(tests, test{
description: "cloudwatch log batch",
input: gzipBatchInput,
output: [][]byte{[]byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test")},
shouldError: false,
})
for _, tt := range tests {
t.Run(tt.description, func(t *testing.T) {
out, err := DeaggregateAndSplitIfNecessary(tt.input)
if tt.shouldError {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, out, tt.output)
})
}
}