From cc4f7716b3fa6ef37c19b82fbf703287cdcf7ab7 Mon Sep 17 00:00:00 2001 From: Taylor Sutton Date: Mon, 16 Nov 2020 12:44:50 -0500 Subject: [PATCH] More documentation around handling of CW log bundles; deflake test --- splitter/splitter.go | 14 ++++++++++++-- splitter/splitter_test.go | 12 +++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/splitter/splitter.go b/splitter/splitter.go index 1a3aae7..c8f3fde 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -48,7 +48,8 @@ func KPLDeaggregate(kinesisRecord []byte) ([][]byte, error) { recordSum := md5.Sum(src) for i, b := range checksum { if b != recordSum[i] { - // false alarm - the header matched but the checksum doesn't, so it's not KPL + // either the data is corrupted or this is not a KPL aggregate + // either way, return the data as-is return [][]byte{kinesisRecord}, nil } } @@ -72,6 +73,8 @@ func KPLDeaggregate(kinesisRecord []byte) ([][]byte, error) { // A similar result can be optained by calling KPLDeaggregate, then iterating over the results and callin SplitMessageIfNecessary. // This function makes the assumption that after KPL-deaggregating, the results are not CloudWatch aggregates, so it doesn't need to check them for a gzip header. // Also it lets us iterate over the user records one less time, since KPLDeaggregate loops over the records and we would need to loop again to unzlib. +// +// See the SplitMessageIfNecessary documentation for the format of output for CloudWatch log bundles. func DeaggregateAndSplitIfNecessary(kinesisRecord []byte) ([][]byte, error) { if !IsKPLAggregate(kinesisRecord) { return SplitMessageIfNecessary(kinesisRecord) @@ -81,7 +84,8 @@ func DeaggregateAndSplitIfNecessary(kinesisRecord []byte) ([][]byte, error) { recordSum := md5.Sum(src) for i, b := range checksum { if b != recordSum[i] { - // false alarm - the header matched but the checksum doesn't, so it's not KPL + // either the data is corrupted or this is not a KPL aggregate + // either way, return the data as-is return [][]byte{kinesisRecord}, nil } } @@ -107,6 +111,12 @@ func DeaggregateAndSplitIfNecessary(kinesisRecord []byte) ([][]byte, error) { // - records emitted from CWLogs Subscription (which are gzip compressed) // - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bit // - any other record (left unchanged) +// +// CloudWatch logs come as structured JSON. In the process of splitting, they are converted +// into an rsyslog format that allows fairly uniform parsing of the result across the +// AWS services that might emit logs to CloudWatch. +// Note that these timezone used in these syslog records is guessed based on the local env. +// If you need consistent timezones, set TZ=UTC in your environment. func SplitMessageIfNecessary(userRecord []byte) ([][]byte, error) { // First try the record as a CWLogs record if IsGzipped(userRecord) { diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index 8ebbae7..1ea0d39 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -7,6 +7,7 @@ import ( "crypto/md5" b64 "encoding/base64" "encoding/json" + "os" "testing" "time" @@ -17,6 +18,14 @@ import ( "github.com/stretchr/testify/require" ) +func TestMain(m *testing.M) { + // In the conversion of CloudWatch LogEvent struct to an RSyslog struct to a string, + // the timezone used in the final string depends on the locally set timezone. + // in order for tests to pass, we set TZ to UTC + os.Setenv("TZ", "UTC") + os.Exit(m.Run()) +} + func TestUnpacking(t *testing.T) { input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA" @@ -289,7 +298,6 @@ func TestSplitGlue(t *testing.T) { } } -// If running this test directly with `go test`, it may fail unless you set the env var TZ=UTC func TestSplitIfNecesary(t *testing.T) { // We provide three different inputs to batchedWriter.splitMessageIfNecessary @@ -384,7 +392,6 @@ func createKPLAggregate(input [][]byte, compress bool) []byte { return append(log, logHash[0:16]...) } -// If running this test directly with `go test`, it may fail unless you set the env var TZ=UTC func TestKPLDeaggregate(t *testing.T) { type test struct { description string @@ -441,7 +448,6 @@ func TestKPLDeaggregate(t *testing.T) { } } -// If running this test directly with `go test`, it may fail unless you set the env var TZ=UTC func TestDeaggregateAndSplit(t *testing.T) { type test struct { description string