From f8e9c34641c802fb6d202ade20bd534d6afcb69d Mon Sep 17 00:00:00 2001 From: Taylor Sutton Date: Thu, 12 Nov 2020 11:37:13 -0500 Subject: [PATCH] Refactor zlib decompression into the splitter package Being in the batchconsumer package means it will work for anything using KCL, but lambdas that subscribe to these log streams do not use batchconsumer at all; instead they invoke the splitter package directly. As such, if we want this functionality to be available to lambda log consumers, it can't be in batchconsumer. There are no functionality changes here, just moving code from an unexported method in one place to an exported function in another place. The tests also get moved along with it. --- batchconsumer/writer.go | 30 +------------- batchconsumer/writer_test.go | 77 ------------------------------------ splitter/splitter.go | 26 ++++++++++++ splitter/splitter_test.go | 66 +++++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+), 106 deletions(-) diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index d562be1..30c2369 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -1,12 +1,9 @@ package batchconsumer import ( - "bytes" - "compress/zlib" "context" "encoding/base64" "fmt" - "io/ioutil" "math/big" "golang.org/x/time/rate" @@ -60,31 +57,6 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer return nil } -func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { - // We handle three types of records: - // - records emitted from CWLogs Subscription (which are gzip compressed) - // - uncompressed records emitted from KPL - // - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bit) - if splitter.IsGzipped(record) { - // Process a batch of messages from a CWLogs Subscription - return splitter.GetMessagesFromGzippedInput(record) - } - - // Try to read it as a zlib-compressed record - // zlib.NewReader checks for a zlib header and returns an error if not found - zlibReader, err := zlib.NewReader(bytes.NewReader(record)) - if err == nil { - unzlibRecord, err := ioutil.ReadAll(zlibReader) - if err != nil { - return nil, fmt.Errorf("reading zlib-compressed record: %v", err) - } - return [][]byte{unzlibRecord}, nil - } - // Process a single message, from KPL - return [][]byte{record}, nil - -} - func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { var pair kcl.SequencePair prevPair := b.lastProcessedSeq @@ -108,7 +80,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { return err } - messages, err := b.splitMessageIfNecessary(data) + messages, err := splitter.SplitMessageIfNecessary(data) if err != nil { return err } diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 7a79fac..1d7e50c 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -1,11 +1,7 @@ package batchconsumer import ( - "bytes" - "compress/gzip" - "compress/zlib" "encoding/base64" - "encoding/json" "fmt" "testing" "time" @@ -14,7 +10,6 @@ import ( "gopkg.in/Clever/kayvee-go.v6/logger" "github.com/Clever/amazon-kinesis-client-go/kcl" - "github.com/Clever/amazon-kinesis-client-go/splitter" ) type ignoringSender struct{} @@ -426,75 +421,3 @@ func TestStaggeredCheckpointing(t *testing.T) { assert.Equal("tag3", string(mocksender.batches["tag3"][2][0])) assert.Equal("tag3", string(mocksender.batches["tag3"][2][1])) } - -func TestSplitIfNecesary(t *testing.T) { - - // We provide three different inputs to batchedWriter.splitMessageIfNecessary - // plain text - // zlib compressed text - // gzip compressed CloudWatch logs batch - // we verify that the split function matches the input against the correct splitter - // and decodes it. - - assert := assert.New(t) - - mockFailedLogsFile := logger.New("testing") - mockconfig := withDefaults(Config{ - BatchInterval: 10 * time.Millisecond, - CheckpointFreq: 20 * time.Millisecond, - }) - - wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mockFailedLogsFile) - - plainTextInput := []byte("hello, world!") - - records, err := wrt.splitMessageIfNecessary(plainTextInput) - assert.NoError(err) - assert.Equal( - records, - [][]byte{[]byte("hello, world!")}, - ) - - var z bytes.Buffer - zbuf := zlib.NewWriter(&z) - zbuf.Write([]byte("hello, world!")) - zbuf.Close() - zlibSingleInput := z.Bytes() - - records, err = wrt.splitMessageIfNecessary(zlibSingleInput) - assert.NoError(err) - assert.Equal( - records, - [][]byte{[]byte("hello, world!")}, - ) - - // the details of this part aren't super important since the actual functionality is - // tested in the splitter package, we just want to make sure that OUR split function - // correctly realizes it's gzip and call the splitter package's functionality - var g bytes.Buffer - gbuf := gzip.NewWriter(&g) - cwLogBatch := splitter.LogEventBatch{ - MessageType: "test", - Owner: "test", - LogGroup: "test", - LogStream: "test", - SubscriptionFilters: []string{""}, - LogEvents: []splitter.LogEvent{{ - ID: "test", - Timestamp: splitter.UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)), - Message: "test", - }}, - } - cwLogBatchJSON, _ := json.Marshal(cwLogBatch) - gbuf.Write(cwLogBatchJSON) - gbuf.Close() - gzipBatchInput := g.Bytes() - - expectedRecord := []byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test") - records, err = wrt.splitMessageIfNecessary(gzipBatchInput) - assert.NoError(err) - assert.Equal( - records, - [][]byte{expectedRecord}, - ) -} diff --git a/splitter/splitter.go b/splitter/splitter.go index be32f7c..7027a18 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -3,6 +3,7 @@ package splitter import ( "bytes" "compress/gzip" + "compress/zlib" "encoding/json" "fmt" "io/ioutil" @@ -11,6 +12,31 @@ import ( "time" ) +// SplitMessageIfNecessary handles three types of records: +// - records emitted from CWLogs Subscription (which are gzip compressed) +// - uncompressed records emitted from KPL +// - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bi +func SplitMessageIfNecessary(record []byte) ([][]byte, error) { + if IsGzipped(record) { + // Process a batch of messages from a CWLogs Subscription + return GetMessagesFromGzippedInput(record) + } + + // Try to read it as a zlib-compressed record + // zlib.NewReader checks for a zlib header and returns an error if not found + zlibReader, err := zlib.NewReader(bytes.NewReader(record)) + if err == nil { + unzlibRecord, err := ioutil.ReadAll(zlibReader) + if err != nil { + return nil, fmt.Errorf("reading zlib-compressed record: %v", err) + } + return [][]byte{unzlibRecord}, nil + } + // Process a single message, from KPL + return [][]byte{record}, nil + +} + // LogEvent is a single log line within a LogEventBatch type LogEvent struct { ID string `json:"id"` diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index 170a1f1..2b01b4f 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -3,9 +3,11 @@ package splitter import ( "bytes" "compress/gzip" + "compress/zlib" b64 "encoding/base64" "encoding/json" "testing" + "time" "github.com/Clever/amazon-kinesis-client-go/decode" "github.com/stretchr/testify/assert" @@ -283,3 +285,67 @@ func TestSplitGlue(t *testing.T) { assert.Equal(t, "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28", enhanced["container_task"]) } } + +func TestSplitIfNecesary(t *testing.T) { + + // We provide three different inputs to batchedWriter.splitMessageIfNecessary + // plain text + // zlib compressed text + // gzip compressed CloudWatch logs batch + // we verify that the split function matches the input against the correct splitter + // and decodes it. + + assert := assert.New(t) + + plainTextInput := []byte("hello, world!") + + records, err := SplitMessageIfNecessary(plainTextInput) + assert.NoError(err) + assert.Equal( + records, + [][]byte{[]byte("hello, world!")}, + ) + + var z bytes.Buffer + zbuf := zlib.NewWriter(&z) + zbuf.Write([]byte("hello, world!")) + zbuf.Close() + zlibSingleInput := z.Bytes() + + records, err = SplitMessageIfNecessary(zlibSingleInput) + assert.NoError(err) + assert.Equal( + records, + [][]byte{[]byte("hello, world!")}, + ) + + // the details of this part aren't super important since the actual functionality is + // tested in other tests; for this test we just want to make sure that split function + // correctly realizes it's gzip and call the appropriate CW-log-splitting logic + var g bytes.Buffer + gbuf := gzip.NewWriter(&g) + cwLogBatch := LogEventBatch{ + MessageType: "test", + Owner: "test", + LogGroup: "test", + LogStream: "test", + SubscriptionFilters: []string{""}, + LogEvents: []LogEvent{{ + ID: "test", + Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)), + Message: "test", + }}, + } + cwLogBatchJSON, _ := json.Marshal(cwLogBatch) + gbuf.Write(cwLogBatchJSON) + gbuf.Close() + gzipBatchInput := g.Bytes() + + expectedRecord := []byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test") + records, err = SplitMessageIfNecessary(gzipBatchInput) + assert.NoError(err) + assert.Equal( + records, + [][]byte{expectedRecord}, + ) +}