diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index d562be1..30c2369 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -1,12 +1,9 @@ package batchconsumer import ( - "bytes" - "compress/zlib" "context" "encoding/base64" "fmt" - "io/ioutil" "math/big" "golang.org/x/time/rate" @@ -60,31 +57,6 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer return nil } -func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { - // We handle three types of records: - // - records emitted from CWLogs Subscription (which are gzip compressed) - // - uncompressed records emitted from KPL - // - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bit) - if splitter.IsGzipped(record) { - // Process a batch of messages from a CWLogs Subscription - return splitter.GetMessagesFromGzippedInput(record) - } - - // Try to read it as a zlib-compressed record - // zlib.NewReader checks for a zlib header and returns an error if not found - zlibReader, err := zlib.NewReader(bytes.NewReader(record)) - if err == nil { - unzlibRecord, err := ioutil.ReadAll(zlibReader) - if err != nil { - return nil, fmt.Errorf("reading zlib-compressed record: %v", err) - } - return [][]byte{unzlibRecord}, nil - } - // Process a single message, from KPL - return [][]byte{record}, nil - -} - func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { var pair kcl.SequencePair prevPair := b.lastProcessedSeq @@ -108,7 +80,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { return err } - messages, err := b.splitMessageIfNecessary(data) + messages, err := splitter.SplitMessageIfNecessary(data) if err != nil { return err } diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 7a79fac..1d7e50c 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -1,11 +1,7 @@ package batchconsumer import ( - "bytes" - "compress/gzip" - "compress/zlib" "encoding/base64" - "encoding/json" "fmt" "testing" "time" @@ -14,7 +10,6 @@ import ( "gopkg.in/Clever/kayvee-go.v6/logger" "github.com/Clever/amazon-kinesis-client-go/kcl" - "github.com/Clever/amazon-kinesis-client-go/splitter" ) type ignoringSender struct{} @@ -426,75 +421,3 @@ func TestStaggeredCheckpointing(t *testing.T) { assert.Equal("tag3", string(mocksender.batches["tag3"][2][0])) assert.Equal("tag3", string(mocksender.batches["tag3"][2][1])) } - -func TestSplitIfNecesary(t *testing.T) { - - // We provide three different inputs to batchedWriter.splitMessageIfNecessary - // plain text - // zlib compressed text - // gzip compressed CloudWatch logs batch - // we verify that the split function matches the input against the correct splitter - // and decodes it. - - assert := assert.New(t) - - mockFailedLogsFile := logger.New("testing") - mockconfig := withDefaults(Config{ - BatchInterval: 10 * time.Millisecond, - CheckpointFreq: 20 * time.Millisecond, - }) - - wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mockFailedLogsFile) - - plainTextInput := []byte("hello, world!") - - records, err := wrt.splitMessageIfNecessary(plainTextInput) - assert.NoError(err) - assert.Equal( - records, - [][]byte{[]byte("hello, world!")}, - ) - - var z bytes.Buffer - zbuf := zlib.NewWriter(&z) - zbuf.Write([]byte("hello, world!")) - zbuf.Close() - zlibSingleInput := z.Bytes() - - records, err = wrt.splitMessageIfNecessary(zlibSingleInput) - assert.NoError(err) - assert.Equal( - records, - [][]byte{[]byte("hello, world!")}, - ) - - // the details of this part aren't super important since the actual functionality is - // tested in the splitter package, we just want to make sure that OUR split function - // correctly realizes it's gzip and call the splitter package's functionality - var g bytes.Buffer - gbuf := gzip.NewWriter(&g) - cwLogBatch := splitter.LogEventBatch{ - MessageType: "test", - Owner: "test", - LogGroup: "test", - LogStream: "test", - SubscriptionFilters: []string{""}, - LogEvents: []splitter.LogEvent{{ - ID: "test", - Timestamp: splitter.UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)), - Message: "test", - }}, - } - cwLogBatchJSON, _ := json.Marshal(cwLogBatch) - gbuf.Write(cwLogBatchJSON) - gbuf.Close() - gzipBatchInput := g.Bytes() - - expectedRecord := []byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test") - records, err = wrt.splitMessageIfNecessary(gzipBatchInput) - assert.NoError(err) - assert.Equal( - records, - [][]byte{expectedRecord}, - ) -} diff --git a/splitter/splitter.go b/splitter/splitter.go index be32f7c..7027a18 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -3,6 +3,7 @@ package splitter import ( "bytes" "compress/gzip" + "compress/zlib" "encoding/json" "fmt" "io/ioutil" @@ -11,6 +12,31 @@ import ( "time" ) +// SplitMessageIfNecessary handles three types of records: +// - records emitted from CWLogs Subscription (which are gzip compressed) +// - uncompressed records emitted from KPL +// - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bi +func SplitMessageIfNecessary(record []byte) ([][]byte, error) { + if IsGzipped(record) { + // Process a batch of messages from a CWLogs Subscription + return GetMessagesFromGzippedInput(record) + } + + // Try to read it as a zlib-compressed record + // zlib.NewReader checks for a zlib header and returns an error if not found + zlibReader, err := zlib.NewReader(bytes.NewReader(record)) + if err == nil { + unzlibRecord, err := ioutil.ReadAll(zlibReader) + if err != nil { + return nil, fmt.Errorf("reading zlib-compressed record: %v", err) + } + return [][]byte{unzlibRecord}, nil + } + // Process a single message, from KPL + return [][]byte{record}, nil + +} + // LogEvent is a single log line within a LogEventBatch type LogEvent struct { ID string `json:"id"` diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index 170a1f1..2b01b4f 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -3,9 +3,11 @@ package splitter import ( "bytes" "compress/gzip" + "compress/zlib" b64 "encoding/base64" "encoding/json" "testing" + "time" "github.com/Clever/amazon-kinesis-client-go/decode" "github.com/stretchr/testify/assert" @@ -283,3 +285,67 @@ func TestSplitGlue(t *testing.T) { assert.Equal(t, "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28", enhanced["container_task"]) } } + +func TestSplitIfNecesary(t *testing.T) { + + // We provide three different inputs to batchedWriter.splitMessageIfNecessary + // plain text + // zlib compressed text + // gzip compressed CloudWatch logs batch + // we verify that the split function matches the input against the correct splitter + // and decodes it. + + assert := assert.New(t) + + plainTextInput := []byte("hello, world!") + + records, err := SplitMessageIfNecessary(plainTextInput) + assert.NoError(err) + assert.Equal( + records, + [][]byte{[]byte("hello, world!")}, + ) + + var z bytes.Buffer + zbuf := zlib.NewWriter(&z) + zbuf.Write([]byte("hello, world!")) + zbuf.Close() + zlibSingleInput := z.Bytes() + + records, err = SplitMessageIfNecessary(zlibSingleInput) + assert.NoError(err) + assert.Equal( + records, + [][]byte{[]byte("hello, world!")}, + ) + + // the details of this part aren't super important since the actual functionality is + // tested in other tests; for this test we just want to make sure that split function + // correctly realizes it's gzip and call the appropriate CW-log-splitting logic + var g bytes.Buffer + gbuf := gzip.NewWriter(&g) + cwLogBatch := LogEventBatch{ + MessageType: "test", + Owner: "test", + LogGroup: "test", + LogStream: "test", + SubscriptionFilters: []string{""}, + LogEvents: []LogEvent{{ + ID: "test", + Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)), + Message: "test", + }}, + } + cwLogBatchJSON, _ := json.Marshal(cwLogBatch) + gbuf.Write(cwLogBatchJSON) + gbuf.Close() + gzipBatchInput := g.Bytes() + + expectedRecord := []byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test") + records, err = SplitMessageIfNecessary(gzipBatchInput) + assert.NoError(err) + assert.Equal( + records, + [][]byte{expectedRecord}, + ) +}