split slowquery logs
This commit is contained in:
parent
2af69f28a2
commit
45c48035c7
2 changed files with 72 additions and 20 deletions
|
|
@ -113,6 +113,9 @@ var awsLambdaRequestIDRegex = regexp.MustCompile(`[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-
|
||||||
var awsFargateLogGroupRegex = regexp.MustCompile(`^/ecs/([a-z0-9-]+)--([a-z0-9-]+)$`)
|
var awsFargateLogGroupRegex = regexp.MustCompile(`^/ecs/([a-z0-9-]+)--([a-z0-9-]+)$`)
|
||||||
var awsFargateLogStreamRegex = regexp.MustCompile(`^fargate/([a-z0-9-]+)--([a-z0-9-]+)/([a-z0-9]+)$`)
|
var awsFargateLogStreamRegex = regexp.MustCompile(`^fargate/([a-z0-9-]+)--([a-z0-9-]+)/([a-z0-9]+)$`)
|
||||||
|
|
||||||
|
// RDS slowquery log groups are in the form of /aws/rds/cluster/<database name>/slowquery
|
||||||
|
var awsRDSLogGroupRegex = regexp.MustCompile(`^/aws/rds/cluster/([a-z0-9-]+)/slowquery$`)
|
||||||
|
|
||||||
// arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425
|
// arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425
|
||||||
const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`
|
const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`
|
||||||
const taskCruft = `12345678-1234-1234-1234-555566667777`
|
const taskCruft = `12345678-1234-1234-1234-555566667777`
|
||||||
|
|
@ -132,11 +135,8 @@ func (r RSysLogMessage) String() string {
|
||||||
r.Hostname, r.ProgramName, r.Message)
|
r.Hostname, r.ProgramName, r.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
func splitAWSBatch(b LogEventBatch) ([]RSysLogMessage, bool) {
|
func splitAWSBatch(b LogEventBatch) []RSysLogMessage {
|
||||||
matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1)
|
matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1)
|
||||||
if len(matches) != 1 {
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
env := matches[0][1]
|
env := matches[0][1]
|
||||||
app := matches[0][2]
|
app := matches[0][2]
|
||||||
task := matches[0][3]
|
task := matches[0][3]
|
||||||
|
|
@ -150,13 +150,13 @@ func splitAWSBatch(b LogEventBatch) ([]RSysLogMessage, bool) {
|
||||||
Message: event.Message,
|
Message: event.Message,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return out, true
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) {
|
func splitAWSLambda(b LogEventBatch) []RSysLogMessage {
|
||||||
matches := awsLambdaLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
|
matches := awsLambdaLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
|
||||||
if len(matches) != 1 {
|
if len(matches) != 1 {
|
||||||
return nil, false
|
return nil
|
||||||
}
|
}
|
||||||
env := matches[0][1]
|
env := matches[0][1]
|
||||||
app := matches[0][2]
|
app := matches[0][2]
|
||||||
|
|
@ -177,20 +177,20 @@ func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) {
|
||||||
Message: event.Message,
|
Message: event.Message,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return out, true
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func splitAWSFargate(b LogEventBatch) ([]RSysLogMessage, bool) {
|
func splitAWSFargate(b LogEventBatch) []RSysLogMessage {
|
||||||
matches := awsFargateLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
|
matches := awsFargateLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
|
||||||
if len(matches) != 1 {
|
if len(matches) != 1 {
|
||||||
return nil, false
|
return nil
|
||||||
}
|
}
|
||||||
env := matches[0][1]
|
env := matches[0][1]
|
||||||
app := matches[0][2]
|
app := matches[0][2]
|
||||||
|
|
||||||
streamMatches := awsFargateLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1)
|
streamMatches := awsFargateLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1)
|
||||||
if len(streamMatches) != 1 {
|
if len(streamMatches) != 1 {
|
||||||
return nil, false
|
return nil
|
||||||
}
|
}
|
||||||
ecsTaskID := streamMatches[0][3]
|
ecsTaskID := streamMatches[0][3]
|
||||||
|
|
||||||
|
|
@ -203,7 +203,27 @@ func splitAWSFargate(b LogEventBatch) ([]RSysLogMessage, bool) {
|
||||||
Message: event.Message,
|
Message: event.Message,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return out, true
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitAWSRDS(b LogEventBatch) []RSysLogMessage {
|
||||||
|
matches := awsRDSLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
|
||||||
|
if len(matches) != 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
databaseName := matches[0][1]
|
||||||
|
|
||||||
|
out := []RSysLogMessage{}
|
||||||
|
for _, event := range b.LogEvents {
|
||||||
|
out = append(out, RSysLogMessage{
|
||||||
|
Timestamp: event.Timestamp.Time(),
|
||||||
|
ProgramName: databaseName,
|
||||||
|
PID: 1,
|
||||||
|
Hostname: "aws-rds",
|
||||||
|
Message: event.Message,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func splitDefault(b LogEventBatch) []RSysLogMessage {
|
func splitDefault(b LogEventBatch) []RSysLogMessage {
|
||||||
|
|
@ -224,14 +244,17 @@ func splitDefault(b LogEventBatch) []RSysLogMessage {
|
||||||
// the subsequent decoding logic.
|
// the subsequent decoding logic.
|
||||||
func Split(b LogEventBatch) [][]byte {
|
func Split(b LogEventBatch) [][]byte {
|
||||||
var rsyslogMsgs []RSysLogMessage
|
var rsyslogMsgs []RSysLogMessage
|
||||||
var ok bool
|
|
||||||
if rsyslogMsgs, ok = splitAWSLambda(b); !ok {
|
if awsLambdaLogGroupRegex.MatchString(b.LogGroup) {
|
||||||
if rsyslogMsgs, ok = splitAWSFargate(b); !ok {
|
rsyslogMsgs = splitAWSLambda(b)
|
||||||
rsyslogMsgs, ok = splitAWSBatch(b)
|
} else if awsFargateLogGroupRegex.MatchString(b.LogGroup) {
|
||||||
if !ok {
|
rsyslogMsgs = splitAWSFargate(b)
|
||||||
rsyslogMsgs = splitDefault(b)
|
} else if awsBatchTaskRegex.MatchString(b.LogStream) {
|
||||||
}
|
rsyslogMsgs = splitAWSBatch(b)
|
||||||
}
|
} else if awsRDSLogGroupRegex.MatchString(b.LogGroup) {
|
||||||
|
rsyslogMsgs = splitAWSRDS(b)
|
||||||
|
} else {
|
||||||
|
rsyslogMsgs = splitDefault(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
out := [][]byte{}
|
out := [][]byte{}
|
||||||
|
|
|
||||||
|
|
@ -224,3 +224,32 @@ func TestSplitDefault(t *testing.T) {
|
||||||
}
|
}
|
||||||
assert.Equal(t, expected, lines)
|
assert.Equal(t, expected, lines)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSplitRDS(t *testing.T) {
|
||||||
|
input := LogEventBatch{
|
||||||
|
MessageType: "DATA_MESSAGE",
|
||||||
|
Owner: "123456789012",
|
||||||
|
LogGroup: "/aws/rds/cluster/production-aurora-test-db/slowquery",
|
||||||
|
LogStream: "clever-dev-aurora-test-db",
|
||||||
|
SubscriptionFilters: []string{"ForwardLogsToKinesis"},
|
||||||
|
LogEvents: []LogEvent{
|
||||||
|
{
|
||||||
|
ID: "99999992379011144044923130086453437181614530551221780480",
|
||||||
|
Timestamp: NewUnixTimestampMillis(1498519943285),
|
||||||
|
Message: "Slow query: select * from table.",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
lines := Split(input)
|
||||||
|
expected := [][]byte{
|
||||||
|
[]byte(`2017-06-26T23:32:23.285001+00:00 aws-rds production-aurora-test-db[1]: Slow query: select * from table.`),
|
||||||
|
}
|
||||||
|
assert.Equal(t, expected, lines)
|
||||||
|
for _, line := range expected {
|
||||||
|
enhanced, err := decode.ParseAndEnhance(string(line), "")
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, "aws-rds", enhanced["hostname"])
|
||||||
|
assert.Equal(t, "production-aurora-test-db", enhanced["programname"])
|
||||||
|
assert.Equal(t, "Slow query: select * from table.", enhanced["rawlog"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue