From aeb8af04e824d8bf024f940458cd2527ff1cf104 Mon Sep 17 00:00:00 2001 From: Rafael Garcia Date: Mon, 25 Feb 2019 20:52:52 +0000 Subject: [PATCH] parse fargate logs --- splitter/splitter.go | 40 ++++++++++++++++++++++++++++++++++++--- splitter/splitter_test.go | 22 +++++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/splitter/splitter.go b/splitter/splitter.go index bd7407d..5d259a7 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -108,6 +108,11 @@ var awsBatchTaskRegex = regexp.MustCompile(awsBatchTaskMeta) var awsLambdaLogGroupRegex = regexp.MustCompile(`^/aws/lambda/([a-z0-9-]+)--([a-z0-9-]+)$`) var awsLambdaRequestIDRegex = regexp.MustCompile(`[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}`) +// fargate log groups are of the form /ecs/-- +// fargate log streams are of the form fargate// +var awsFargateLogGroupRegex = regexp.MustCompile(`^/ecs/([a-z0-9-]+)--([a-z0-9-]+)$`) +var awsFargateLogStreamRegex = regexp.MustCompile(`^fargate/([a-z0-9-]+)--([a-z0-9-]+)/([a-z0-9]+)$`) + // arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425 const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` const taskCruft = `12345678-1234-1234-1234-555566667777` @@ -178,6 +183,33 @@ func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) { return out, true } +func splitAWSFargate(b LogEventBatch) ([]RSysLogMessage, bool) { + matches := awsFargateLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) + if len(matches) != 1 { + return nil, false + } + env := matches[0][1] + app := matches[0][2] + + streamMatches := awsFargateLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1) + if len(streamMatches) != 1 { + return nil, false + } + ecsTaskID := streamMatches[0][3] + + out := []RSysLogMessage{} + for _, event := range b.LogEvents { + out = append(out, RSysLogMessage{ + Timestamp: event.Timestamp.Time(), + ProgramName: env + "--" + app + arnCruft + ecsTaskID, + PID: 1, + Hostname: "aws-fargate", + Message: event.Message, + }) + } + return out, true +} + func splitDefault(b LogEventBatch) []RSysLogMessage { out := []RSysLogMessage{} for _, event := range b.LogEvents { @@ -199,9 +231,11 @@ func Split(b LogEventBatch) [][]byte { var rsyslogMsgs []RSysLogMessage var ok bool if rsyslogMsgs, ok = splitAWSLambda(b); !ok { - rsyslogMsgs, ok = splitAWSBatch(b) - if !ok { - rsyslogMsgs = splitDefault(b) + if rsyslogMsgs, ok = splitAWSFargate(b); !ok { + rsyslogMsgs, ok = splitAWSBatch(b) + if !ok { + rsyslogMsgs = splitDefault(b) + } } } diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index e6ae172..1436592 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -160,6 +160,28 @@ func TestSplitLambda(t *testing.T) { assert.Equal(t, expected, lines) } +func TestSplitFargate(t *testing.T) { + input := LogEventBatch{ + MessageType: "DATA_MESSAGE", + Owner: "123456789012", + LogGroup: "/ecs/production--clever-com-router", + LogStream: "fargate/clever-dev--clever-com-router/27b22d5d68aa4bd3923c95e7f32a3852", + SubscriptionFilters: []string{"ForwardLogsToKinesis"}, + LogEvents: []LogEvent{ + { + ID: "99999992379011144044923130086453437181614530551221780480", + Timestamp: NewUnixTimestampMillis(1498519943285), + Message: "Starting haproxy: haproxy.", + }, + }, + } + lines := Split(input) + expected := [][]byte{ + []byte(`2017-06-26T23:32:23.285001+00:00 aws-fargate production--clever-com-router/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F27b22d5d68aa4bd3923c95e7f32a3852[1]: Starting haproxy: haproxy.`), + } + assert.Equal(t, expected, lines) +} + func TestSplitDefault(t *testing.T) { input := LogEventBatch{ MessageType: "DATA_MESSAGE",