diff --git a/decode/decode.go b/decode/decode.go index 7ab5040..071a9d0 100644 --- a/decode/decode.go +++ b/decode/decode.go @@ -448,7 +448,7 @@ func ParseAndEnhance(line string, env string) (map[string]interface{}, error) { const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app `arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft - `([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32})` // task-id (ECS, both EC2 and Fargate) + `([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32}|jr_[a-z0-9-]+)` // task-id (ECS, both EC2 and Fargate), Glue Job ID var containerMetaRegex = regexp.MustCompile(containerMeta) diff --git a/splitter/splitter.go b/splitter/splitter.go index d8c93d6..c332253 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -116,6 +116,11 @@ var awsFargateLogStreamRegex = regexp.MustCompile(`^fargate/([a-z0-9-]+)--([a-z0 // RDS slowquery log groups are in the form of /aws/rds/cluster//slowquery var awsRDSLogGroupRegex = regexp.MustCompile(`^/aws/rds/cluster/([a-z0-9-]+)/slowquery$`) +// glue log groups are of the form /aws-glue/jobs// +// glue log streams are of the form -<"driver" | "1" | "progress-bar"> +var awsGlueLogGroupRegex = regexp.MustCompile(`^/aws-glue/jobs/([a-z0-9-]+)/([a-z0-9-]+)$`) +var awsGlueLogStreamRegex = regexp.MustCompile(`^(jr_[a-z0-9-]+)-.*$`) + // arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425 const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` const taskCruft = `12345678-1234-1234-1234-555566667777` @@ -228,6 +233,32 @@ func splitAWSRDS(b LogEventBatch) ([]RSysLogMessage, bool) { return out, true } +func splitAWSGlue(b LogEventBatch) ([]RSysLogMessage, bool) { + matches := awsGlueLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) + if len(matches) != 1 { + return nil, false + } + env := matches[0][1] + app := matches[0][2] + + streamMatches := awsGlueLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1) + if len(streamMatches) != 1 { + return nil, false + } + jobID := streamMatches[0][1] + + out := []RSysLogMessage{} + for _, event := range b.LogEvents { + out = append(out, RSysLogMessage{ + Timestamp: event.Timestamp.Time(), + ProgramName: env + "--" + app + arnCruft + jobID, + Hostname: "aws-glue", + Message: event.Message, + }) + } + return out, true +} + func splitDefault(b LogEventBatch) []RSysLogMessage { out := []RSysLogMessage{} for _, event := range b.LogEvents { @@ -261,6 +292,8 @@ func Split(b LogEventBatch) [][]byte { return stringify(rsyslogMsgs) } else if rsyslogMsgs, ok := splitAWSRDS(b); ok { return stringify(rsyslogMsgs) + } else if rsyslogMsgs, ok := splitAWSGlue(b); ok { + return stringify(rsyslogMsgs) } return stringify(splitDefault(b)) } diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index 6075230..5ce1f46 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -253,3 +253,33 @@ func TestSplitRDS(t *testing.T) { assert.Equal(t, "Slow query: select * from table.", enhanced["rawlog"]) } } + +func TestSplitGlue(t *testing.T) { + input := LogEventBatch{ + MessageType: "DATA_MESSAGE", + Owner: "123456789012", + LogGroup: "/aws-glue/jobs/clever-dev/analytics-district-participation", + LogStream: "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28-1", + SubscriptionFilters: []string{"ForwardLogsToKinesis"}, + LogEvents: []LogEvent{ + { + ID: "99999992379011144044923130086453437181614530551221780480", + Timestamp: NewUnixTimestampMillis(1498519943285), + Message: "foo bar.", + }, + }, + } + lines := Split(input) + expected := [][]byte{ + []byte(`2017-06-26T23:32:23.285001+00:00 aws-glue clever-dev--analytics-district-participation/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2Fjr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28: foo bar.`), + } + assert.Equal(t, expected, lines) + for _, line := range expected { + enhanced, err := decode.ParseAndEnhance(string(line), "") + require.Nil(t, err) + assert.Equal(t, "aws-glue", enhanced["hostname"]) + assert.Equal(t, "clever-dev", enhanced["container_env"]) + assert.Equal(t, "analytics-district-participation", enhanced["container_app"]) + assert.Equal(t, "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28", enhanced["container_task"]) + } +}