diff --git a/Gopkg.lock b/Gopkg.lock index 5fd7dae..192d72c 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -12,6 +12,47 @@ pruneopts = "" revision = "fb28ad3e4340c046323b7beba685a72fd12ecbe8" +[[projects]] + digest = "1:ebf4bf0cc2e8705b66f67fecee8837e26c8b67a11ede9dd1f18a2841b6de5fac" + name = "github.com/a8m/kinesis-producer" + packages = ["."] + pruneopts = "" + revision = "2b76ac68f594232c724b44643c6994768d5a5f35" + version = "v0.2.0" + +[[projects]] + digest = "1:b597f05b10e1c62829891fc8110d4f8df9caaba70928eaaffeba182664ac190a" + name = "github.com/aws/aws-sdk-go" + packages = [ + "aws", + "aws/awserr", + "aws/awsutil", + "aws/client", + "aws/client/metadata", + "aws/credentials", + "aws/endpoints", + "aws/request", + "aws/signer/v4", + "internal/context", + "internal/ini", + "internal/sdkio", + "internal/sdkmath", + "internal/sdkrand", + "internal/shareddefaults", + "internal/strings", + "internal/sync/singleflight", + "private/protocol", + "private/protocol/eventstream", + "private/protocol/eventstream/eventstreamapi", + "private/protocol/json/jsonutil", + "private/protocol/jsonrpc", + "private/protocol/rest", + "service/kinesis", + ] + pruneopts = "" + revision = "416d69c754abe8f0b32f6dcd8837d086812583d6" + version = "v1.35.28" + [[projects]] digest = "1:0deddd908b6b4b768cfc272c16ee61e7088a60f7fe2f06c547bd3d8e1f8b8e77" name = "github.com/davecgh/go-spew" @@ -20,6 +61,14 @@ revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73" version = "v1.1.1" +[[projects]] + digest = "1:6532affeeaaccdc6919d5773516176b77de02b4af8cf9a7fac16bae77aa319c5" + name = "github.com/golang/protobuf" + packages = ["proto"] + pruneopts = "" + revision = "4846b58453b3708320bdb524f25cc5a1d9cda4d4" + version = "v1.4.3" + [[projects]] branch = "master" digest = "1:c7a6f4f1e7baeabf4de954d2d7f82343252f9e66b40abf694687b1df6fbccd5e" @@ -28,6 +77,21 @@ pruneopts = "" revision = "5fbaaf06d9e72f781d19c61e130ba63f4d6dab31" +[[projects]] + digest = 
"1:13fe471d0ed891e8544eddfeeb0471fd3c9f2015609a1c000aefdedf52a19d40" + name = "github.com/jmespath/go-jmespath" + packages = ["."] + pruneopts = "" + revision = "c2b33e84" + +[[projects]] + digest = "1:ef0f9731bc6c3c59396adc50f36da36ca98c704812c449794f9326f7bc64b5f1" + name = "github.com/jpillora/backoff" + packages = ["."] + pruneopts = "" + revision = "8eab2debe79d12b7bd3d10653910df25fa9552ba" + version = "1.0.0" + [[projects]] digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" name = "github.com/pmezard/go-difflib" @@ -79,6 +143,41 @@ pruneopts = "" revision = "3af7569d3a1e776fc2a3c1cec133b43105ea9c2e" +[[projects]] + digest = "1:0d049ff01749ce1d6092c85cb90a02dfdb3b401939b13415b7e608f36bc8ecee" + name = "google.golang.org/protobuf" + packages = [ + "encoding/prototext", + "encoding/protowire", + "internal/descfmt", + "internal/descopts", + "internal/detrand", + "internal/encoding/defval", + "internal/encoding/messageset", + "internal/encoding/tag", + "internal/encoding/text", + "internal/errors", + "internal/fieldsort", + "internal/filedesc", + "internal/filetype", + "internal/flags", + "internal/genid", + "internal/impl", + "internal/mapsort", + "internal/pragma", + "internal/set", + "internal/strs", + "internal/version", + "proto", + "reflect/protoreflect", + "reflect/protoregistry", + "runtime/protoiface", + "runtime/protoimpl", + ] + pruneopts = "" + revision = "3f7a61f89bb6813f89d981d1870ed68da0b3c3f1" + version = "v1.25.0" + [[projects]] digest = "1:0923ed679d96b9e53b8c43892e2094ebb5dccd0a66709b20c90f43c37ff71ff1" name = "gopkg.in/Clever/kayvee-go.v6" @@ -113,6 +212,8 @@ input-imports = [ "github.com/Clever/syslogparser", "github.com/Clever/syslogparser/rfc3164", + "github.com/a8m/kinesis-producer", + "github.com/golang/protobuf/proto", "github.com/stretchr/testify/assert", "github.com/stretchr/testify/require", "golang.org/x/time/rate", diff --git a/Gopkg.toml b/Gopkg.toml index 52d457f..d4739d5 100644 --- a/Gopkg.toml +++ 
b/Gopkg.toml
@@ -12,3 +12,11 @@
 [[constraint]]
   name = "gopkg.in/Clever/kayvee-go.v6"
   version = "6.24.0"
+
+[[constraint]]
+  name = "github.com/a8m/kinesis-producer"
+  version = "v0.2.0"
+
+[[constraint]]
+  name = "github.com/golang/protobuf"
+  version = "v1.4.3"
diff --git a/splitter/splitter.go b/splitter/splitter.go
index 7027a18..abcf9ee 100644
--- a/splitter/splitter.go
+++ b/splitter/splitter.go
@@ -1,39 +1,147 @@
+// Package splitter provides functions for decoding various kinds of records that might come off of a kinesis stream.
+// It is equipped with functions to unbundle KPL aggregates and CloudWatch log bundles,
+// as well as apply appropriate decompression.
+// KCL applications would be most interested in `SplitMessageIfNecessary` which can handle zlibbed records as well as
+// CloudWatch bundles. KCL automatically unbundles KPL aggregates before passing the records to the consumer.
+// Non-KCL applications (such as Lambdas consuming KPL-produced aggregates) should either use
+// - KPLDeaggregate if the consumer purely wants to unbundle KPL aggregates, but will handle the raw records themselves.
+// - Deaggregate if the consumer wants to apply the same decompress and split logic as SplitMessageIfNecessary
+// in addition to the KPL splitting.
 package splitter
 
 import (
 	"bytes"
 	"compress/gzip"
 	"compress/zlib"
+	"crypto/md5"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"regexp"
 	"strconv"
 	"time"
+
+	kpl "github.com/a8m/kinesis-producer"
+	"github.com/golang/protobuf/proto"
 )
 
-// SplitMessageIfNecessary handles three types of records:
+// The Amazon Kinesis Producer Library (KPL) aggregates multiple logical user records into a single
+// Amazon Kinesis record for efficient puts.
+// kplMagicNumber is the byte prefix this package looks for to recognise such an aggregate (see IsKPLAggregate).
+// Format reference:
+// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
+var kplMagicNumber = []byte{0xF3, 0x89, 0x9A, 0xC2}
+
+// IsKPLAggregate checks a record for a KPL aggregate prefix.
+// It is not necessary to call this before calling KPLDeaggregate.
+func IsKPLAggregate(data []byte) bool {
+	return bytes.HasPrefix(data, kplMagicNumber)
+}
+
+// KPLDeaggregate takes a Kinesis record and converts it to one or more user records by applying KPL deaggregation.
+// If the record begins with the 4-byte magic prefix that KPL uses, the single Kinesis record is split into its component user records.
+// Otherwise, the return value is a singleton containing the original record.
+// A record that carries the prefix but is too short to hold the trailing MD5 checksum, or whose checksum
+// does not match, is likewise returned unchanged as a singleton.
+// An error is returned only when the checksum matches but the payload cannot be unmarshalled
+// as a KPL AggregatedRecord.
+func KPLDeaggregate(kinesisRecord []byte) ([][]byte, error) {
+	if !IsKPLAggregate(kinesisRecord) {
+		return [][]byte{kinesisRecord}, nil
+	}
+	// The prefix alone is not enough: the slicing below needs room for the trailing MD5 sum.
+	// Without this guard a short record that happens to start with the magic number panics
+	// with "slice bounds out of range".
+	if len(kinesisRecord) < len(kplMagicNumber)+md5.Size {
+		return [][]byte{kinesisRecord}, nil
+	}
+	src := kinesisRecord[len(kplMagicNumber) : len(kinesisRecord)-md5.Size]
+	checksum := kinesisRecord[len(kinesisRecord)-md5.Size:]
+	recordSum := md5.Sum(src)
+	if !bytes.Equal(checksum, recordSum[:]) {
+		// either the data is corrupted or this is not a KPL aggregate
+		// either way, return the data as-is
+		return [][]byte{kinesisRecord}, nil
+	}
+	dest := new(kpl.AggregatedRecord)
+	if err := proto.Unmarshal(src, dest); err != nil {
+		return nil, fmt.Errorf("unmarshalling proto: %w", err)
+	}
+	var records [][]byte
+	for _, userRecord := range dest.GetRecords() {
+		records = append(records, userRecord.Data)
+	}
+	return records, nil
+}
+
+// Deaggregate is a combination of KPLDeaggregate and SplitMessageIfNecessary.
+// First it tries to KPL-deaggregate. If the record does not carry the KPL prefix, it calls SplitMessageIfNecessary on the original record.
+// If successful, it iterates over the individual user records and attempts to unzlib them.
+// If a record inside an aggregate is in zlib format, the output will contain the unzlibbed version.
+// If it is not zlibbed, the output will contain the record verbatim.
+// A similar result can be obtained by calling KPLDeaggregate, then iterating over the results and calling SplitMessageIfNecessary.
+// This function makes the assumption that after KPL-deaggregating, the results are not CloudWatch aggregates, so it doesn't need to check them for a gzip header.
+// Also it lets us iterate over the user records one less time, since KPLDeaggregate loops over the records and we would need to loop again to unzlib.
+//
+// A record that carries the KPL prefix but is too short to hold the trailing MD5 checksum, or whose
+// checksum does not match, is returned unchanged as a singleton.
+//
+// See the SplitMessageIfNecessary documentation for the format of output for CloudWatch log bundles.
+func Deaggregate(kinesisRecord []byte) ([][]byte, error) {
+	if !IsKPLAggregate(kinesisRecord) {
+		return SplitMessageIfNecessary(kinesisRecord)
+	}
+	// The prefix alone is not enough: the slicing below needs room for the trailing MD5 sum.
+	// Without this guard a short record that happens to start with the magic number panics
+	// with "slice bounds out of range".
+	if len(kinesisRecord) < len(kplMagicNumber)+md5.Size {
+		return [][]byte{kinesisRecord}, nil
+	}
+	src := kinesisRecord[len(kplMagicNumber) : len(kinesisRecord)-md5.Size]
+	checksum := kinesisRecord[len(kinesisRecord)-md5.Size:]
+	recordSum := md5.Sum(src)
+	if !bytes.Equal(checksum, recordSum[:]) {
+		// either the data is corrupted or this is not a KPL aggregate
+		// either way, return the data as-is
+		return [][]byte{kinesisRecord}, nil
+	}
+	dest := new(kpl.AggregatedRecord)
+	if err := proto.Unmarshal(src, dest); err != nil {
+		return nil, fmt.Errorf("unmarshalling proto: %w", err)
+	}
+	var records [][]byte
+	for _, userRecord := range dest.GetRecords() {
+		// unzlib hands back the data verbatim when it has no zlib header,
+		// so plain and compressed user records can be mixed in one aggregate.
+		record, err := unzlib(userRecord.Data)
+		if err != nil {
+			return nil, fmt.Errorf("unzlibbing record: %w", err)
+		}
+		records = append(records, record)
+	}
+	return records, nil
+}
+
+// SplitMessageIfNecessary receives a user-record and returns a slice of one or more records.
+// If the record is coming off of a kinesis stream and might be KPL aggregated, it needs to be deaggregated before calling this.
+// This function handles three types of records:
 // - records emitted from CWLogs Subscription (which are gzip compressed)
-// - uncompressed records emitted from KPL
-// - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bi
-func SplitMessageIfNecessary(record []byte) ([][]byte, error) {
-	if IsGzipped(record) {
-		// Process a batch of messages from a CWLogs Subscription
-		return GetMessagesFromGzippedInput(record)
+// - zlib compressed records (e.g. 
as compressed and emitted by Kinesis plugin for Fluent Bit)
+// - any other record (left unchanged)
+//
+// CloudWatch logs come as structured JSON. In the process of splitting, they are converted
+// into an rsyslog format that allows fairly uniform parsing of the result across the
+// AWS services that might emit logs to CloudWatch.
+// Note that the timezone used in these syslog records is guessed based on the local env.
+// If you need consistent timezones, set TZ=UTC in your environment.
+func SplitMessageIfNecessary(userRecord []byte) ([][]byte, error) {
+	// First try the record as a CWLogs record
+	if IsGzipped(userRecord) {
+		return GetMessagesFromGzippedInput(userRecord)
 	}
 
-	// Try to read it as a zlib-compressed record
-	// zlib.NewReader checks for a zlib header and returns an error if not found
-	zlibReader, err := zlib.NewReader(bytes.NewReader(record))
+	unzlibRecord, err := unzlib(userRecord)
+	if err != nil {
+		return nil, err
+	}
+
+	// Not a CWLogs batch: return the single record, decompressed if it was zlibbed
+	return [][]byte{unzlibRecord}, nil
+}
+
+// unzlib returns the decompressed contents of input when zlib.NewReader accepts its header,
+// and input itself, unchanged, when it does not.
+// An error is returned only if reading the compressed stream fails after the header was accepted.
+func unzlib(input []byte) ([]byte, error) {
+	zlibReader, err := zlib.NewReader(bytes.NewReader(input))
 	if err == nil {
 		unzlibRecord, err := ioutil.ReadAll(zlibReader)
 		if err != nil {
 			return nil, fmt.Errorf("reading zlib-compressed record: %v", err)
 		}
-		return [][]byte{unzlibRecord}, nil
+		return unzlibRecord, nil
 	}
 
-	// Process a single message, from KPL
-	return [][]byte{record}, nil
+	return input, nil
 }
 
diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go
index 2b01b4f..1762115 100644
--- a/splitter/splitter_test.go
+++ b/splitter/splitter_test.go
@@ -4,16 +4,28 @@ import (
 	"bytes"
 	"compress/gzip"
 	"compress/zlib"
+	"crypto/md5"
 	b64 "encoding/base64"
 	"encoding/json"
+	"os"
 	"testing"
 	"time"
 
 	"github.com/Clever/amazon-kinesis-client-go/decode"
+	kpl "github.com/a8m/kinesis-producer"
+	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
+func TestMain(m *testing.M) 
{
+	// In the conversion of CloudWatch LogEvent struct to an RSyslog struct to a string,
+	// the timezone used in the final string depends on the locally set timezone.
+	// In order for tests to pass, we set TZ to UTC.
+	os.Setenv("TZ", "UTC")
+	os.Exit(m.Run())
+}
+
 func TestUnpacking(t *testing.T) {
 	input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA"
 
@@ -349,3 +361,196 @@ func TestSplitIfNecesary(t *testing.T) {
 		[][]byte{expectedRecord},
 	)
 }
+
+// createKPLAggregate builds a KPL aggregate for use as test input.
+// Each element of input becomes the Data of one kpl.Record (zlib-compressed first when compress is true);
+// the records are protobuf-marshalled as a kpl.AggregatedRecord and framed as
+// kplMagicNumber + marshalled payload + MD5(marshalled payload).
+// It panics if marshalling fails.
+func createKPLAggregate(input [][]byte, compress bool) []byte {
+	var partitionKeyIndex uint64 = 0
+
+	records := []*kpl.Record{}
+	for _, log := range input {
+		if compress {
+			var z bytes.Buffer
+			zbuf := zlib.NewWriter(&z)
+			zbuf.Write(log)
+			zbuf.Close()
+			log = z.Bytes()
+		}
+		records = append(records, &kpl.Record{
+			PartitionKeyIndex: &partitionKeyIndex,
+			Data:              log,
+		})
+	}
+
+	logProto, err := proto.Marshal(&kpl.AggregatedRecord{
+		PartitionKeyTable: []string{"ecs_task_arn"},
+		Records:           records,
+	})
+	if err != nil {
+		panic(err)
+	}
+	log := append(kplMagicNumber, logProto...)
+	logHash := md5.Sum(logProto)
+	return append(log, logHash[0:16]...)
+}
+
+// TestKPLDeaggregate checks that KPLDeaggregate passes non-aggregates through untouched,
+// splits well-formed aggregates into their user records, and returns a record whose
+// checksum does not match as-is.
+func TestKPLDeaggregate(t *testing.T) {
+	type test struct {
+		description string
+		input       []byte
+		output      [][]byte
+		shouldError bool
+	}
+
+	// Flip the last checksum byte of a valid aggregate: the MD5 no longer matches,
+	// so KPLDeaggregate is expected to hand the record back untouched.
+	corrupted := createKPLAggregate([][]byte{[]byte("hello")}, false)
+	corrupted[len(corrupted)-1] ^= 0xFF
+
+	tests := []test{
+		{
+			description: "non-aggregated record",
+			input:       []byte("hello"),
+			output:      [][]byte{[]byte("hello")},
+			shouldError: false,
+		},
+		{
+			description: "one kpl-aggregated record",
+			input: createKPLAggregate(
+				[][]byte{[]byte("hello")},
+				false,
+			),
+			output:      [][]byte{[]byte("hello")},
+			shouldError: false,
+		},
+		{
+			description: "three kpl-aggregated record",
+			input: createKPLAggregate([][]byte{
+				[]byte("hello, "),
+				[]byte("world"),
+				[]byte("!"),
+			},
+				false,
+			),
+			output: [][]byte{
+				[]byte("hello, "),
+				[]byte("world"),
+				[]byte("!"),
+			},
+			shouldError: false,
+		},
+		{
+			description: "kpl-aggregated record with corrupted checksum",
+			input:       corrupted,
+			output:      [][]byte{corrupted},
+			shouldError: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.description, func(t *testing.T) {
+			out, err := KPLDeaggregate(tt.input)
+			if tt.shouldError {
+				assert.Error(t, err)
+				return
+			}
+			assert.NoError(t, err)
+			// testify takes (expected, actual); the order decides how a failure is reported.
+			assert.Equal(t, tt.output, out)
+		})
+	}
+}
+
+func TestDeaggregate(t *testing.T) {
+	type test struct {
+		description string
+		input       []byte
+		output      [][]byte
+		shouldError bool
+	}
+
+	tests := []test{
+		{
+			description: "non-aggregated record",
+			input:       []byte("hello"),
+			output:      [][]byte{[]byte("hello")},
+			shouldError: false,
+		},
+		{
+			description: "one kpl-aggregated record",
+			input: createKPLAggregate(
+				[][]byte{[]byte("hello")},
+				false,
+			),
+			output:      [][]byte{[]byte("hello")},
+			shouldError: false,
+		},
+		{
+			description: "three kpl-aggregated record",
+			input: createKPLAggregate([][]byte{
+				[]byte("hello, "),
+				[]byte("world"),
+				[]byte("!"),
+			},
+				false,
+			),
+			output: [][]byte{
+				[]byte("hello, "),
+				[]byte("world"),
+				[]byte("!"),
+			},
+			shouldError: false,
+		},
+		{
+			description: "one kpl-aggregated zlib record",
+			input: createKPLAggregate(
+				[][]byte{[]byte("hello")},
+				true,
+			),
+			output:      [][]byte{[]byte("hello")},
+			shouldError: false,
+		},
+		{
+			description: "three kpl-aggregated zlib record",
+			input: 
createKPLAggregate([][]byte{
+				[]byte("hello, "),
+				[]byte("world"),
+				[]byte("!"),
+			},
+				true,
+			),
+			output: [][]byte{
+				[]byte("hello, "),
+				[]byte("world"),
+				[]byte("!"),
+			},
+			shouldError: false,
+		},
+	}
+
+	// Build a gzip-compressed CloudWatch Logs batch so the non-KPL path of Deaggregate,
+	// which delegates to SplitMessageIfNecessary, is exercised as well.
+	var g bytes.Buffer
+	gbuf := gzip.NewWriter(&g)
+	cwLogBatch := LogEventBatch{
+		MessageType:         "test",
+		Owner:               "test",
+		LogGroup:            "test",
+		LogStream:           "test",
+		SubscriptionFilters: []string{""},
+		LogEvents: []LogEvent{{
+			ID:        "test",
+			Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)),
+			Message:   "test",
+		}},
+	}
+	// Fail fast if the fixture cannot be built, instead of testing against a truncated input.
+	cwLogBatchJSON, err := json.Marshal(cwLogBatch)
+	require.NoError(t, err)
+	_, err = gbuf.Write(cwLogBatchJSON)
+	require.NoError(t, err)
+	require.NoError(t, gbuf.Close())
+	gzipBatchInput := g.Bytes()
+
+	tests = append(tests, test{
+		description: "cloudwatch log batch",
+		input:       gzipBatchInput,
+		output:      [][]byte{[]byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test")},
+		shouldError: false,
+	})
+	for _, tt := range tests {
+		t.Run(tt.description, func(t *testing.T) {
+			out, err := Deaggregate(tt.input)
+
+			if tt.shouldError {
+				assert.Error(t, err)
+				return
+			}
+			assert.NoError(t, err)
+			// testify takes (expected, actual); the order decides how a failure is reported.
+			assert.Equal(t, tt.output, out)
+		})
+	}
+}