support parsing AWS Glue job logs

This commit is contained in:
Rafael Garcia 2020-07-20 17:20:39 +00:00
parent eb11747434
commit 337d2063f5
3 changed files with 64 additions and 1 deletions

View file

@ -448,7 +448,7 @@ func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
`arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft `arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32})` // task-id (ECS, both EC2 and Fargate) `([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32}|jr_[a-z0-9-]+)` // task-id (ECS, both EC2 and Fargate), Glue Job ID
var containerMetaRegex = regexp.MustCompile(containerMeta) var containerMetaRegex = regexp.MustCompile(containerMeta)

View file

@ -116,6 +116,11 @@ var awsFargateLogStreamRegex = regexp.MustCompile(`^fargate/([a-z0-9-]+)--([a-z0
// RDS slowquery log groups are in the form of /aws/rds/cluster/<database name>/slowquery // RDS slowquery log groups are in the form of /aws/rds/cluster/<database name>/slowquery
var awsRDSLogGroupRegex = regexp.MustCompile(`^/aws/rds/cluster/([a-z0-9-]+)/slowquery$`) var awsRDSLogGroupRegex = regexp.MustCompile(`^/aws/rds/cluster/([a-z0-9-]+)/slowquery$`)
// glue log groups are of the form /aws-glue/jobs/<env>/<app>
// glue log streams are of the form <job id>-<"driver" | "1" | "progress-bar">
var awsGlueLogGroupRegex = regexp.MustCompile(`^/aws-glue/jobs/([a-z0-9-]+)/([a-z0-9-]+)$`)
var awsGlueLogStreamRegex = regexp.MustCompile(`^(jr_[a-z0-9-]+)-.*$`)
// arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425 // arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425
const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`
const taskCruft = `12345678-1234-1234-1234-555566667777` const taskCruft = `12345678-1234-1234-1234-555566667777`
@ -228,6 +233,32 @@ func splitAWSRDS(b LogEventBatch) ([]RSysLogMessage, bool) {
return out, true return out, true
} }
func splitAWSGlue(b LogEventBatch) ([]RSysLogMessage, bool) {
matches := awsGlueLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
if len(matches) != 1 {
return nil, false
}
env := matches[0][1]
app := matches[0][2]
streamMatches := awsGlueLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1)
if len(streamMatches) != 1 {
return nil, false
}
jobID := streamMatches[0][1]
out := []RSysLogMessage{}
for _, event := range b.LogEvents {
out = append(out, RSysLogMessage{
Timestamp: event.Timestamp.Time(),
ProgramName: env + "--" + app + arnCruft + jobID,
Hostname: "aws-glue",
Message: event.Message,
})
}
return out, true
}
func splitDefault(b LogEventBatch) []RSysLogMessage { func splitDefault(b LogEventBatch) []RSysLogMessage {
out := []RSysLogMessage{} out := []RSysLogMessage{}
for _, event := range b.LogEvents { for _, event := range b.LogEvents {
@ -261,6 +292,8 @@ func Split(b LogEventBatch) [][]byte {
return stringify(rsyslogMsgs) return stringify(rsyslogMsgs)
} else if rsyslogMsgs, ok := splitAWSRDS(b); ok { } else if rsyslogMsgs, ok := splitAWSRDS(b); ok {
return stringify(rsyslogMsgs) return stringify(rsyslogMsgs)
} else if rsyslogMsgs, ok := splitAWSGlue(b); ok {
return stringify(rsyslogMsgs)
} }
return stringify(splitDefault(b)) return stringify(splitDefault(b))
} }

View file

@ -253,3 +253,33 @@ func TestSplitRDS(t *testing.T) {
assert.Equal(t, "Slow query: select * from table.", enhanced["rawlog"]) assert.Equal(t, "Slow query: select * from table.", enhanced["rawlog"])
} }
} }
func TestSplitGlue(t *testing.T) {
input := LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "/aws-glue/jobs/clever-dev/analytics-district-participation",
LogStream: "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28-1",
SubscriptionFilters: []string{"ForwardLogsToKinesis"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: NewUnixTimestampMillis(1498519943285),
Message: "foo bar.",
},
},
}
lines := Split(input)
expected := [][]byte{
[]byte(`2017-06-26T23:32:23.285001+00:00 aws-glue clever-dev--analytics-district-participation/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2Fjr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28: foo bar.`),
}
assert.Equal(t, expected, lines)
for _, line := range expected {
enhanced, err := decode.ParseAndEnhance(string(line), "")
require.Nil(t, err)
assert.Equal(t, "aws-glue", enhanced["hostname"])
assert.Equal(t, "clever-dev", enhanced["container_env"])
assert.Equal(t, "analytics-district-participation", enhanced["container_app"])
assert.Equal(t, "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28", enhanced["container_task"])
}
}