re-refactor log classification logic

This commit is contained in:
Aaron Stein 2019-08-23 14:03:26 -07:00
parent ea59ce899b
commit 1261000e33
2 changed files with 32 additions and 34 deletions

View file

@ -198,9 +198,9 @@ func TestSyslogDecoding(t *testing.T) {
}, },
Spec{ Spec{
Title: "Parses Rsyslog_ FileFormat with simple log body for slowquery", Title: "Parses Rsyslog_ FileFormat with simple log body for slowquery",
Input: `2017-06-26T23:32:23.285001+00:00 aws-rds production-aurora-test-db: Slow query: select * from table.`, Input: `2017-04-05T21:57:46.794862+00:00 aws-rds production-aurora-test-db: Slow query: select * from table.`,
ExpectedOutput: map[string]interface{}{ ExpectedOutput: map[string]interface{}{
"timestamp": logTime, "timestamp": logTime3,
"hostname": "aws-rds", "hostname": "aws-rds",
"programname": "production-aurora-test-db", "programname": "production-aurora-test-db",
"rawlog": "Slow query: select * from table.", "rawlog": "Slow query: select * from table.",

View file

@ -135,10 +135,10 @@ func (r RSysLogMessage) String() string {
r.Hostname, r.ProgramName, r.Message) r.Hostname, r.ProgramName, r.Message)
} }
func splitAWSBatch(b LogEventBatch) []RSysLogMessage { func splitAWSBatch(b LogEventBatch) ([]RSysLogMessage, bool) {
matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1) matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1)
if len(matches) != 1 { if len(matches) != 1 {
return nil return nil, false
} }
env := matches[0][1] env := matches[0][1]
app := matches[0][2] app := matches[0][2]
@ -153,13 +153,13 @@ func splitAWSBatch(b LogEventBatch) []RSysLogMessage {
Message: event.Message, Message: event.Message,
}) })
} }
return out return out, true
} }
func splitAWSLambda(b LogEventBatch) []RSysLogMessage { func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) {
matches := awsLambdaLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) matches := awsLambdaLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
if len(matches) != 1 { if len(matches) != 1 {
return nil return nil, false
} }
env := matches[0][1] env := matches[0][1]
app := matches[0][2] app := matches[0][2]
@ -180,20 +180,20 @@ func splitAWSLambda(b LogEventBatch) []RSysLogMessage {
Message: event.Message, Message: event.Message,
}) })
} }
return out return out, true
} }
func splitAWSFargate(b LogEventBatch) []RSysLogMessage { func splitAWSFargate(b LogEventBatch) ([]RSysLogMessage, bool) {
matches := awsFargateLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) matches := awsFargateLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
if len(matches) != 1 { if len(matches) != 1 {
return nil return nil, false
} }
env := matches[0][1] env := matches[0][1]
app := matches[0][2] app := matches[0][2]
streamMatches := awsFargateLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1) streamMatches := awsFargateLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1)
if len(streamMatches) != 1 { if len(streamMatches) != 1 {
return nil return nil, false
} }
ecsTaskID := streamMatches[0][3] ecsTaskID := streamMatches[0][3]
@ -206,13 +206,13 @@ func splitAWSFargate(b LogEventBatch) []RSysLogMessage {
Message: event.Message, Message: event.Message,
}) })
} }
return out return out, true
} }
func splitAWSRDS(b LogEventBatch) []RSysLogMessage { func splitAWSRDS(b LogEventBatch) ([]RSysLogMessage, bool) {
matches := awsRDSLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) matches := awsRDSLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
if len(matches) != 1 { if len(matches) != 1 {
return nil return nil, false
} }
databaseName := matches[0][1] databaseName := matches[0][1]
@ -226,7 +226,7 @@ func splitAWSRDS(b LogEventBatch) []RSysLogMessage {
Message: event.Message, Message: event.Message,
}) })
} }
return out return out, true
} }
func splitDefault(b LogEventBatch) []RSysLogMessage { func splitDefault(b LogEventBatch) []RSysLogMessage {
@ -242,28 +242,26 @@ func splitDefault(b LogEventBatch) []RSysLogMessage {
return out return out
} }
func stringify(rsyslogs []RSysLogMessage) [][]byte {
out := make([][]byte, len(rsyslogs))
for i := range rsyslogs {
out[i] = []byte(rsyslogs[i].String())
}
return out
}
// Split takes a LogEventBatch and separates into a slice of enriched log lines // Split takes a LogEventBatch and separates into a slice of enriched log lines
// Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by // Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by
// the subsequent decoding logic. // the subsequent decoding logic.
func Split(b LogEventBatch) [][]byte { func Split(b LogEventBatch) [][]byte {
var rsyslogMsgs []RSysLogMessage if rsyslogMsgs, ok := splitAWSLambda(b); ok {
return stringify(rsyslogMsgs)
if awsLambdaLogGroupRegex.MatchString(b.LogGroup) { } else if rsyslogMsgs, ok := splitAWSFargate(b); ok {
rsyslogMsgs = splitAWSLambda(b) return stringify(rsyslogMsgs)
} else if awsFargateLogGroupRegex.MatchString(b.LogGroup) { } else if rsyslogMsgs, ok := splitAWSBatch(b); ok {
rsyslogMsgs = splitAWSFargate(b) return stringify(rsyslogMsgs)
} else if awsBatchTaskRegex.MatchString(b.LogStream) { } else if rsyslogMsgs, ok := splitAWSRDS(b); ok {
rsyslogMsgs = splitAWSBatch(b) return stringify(rsyslogMsgs)
} else if awsRDSLogGroupRegex.MatchString(b.LogGroup) {
rsyslogMsgs = splitAWSRDS(b)
} else {
rsyslogMsgs = splitDefault(b)
} }
return stringify(splitDefault(b))
out := [][]byte{}
for _, rsyslogMsg := range rsyslogMsgs {
out = append(out, []byte(rsyslogMsg.String()))
}
return out
} }