diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 4fe8c9b..d562be1 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -1,9 +1,12 @@ package batchconsumer import ( + "bytes" + "compress/zlib" "context" "encoding/base64" "fmt" + "io/ioutil" "math/big" "golang.org/x/time/rate" @@ -58,16 +61,28 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer } func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { - // We handle two types of records: - // - records emitted from CWLogs Subscription - // - records emitted from KPL - if !splitter.IsGzipped(record) { - // Process a single message, from KPL - return [][]byte{record}, nil + // We handle three types of records: + // - records emitted from CWLogs Subscription (which are gzip compressed) + // - uncompressed records emitted from KPL + // - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bit) + if splitter.IsGzipped(record) { + // Process a batch of messages from a CWLogs Subscription + return splitter.GetMessagesFromGzippedInput(record) } - // Process a batch of messages from a CWLogs Subscription - return splitter.GetMessagesFromGzippedInput(record) + // Try to read it as a zlib-compressed record + // zlib.NewReader checks for a zlib header and returns an error if not found + zlibReader, err := zlib.NewReader(bytes.NewReader(record)) + if err == nil { + unzlibRecord, err := ioutil.ReadAll(zlibReader) + if err != nil { + return nil, fmt.Errorf("reading zlib-compressed record: %v", err) + } + return [][]byte{unzlibRecord}, nil + } + // Process a single message, from KPL + return [][]byte{record}, nil + } func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 8de24a9..7a79fac 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -1,7 +1,11 @@ package batchconsumer import ( + "bytes" + "compress/gzip" + "compress/zlib" "encoding/base64" + "encoding/json" "fmt" "testing" "time" @@ -10,6 +14,7 @@ import ( "gopkg.in/Clever/kayvee-go.v6/logger" "github.com/Clever/amazon-kinesis-client-go/kcl" + "github.com/Clever/amazon-kinesis-client-go/splitter" ) type ignoringSender struct{} @@ -352,7 +357,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { assert.Equal("tag3", string(mocksender.batches["tag3"][0][2])) } -func TestStaggeredCheckpionting(t *testing.T) { +func TestStaggeredCheckpointing(t *testing.T) { assert := assert.New(t) mockFailedLogsFile := logger.New("testing") @@ -421,3 +426,75 @@ func TestStaggeredCheckpionting(t *testing.T) { assert.Equal("tag3", string(mocksender.batches["tag3"][2][0])) assert.Equal("tag3", string(mocksender.batches["tag3"][2][1])) } + +func TestSplitIfNecesary(t *testing.T) { + + // We provide three different inputs to batchedWriter.splitMessageIfNecessary + // plain text + // zlib compressed text + // gzip compressed CloudWatch logs batch + // we verify that the split function matches the input against the correct splitter + // and decodes it. + + assert := assert.New(t) + + mockFailedLogsFile := logger.New("testing") + mockconfig := withDefaults(Config{ + BatchInterval: 10 * time.Millisecond, + CheckpointFreq: 20 * time.Millisecond, + }) + + wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mockFailedLogsFile) + + plainTextInput := []byte("hello, world!") + + records, err := wrt.splitMessageIfNecessary(plainTextInput) + assert.NoError(err) + assert.Equal( + records, + [][]byte{[]byte("hello, world!")}, + ) + + var z bytes.Buffer + zbuf := zlib.NewWriter(&z) + zbuf.Write([]byte("hello, world!")) + zbuf.Close() + zlibSingleInput := z.Bytes() + + records, err = wrt.splitMessageIfNecessary(zlibSingleInput) + assert.NoError(err) + assert.Equal( + records, + [][]byte{[]byte("hello, world!")}, + ) + + // the details of this part aren't super important since the actual functionality is + // tested in the splitter package, we just want to make sure that OUR split function + // correctly realizes it's gzip and call the splitter package's functionality + var g bytes.Buffer + gbuf := gzip.NewWriter(&g) + cwLogBatch := splitter.LogEventBatch{ + MessageType: "test", + Owner: "test", + LogGroup: "test", + LogStream: "test", + SubscriptionFilters: []string{""}, + LogEvents: []splitter.LogEvent{{ + ID: "test", + Timestamp: splitter.UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)), + Message: "test", + }}, + } + cwLogBatchJSON, _ := json.Marshal(cwLogBatch) + gbuf.Write(cwLogBatchJSON) + gbuf.Close() + gzipBatchInput := g.Bytes() + + expectedRecord := []byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test") + records, err = wrt.splitMessageIfNecessary(gzipBatchInput) + assert.NoError(err) + assert.Equal( + records, + [][]byte{expectedRecord}, + ) +}