INFRA-2885: Split slowquery logs - Merge pull request #42 from Clever/INFRA-2885
INFRA-2885: Split slowquery logs
This commit is contained in:
commit
07828aef7b
4 changed files with 94 additions and 20 deletions
18
Gopkg.lock
generated
18
Gopkg.lock
generated
|
|
@ -4,7 +4,10 @@
|
|||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/Clever/syslogparser"
|
||||
packages = [".","rfc3164"]
|
||||
packages = [
|
||||
".",
|
||||
"rfc3164"
|
||||
]
|
||||
revision = "fb28ad3e4340c046323b7beba685a72fd12ecbe8"
|
||||
|
||||
[[projects]]
|
||||
|
|
@ -25,7 +28,10 @@
|
|||
|
||||
[[projects]]
|
||||
name = "github.com/stretchr/testify"
|
||||
packages = ["assert"]
|
||||
packages = [
|
||||
"assert",
|
||||
"require"
|
||||
]
|
||||
revision = "69483b4bd14f5845b5a1e55bca19e954e827f1d0"
|
||||
version = "v1.1.4"
|
||||
|
||||
|
|
@ -57,7 +63,11 @@
|
|||
|
||||
[[projects]]
|
||||
name = "gopkg.in/Clever/kayvee-go.v6"
|
||||
packages = [".","logger","router"]
|
||||
packages = [
|
||||
".",
|
||||
"logger",
|
||||
"router"
|
||||
]
|
||||
revision = "096364e316a52652d3493be702d8105d8d01db84"
|
||||
version = "v6.6.0"
|
||||
|
||||
|
|
@ -69,6 +79,6 @@
|
|||
[solve-meta]
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
inputs-digest = "12add425987f6b506139be39923e5dababe3a0337b5d8f81bf5722c55c58b52e"
|
||||
inputs-digest = "4699293f3632dd38561ff60477aa7cc1ecaadc5808b974d017099e2189679286"
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
||||
|
|
|
|||
|
|
@ -196,6 +196,18 @@ func TestSyslogDecoding(t *testing.T) {
|
|||
},
|
||||
ExpectedError: nil,
|
||||
},
|
||||
Spec{
|
||||
Title: "Parses Rsyslog_ FileFormat with simple log body for slowquery",
|
||||
Input: `2017-04-05T21:57:46.794862+00:00 aws-rds production-aurora-test-db: Slow query: select * from table.`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime3,
|
||||
"hostname": "aws-rds",
|
||||
"programname": "production-aurora-test-db",
|
||||
"rawlog": "Slow query: select * from table.",
|
||||
"decoder_msg_type": "syslog",
|
||||
},
|
||||
ExpectedError: nil,
|
||||
},
|
||||
Spec{
|
||||
Title: "Fails to parse non-RSyslog log line",
|
||||
Input: `not rsyslog`,
|
||||
|
|
|
|||
|
|
@ -113,6 +113,9 @@ var awsLambdaRequestIDRegex = regexp.MustCompile(`[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-
|
|||
var awsFargateLogGroupRegex = regexp.MustCompile(`^/ecs/([a-z0-9-]+)--([a-z0-9-]+)$`)
|
||||
var awsFargateLogStreamRegex = regexp.MustCompile(`^fargate/([a-z0-9-]+)--([a-z0-9-]+)/([a-z0-9]+)$`)
|
||||
|
||||
// RDS slowquery log groups are in the form of /aws/rds/cluster/<database name>/slowquery
|
||||
var awsRDSLogGroupRegex = regexp.MustCompile(`^/aws/rds/cluster/([a-z0-9-]+)/slowquery$`)
|
||||
|
||||
// arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425
|
||||
const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`
|
||||
const taskCruft = `12345678-1234-1234-1234-555566667777`
|
||||
|
|
@ -206,6 +209,25 @@ func splitAWSFargate(b LogEventBatch) ([]RSysLogMessage, bool) {
|
|||
return out, true
|
||||
}
|
||||
|
||||
func splitAWSRDS(b LogEventBatch) ([]RSysLogMessage, bool) {
|
||||
matches := awsRDSLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
|
||||
if len(matches) != 1 {
|
||||
return nil, false
|
||||
}
|
||||
databaseName := matches[0][1]
|
||||
|
||||
out := []RSysLogMessage{}
|
||||
for _, event := range b.LogEvents {
|
||||
out = append(out, RSysLogMessage{
|
||||
Timestamp: event.Timestamp.Time(),
|
||||
ProgramName: databaseName,
|
||||
Hostname: "aws-rds",
|
||||
Message: event.Message,
|
||||
})
|
||||
}
|
||||
return out, true
|
||||
}
|
||||
|
||||
func splitDefault(b LogEventBatch) []RSysLogMessage {
|
||||
out := []RSysLogMessage{}
|
||||
for _, event := range b.LogEvents {
|
||||
|
|
@ -219,25 +241,26 @@ func splitDefault(b LogEventBatch) []RSysLogMessage {
|
|||
return out
|
||||
}
|
||||
|
||||
func stringify(rsyslogs []RSysLogMessage) [][]byte {
|
||||
out := make([][]byte, len(rsyslogs))
|
||||
for i := range rsyslogs {
|
||||
out[i] = []byte(rsyslogs[i].String())
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// Split takes a LogEventBatch and separates into a slice of enriched log lines
|
||||
// Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by
|
||||
// the subsequent decoding logic.
|
||||
func Split(b LogEventBatch) [][]byte {
|
||||
var rsyslogMsgs []RSysLogMessage
|
||||
var ok bool
|
||||
if rsyslogMsgs, ok = splitAWSLambda(b); !ok {
|
||||
if rsyslogMsgs, ok = splitAWSFargate(b); !ok {
|
||||
rsyslogMsgs, ok = splitAWSBatch(b)
|
||||
if !ok {
|
||||
rsyslogMsgs = splitDefault(b)
|
||||
if rsyslogMsgs, ok := splitAWSLambda(b); ok {
|
||||
return stringify(rsyslogMsgs)
|
||||
} else if rsyslogMsgs, ok := splitAWSFargate(b); ok {
|
||||
return stringify(rsyslogMsgs)
|
||||
} else if rsyslogMsgs, ok := splitAWSBatch(b); ok {
|
||||
return stringify(rsyslogMsgs)
|
||||
} else if rsyslogMsgs, ok := splitAWSRDS(b); ok {
|
||||
return stringify(rsyslogMsgs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
out := [][]byte{}
|
||||
for _, rsyslogMsg := range rsyslogMsgs {
|
||||
out = append(out, []byte(rsyslogMsg.String()))
|
||||
}
|
||||
|
||||
return out
|
||||
return stringify(splitDefault(b))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -224,3 +224,32 @@ func TestSplitDefault(t *testing.T) {
|
|||
}
|
||||
assert.Equal(t, expected, lines)
|
||||
}
|
||||
|
||||
func TestSplitRDS(t *testing.T) {
|
||||
input := LogEventBatch{
|
||||
MessageType: "DATA_MESSAGE",
|
||||
Owner: "123456789012",
|
||||
LogGroup: "/aws/rds/cluster/production-aurora-test-db/slowquery",
|
||||
LogStream: "clever-dev-aurora-test-db",
|
||||
SubscriptionFilters: []string{"ForwardLogsToKinesis"},
|
||||
LogEvents: []LogEvent{
|
||||
{
|
||||
ID: "99999992379011144044923130086453437181614530551221780480",
|
||||
Timestamp: NewUnixTimestampMillis(1498519943285),
|
||||
Message: "Slow query: select * from table.",
|
||||
},
|
||||
},
|
||||
}
|
||||
lines := Split(input)
|
||||
expected := [][]byte{
|
||||
[]byte(`2017-06-26T23:32:23.285001+00:00 aws-rds production-aurora-test-db: Slow query: select * from table.`),
|
||||
}
|
||||
assert.Equal(t, expected, lines)
|
||||
for _, line := range expected {
|
||||
enhanced, err := decode.ParseAndEnhance(string(line), "")
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, "aws-rds", enhanced["hostname"])
|
||||
assert.Equal(t, "production-aurora-test-db", enhanced["programname"])
|
||||
assert.Equal(t, "Slow query: select * from table.", enhanced["rawlog"])
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue