Merge pull request #59 from Clever/refactor-decompression

Refactor decompression
This commit is contained in:
Taylor Sutton 2020-11-16 20:18:07 -05:00 committed by GitHub
commit 29f86e9986
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 435 additions and 13 deletions

101
Gopkg.lock generated
View file

@ -12,6 +12,47 @@
pruneopts = ""
revision = "fb28ad3e4340c046323b7beba685a72fd12ecbe8"
[[projects]]
digest = "1:ebf4bf0cc2e8705b66f67fecee8837e26c8b67a11ede9dd1f18a2841b6de5fac"
name = "github.com/a8m/kinesis-producer"
packages = ["."]
pruneopts = ""
revision = "2b76ac68f594232c724b44643c6994768d5a5f35"
version = "v0.2.0"
[[projects]]
digest = "1:b597f05b10e1c62829891fc8110d4f8df9caaba70928eaaffeba182664ac190a"
name = "github.com/aws/aws-sdk-go"
packages = [
"aws",
"aws/awserr",
"aws/awsutil",
"aws/client",
"aws/client/metadata",
"aws/credentials",
"aws/endpoints",
"aws/request",
"aws/signer/v4",
"internal/context",
"internal/ini",
"internal/sdkio",
"internal/sdkmath",
"internal/sdkrand",
"internal/shareddefaults",
"internal/strings",
"internal/sync/singleflight",
"private/protocol",
"private/protocol/eventstream",
"private/protocol/eventstream/eventstreamapi",
"private/protocol/json/jsonutil",
"private/protocol/jsonrpc",
"private/protocol/rest",
"service/kinesis",
]
pruneopts = ""
revision = "416d69c754abe8f0b32f6dcd8837d086812583d6"
version = "v1.35.28"
[[projects]]
digest = "1:0deddd908b6b4b768cfc272c16ee61e7088a60f7fe2f06c547bd3d8e1f8b8e77"
name = "github.com/davecgh/go-spew"
@ -20,6 +61,14 @@
revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73"
version = "v1.1.1"
[[projects]]
digest = "1:6532affeeaaccdc6919d5773516176b77de02b4af8cf9a7fac16bae77aa319c5"
name = "github.com/golang/protobuf"
packages = ["proto"]
pruneopts = ""
revision = "4846b58453b3708320bdb524f25cc5a1d9cda4d4"
version = "v1.4.3"
[[projects]]
branch = "master"
digest = "1:c7a6f4f1e7baeabf4de954d2d7f82343252f9e66b40abf694687b1df6fbccd5e"
@ -28,6 +77,21 @@
pruneopts = ""
revision = "5fbaaf06d9e72f781d19c61e130ba63f4d6dab31"
[[projects]]
digest = "1:13fe471d0ed891e8544eddfeeb0471fd3c9f2015609a1c000aefdedf52a19d40"
name = "github.com/jmespath/go-jmespath"
packages = ["."]
pruneopts = ""
revision = "c2b33e84"
[[projects]]
digest = "1:ef0f9731bc6c3c59396adc50f36da36ca98c704812c449794f9326f7bc64b5f1"
name = "github.com/jpillora/backoff"
packages = ["."]
pruneopts = ""
revision = "8eab2debe79d12b7bd3d10653910df25fa9552ba"
version = "1.0.0"
[[projects]]
digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411"
name = "github.com/pmezard/go-difflib"
@ -79,6 +143,41 @@
pruneopts = ""
revision = "3af7569d3a1e776fc2a3c1cec133b43105ea9c2e"
[[projects]]
digest = "1:0d049ff01749ce1d6092c85cb90a02dfdb3b401939b13415b7e608f36bc8ecee"
name = "google.golang.org/protobuf"
packages = [
"encoding/prototext",
"encoding/protowire",
"internal/descfmt",
"internal/descopts",
"internal/detrand",
"internal/encoding/defval",
"internal/encoding/messageset",
"internal/encoding/tag",
"internal/encoding/text",
"internal/errors",
"internal/fieldsort",
"internal/filedesc",
"internal/filetype",
"internal/flags",
"internal/genid",
"internal/impl",
"internal/mapsort",
"internal/pragma",
"internal/set",
"internal/strs",
"internal/version",
"proto",
"reflect/protoreflect",
"reflect/protoregistry",
"runtime/protoiface",
"runtime/protoimpl",
]
pruneopts = ""
revision = "3f7a61f89bb6813f89d981d1870ed68da0b3c3f1"
version = "v1.25.0"
[[projects]]
digest = "1:0923ed679d96b9e53b8c43892e2094ebb5dccd0a66709b20c90f43c37ff71ff1"
name = "gopkg.in/Clever/kayvee-go.v6"
@ -113,6 +212,8 @@
input-imports = [
"github.com/Clever/syslogparser",
"github.com/Clever/syslogparser/rfc3164",
"github.com/a8m/kinesis-producer",
"github.com/golang/protobuf/proto",
"github.com/stretchr/testify/assert",
"github.com/stretchr/testify/require",
"golang.org/x/time/rate",

View file

@ -12,3 +12,11 @@
[[constraint]]
name = "gopkg.in/Clever/kayvee-go.v6"
version = "6.24.0"
[[constraint]]
name = "github.com/a8m/kinesis-producer"
version = "v0.2.0"
[[constraint]]
name = "github.com/golang/protobuf"
version = "v1.4.3"

View file

@ -1,39 +1,147 @@
// Package splitter provides functions for decoding various kinds of records that might come off of a kinesis stream.
// It is equipped with functions to unbundle KPL aggregates and CloudWatch log bundles,
// as well as apply appropriate decompression.
// KCL applications would be most interested in `SplitMessageIfNecessary` which can handle zlibbed records as well as
// CloudWatch bundles. KCL automatically unbundles KPL aggregates before passing the records to the consumer.
// Non-KCL applications (such as Lambdas consuming KPL-produced aggregates) should either use
// - KPLDeaggregate if the consumer purely wants to unbundle KPL aggregates, but will handle the raw records themselves.
// - Deaggregate if the consumer wants to apply the same decompress and split logic as SplitMessageIfNecessary
// in addition to the KPL splitting.
package splitter
import (
"bytes"
"compress/gzip"
"compress/zlib"
"crypto/md5"
"encoding/json"
"fmt"
"io/ioutil"
"regexp"
"strconv"
"time"
kpl "github.com/a8m/kinesis-producer"
"github.com/golang/protobuf/proto"
)
// SplitMessageIfNecessary handles three types of records:
// - records emitted from CWLogs Subscription (which are gzip compressed)
// - uncompressed records emitted from KPL
// - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bi
func SplitMessageIfNecessary(record []byte) ([][]byte, error) {
if IsGzipped(record) {
// Process a batch of messages from a CWLogs Subscription
return GetMessagesFromGzippedInput(record)
// kplMagicNumber is the 4-byte prefix that marks a Kinesis record as a KPL aggregate.
// The Amazon Kinesis Producer Library (KPL) packs several logical user records into one
// Kinesis record so that puts are more efficient. The format is described at
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
var kplMagicNumber = []byte{0xF3, 0x89, 0x9A, 0xC2}

// IsKPLAggregate reports whether data starts with the KPL aggregate magic prefix.
// Callers do not need to invoke it before KPLDeaggregate, which performs the same check itself.
func IsKPLAggregate(data []byte) bool {
	prefixLen := len(kplMagicNumber)
	if len(data) < prefixLen {
		return false
	}
	return bytes.Equal(data[:prefixLen], kplMagicNumber)
}
// Try to read it as a zlib-compressed record
// zlib.NewReader checks for a zlib header and returns an error if not found
zlibReader, err := zlib.NewReader(bytes.NewReader(record))
// KPLDeaggregate takes a Kinesis record and converts it to one or more user records by applying KPL deaggregation.
//
// The record is treated as an aggregate only if it begins with the 4-byte KPL magic prefix,
// is long enough to also hold a trailing MD5 checksum, and that checksum matches the MD5 of
// the bytes between the prefix and the checksum. In that case the protobuf payload is
// unmarshalled and the Data of each contained user record is returned, in order.
//
// In every other case (no prefix, too short, checksum mismatch) the return value is a
// singleton containing the original record, unchanged.
//
// An error is returned only when the checksum matches but the payload cannot be
// unmarshalled as a KPL AggregatedRecord.
func KPLDeaggregate(kinesisRecord []byte) ([][]byte, error) {
	if !IsKPLAggregate(kinesisRecord) {
		return [][]byte{kinesisRecord}, nil
	}
	// A record that carries the magic prefix but is too short to also contain the
	// trailing checksum cannot be a valid aggregate. Without this guard the slice
	// expressions below would panic with an out-of-range bound.
	if len(kinesisRecord) < len(kplMagicNumber)+md5.Size {
		return [][]byte{kinesisRecord}, nil
	}

	checksumStart := len(kinesisRecord) - md5.Size
	src := kinesisRecord[len(kplMagicNumber):checksumStart]
	checksum := kinesisRecord[checksumStart:]
	recordSum := md5.Sum(src)
	if !bytes.Equal(checksum, recordSum[:]) {
		// either the data is corrupted or this is not a KPL aggregate
		// either way, return the data as-is
		return [][]byte{kinesisRecord}, nil
	}

	dest := new(kpl.AggregatedRecord)
	if err := proto.Unmarshal(src, dest); err != nil {
		return nil, fmt.Errorf("unmarshalling proto: %w", err)
	}

	var records [][]byte
	for _, userRecord := range dest.GetRecords() {
		records = append(records, userRecord.Data)
	}
	return records, nil
}
// Deaggregate is a combination of KPLDeaggregate and SplitMessageIfNecessary.
//
// If the record does not carry the KPL magic prefix, it is handed to SplitMessageIfNecessary
// and that result is returned.
// If the record carries the prefix but is too short to hold the trailing MD5 checksum, or the
// checksum does not match, the original record is returned verbatim as a singleton.
// Otherwise the aggregate is unmarshalled and each user record is passed through unzlib:
// a record in zlib format appears decompressed in the output, any other record appears verbatim.
//
// A similar result can be obtained by calling KPLDeaggregate, then iterating over the results and
// calling SplitMessageIfNecessary on each. This function assumes that the records inside a KPL
// aggregate are not CloudWatch bundles, so it does not check them for a gzip header. It also
// avoids a second pass over the user records, since decompression happens in the same loop
// that extracts them.
//
// See the SplitMessageIfNecessary documentation for the format of output for CloudWatch log bundles.
//
// An error is returned when the protobuf payload cannot be unmarshalled or when a user record
// has a zlib header but cannot be decompressed.
func Deaggregate(kinesisRecord []byte) ([][]byte, error) {
	if !IsKPLAggregate(kinesisRecord) {
		return SplitMessageIfNecessary(kinesisRecord)
	}
	// A record that carries the magic prefix but is too short to also contain the
	// trailing checksum cannot be a valid aggregate. Without this guard the slice
	// expressions below would panic with an out-of-range bound.
	if len(kinesisRecord) < len(kplMagicNumber)+md5.Size {
		return [][]byte{kinesisRecord}, nil
	}

	checksumStart := len(kinesisRecord) - md5.Size
	src := kinesisRecord[len(kplMagicNumber):checksumStart]
	checksum := kinesisRecord[checksumStart:]
	recordSum := md5.Sum(src)
	if !bytes.Equal(checksum, recordSum[:]) {
		// either the data is corrupted or this is not a KPL aggregate
		// either way, return the data as-is
		return [][]byte{kinesisRecord}, nil
	}

	dest := new(kpl.AggregatedRecord)
	if err := proto.Unmarshal(src, dest); err != nil {
		return nil, fmt.Errorf("unmarshalling proto: %w", err)
	}

	var records [][]byte
	for _, userRecord := range dest.GetRecords() {
		record, err := unzlib(userRecord.Data)
		if err != nil {
			return nil, fmt.Errorf("unzlibbing record: %w", err)
		}
		records = append(records, record)
	}
	return records, nil
}
// SplitMessageIfNecessary receives a user record and returns a slice of one or more records.
// If the record is coming off of a Kinesis stream and might be KPL aggregated, it needs to be
// deaggregated before being passed to this function.
//
// Three kinds of input are handled:
// - records emitted from a CWLogs Subscription (which are gzip compressed)
// - zlib compressed records (e.g. as compressed and emitted by the Kinesis plugin for Fluent Bit)
// - any other record (left unchanged)
//
// CloudWatch logs come as structured JSON. In the process of splitting, they are converted
// into an rsyslog format that allows fairly uniform parsing of the result across the
// AWS services that might emit logs to CloudWatch.
// Note that the timezone used in these syslog records is guessed based on the local env.
// If you need consistent timezones, set TZ=UTC in your environment.
func SplitMessageIfNecessary(userRecord []byte) ([][]byte, error) {
	if !IsGzipped(userRecord) {
		// Not a CWLogs bundle: decompress if zlibbed, otherwise pass through as-is.
		payload, err := unzlib(userRecord)
		if err != nil {
			return nil, err
		}
		return [][]byte{payload}, nil
	}

	// Gzipped input is treated as a batch of messages from a CWLogs Subscription.
	return GetMessagesFromGzippedInput(userRecord)
}
// unzlib returns the zlib-decompressed form of input when input is in zlib format.
//
// zlib.NewReader checks for a zlib header and returns an error if one is not found;
// in that case input is assumed not to be compressed and is returned unchanged with
// a nil error.
//
// An error is returned only when the header is accepted but the compressed stream
// cannot be read to completion.
func unzlib(input []byte) ([]byte, error) {
	zlibReader, err := zlib.NewReader(bytes.NewReader(input))
	if err != nil {
		// No usable zlib header: hand the record back verbatim.
		return input, nil
	}
	defer zlibReader.Close()

	unzlibRecord, err := ioutil.ReadAll(zlibReader)
	if err != nil {
		return nil, fmt.Errorf("reading zlib-compressed record: %w", err)
	}
	return unzlibRecord, nil
}

View file

@ -4,16 +4,28 @@ import (
"bytes"
"compress/gzip"
"compress/zlib"
"crypto/md5"
b64 "encoding/base64"
"encoding/json"
"os"
"testing"
"time"
"github.com/Clever/amazon-kinesis-client-go/decode"
kpl "github.com/a8m/kinesis-producer"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestMain pins the TZ environment variable to UTC before running the package's tests.
// Converting a CloudWatch LogEvent to an RSyslog struct and then to a string produces a
// timestamp whose zone depends on the locally configured timezone, so the expected strings
// in these tests only match when TZ is UTC.
func TestMain(m *testing.M) {
	os.Setenv("TZ", "UTC")
	exitCode := m.Run()
	os.Exit(exitCode)
}
func TestUnpacking(t *testing.T) {
input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA"
@ -349,3 +361,196 @@ func TestSplitIfNecesary(t *testing.T) {
[][]byte{expectedRecord},
)
}
// createKPLAggregate builds a KPL aggregate out of input for use as a test fixture.
// Each element of input becomes the Data of one user record; when compress is true the
// element is zlib-compressed first. The result is the KPL magic prefix, followed by the
// protobuf-encoded AggregatedRecord, followed by the MD5 of the protobuf bytes.
// It panics if the aggregate cannot be marshalled.
func createKPLAggregate(input [][]byte, compress bool) []byte {
	var keyIndex uint64
	userRecords := make([]*kpl.Record, 0, len(input))
	for _, payload := range input {
		data := payload
		if compress {
			var compressed bytes.Buffer
			zw := zlib.NewWriter(&compressed)
			zw.Write(payload)
			zw.Close()
			data = compressed.Bytes()
		}
		userRecords = append(userRecords, &kpl.Record{
			PartitionKeyIndex: &keyIndex,
			Data:              data,
		})
	}

	aggregate := &kpl.AggregatedRecord{
		PartitionKeyTable: []string{"ecs_task_arn"},
		Records:           userRecords,
	}
	encoded, err := proto.Marshal(aggregate)
	if err != nil {
		panic(err)
	}

	digest := md5.Sum(encoded)
	framed := append(kplMagicNumber, encoded...)
	return append(framed, digest[0:16]...)
}
// TestKPLDeaggregate checks that KPLDeaggregate passes non-aggregated input through
// unchanged and splits KPL aggregates into their user records in order.
func TestKPLDeaggregate(t *testing.T) {
	type test struct {
		description string
		input       []byte
		output      [][]byte
		shouldError bool
	}

	tests := []test{
		{
			description: "non-aggregated record",
			input:       []byte("hello"),
			output:      [][]byte{[]byte("hello")},
			shouldError: false,
		},
		{
			description: "one kpl-aggregated record",
			input: createKPLAggregate(
				[][]byte{[]byte("hello")},
				false,
			),
			output:      [][]byte{[]byte("hello")},
			shouldError: false,
		},
		{
			description: "three kpl-aggregated record",
			input: createKPLAggregate([][]byte{
				[]byte("hello, "),
				[]byte("world"),
				[]byte("!"),
			},
				false,
			),
			output: [][]byte{
				[]byte("hello, "),
				[]byte("world"),
				[]byte("!"),
			},
			shouldError: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.description, func(t *testing.T) {
			out, err := KPLDeaggregate(tt.input)
			if tt.shouldError {
				assert.Error(t, err)
				return
			}
			// Stop here on an unexpected error so the comparison below does not
			// add a second, misleading failure.
			require.NoError(t, err)
			// testify expects (expected, actual); the reverse order mislabels the diff.
			assert.Equal(t, tt.output, out)
		})
	}
}
// TestDeaggregate checks that Deaggregate handles plain records, KPL aggregates
// (with and without zlib-compressed user records), and a gzip-compressed
// CloudWatch log batch.
func TestDeaggregate(t *testing.T) {
	type test struct {
		description string
		input       []byte
		output      [][]byte
		shouldError bool
	}

	tests := []test{
		{
			description: "non-aggregated record",
			input:       []byte("hello"),
			output:      [][]byte{[]byte("hello")},
			shouldError: false,
		},
		{
			description: "one kpl-aggregated record",
			input: createKPLAggregate(
				[][]byte{[]byte("hello")},
				false,
			),
			output:      [][]byte{[]byte("hello")},
			shouldError: false,
		},
		{
			description: "three kpl-aggregated record",
			input: createKPLAggregate([][]byte{
				[]byte("hello, "),
				[]byte("world"),
				[]byte("!"),
			},
				false,
			),
			output: [][]byte{
				[]byte("hello, "),
				[]byte("world"),
				[]byte("!"),
			},
			shouldError: false,
		},
		{
			description: "one kpl-aggregated zlib record",
			input: createKPLAggregate(
				[][]byte{[]byte("hello")},
				true,
			),
			output:      [][]byte{[]byte("hello")},
			shouldError: false,
		},
		{
			description: "three kpl-aggregated zlib record",
			input: createKPLAggregate([][]byte{
				[]byte("hello, "),
				[]byte("world"),
				[]byte("!"),
			},
				true,
			),
			output: [][]byte{
				[]byte("hello, "),
				[]byte("world"),
				[]byte("!"),
			},
			shouldError: false,
		},
	}

	// Build a gzip-compressed CloudWatch log batch as an additional input.
	// Fixture construction errors fail the test immediately instead of being dropped.
	var g bytes.Buffer
	gbuf := gzip.NewWriter(&g)
	cwLogBatch := LogEventBatch{
		MessageType:         "test",
		Owner:               "test",
		LogGroup:            "test",
		LogStream:           "test",
		SubscriptionFilters: []string{""},
		LogEvents: []LogEvent{{
			ID:        "test",
			Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)),
			Message:   "test",
		}},
	}
	cwLogBatchJSON, err := json.Marshal(cwLogBatch)
	require.NoError(t, err)
	_, err = gbuf.Write(cwLogBatchJSON)
	require.NoError(t, err)
	require.NoError(t, gbuf.Close())
	gzipBatchInput := g.Bytes()

	tests = append(tests, test{
		description: "cloudwatch log batch",
		input:       gzipBatchInput,
		output:      [][]byte{[]byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test")},
		shouldError: false,
	})

	for _, tt := range tests {
		t.Run(tt.description, func(t *testing.T) {
			out, err := Deaggregate(tt.input)
			if tt.shouldError {
				assert.Error(t, err)
				return
			}
			// Stop here on an unexpected error so the comparison below does not
			// add a second, misleading failure.
			require.NoError(t, err)
			// testify expects (expected, actual); the reverse order mislabels the diff.
			assert.Equal(t, tt.output, out)
		})
	}
}