From 2af69f28a2cc0e540bdf1740cbfc84ce7d2cdb54 Mon Sep 17 00:00:00 2001 From: Aaron Stein Date: Thu, 22 Aug 2019 17:02:22 -0700 Subject: [PATCH 1/6] update deps --- Gopkg.lock | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index efc1f45..894cbf2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -4,7 +4,10 @@ [[projects]] branch = "master" name = "github.com/Clever/syslogparser" - packages = [".","rfc3164"] + packages = [ + ".", + "rfc3164" + ] revision = "fb28ad3e4340c046323b7beba685a72fd12ecbe8" [[projects]] @@ -25,7 +28,10 @@ [[projects]] name = "github.com/stretchr/testify" - packages = ["assert"] + packages = [ + "assert", + "require" + ] revision = "69483b4bd14f5845b5a1e55bca19e954e827f1d0" version = "v1.1.4" @@ -57,7 +63,11 @@ [[projects]] name = "gopkg.in/Clever/kayvee-go.v6" - packages = [".","logger","router"] + packages = [ + ".", + "logger", + "router" + ] revision = "096364e316a52652d3493be702d8105d8d01db84" version = "v6.6.0" @@ -69,6 +79,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "12add425987f6b506139be39923e5dababe3a0337b5d8f81bf5722c55c58b52e" + inputs-digest = "4699293f3632dd38561ff60477aa7cc1ecaadc5808b974d017099e2189679286" solver-name = "gps-cdcl" solver-version = 1 From 45c48035c71cfcb99f39e3d82be3176f19e95151 Mon Sep 17 00:00:00 2001 From: Aaron Stein Date: Thu, 22 Aug 2019 17:02:46 -0700 Subject: [PATCH 2/6] split slowquery logs --- splitter/splitter.go | 63 ++++++++++++++++++++++++++------------- splitter/splitter_test.go | 29 ++++++++++++++++++ 2 files changed, 72 insertions(+), 20 deletions(-) diff --git a/splitter/splitter.go b/splitter/splitter.go index ba94964..ee741ad 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -113,6 +113,9 @@ var awsLambdaRequestIDRegex = regexp.MustCompile(`[0-9a-f]{8}-[0-9a-f]{4}-[0-9a- var awsFargateLogGroupRegex = regexp.MustCompile(`^/ecs/([a-z0-9-]+)--([a-z0-9-]+)$`) var awsFargateLogStreamRegex = 
regexp.MustCompile(`^fargate/([a-z0-9-]+)--([a-z0-9-]+)/([a-z0-9]+)$`) +// RDS slowquery log groups are in the form of /aws/rds/cluster/<database-name>/slowquery +var awsRDSLogGroupRegex = regexp.MustCompile(`^/aws/rds/cluster/([a-z0-9-]+)/slowquery$`) + // arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425 const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` const taskCruft = `12345678-1234-1234-1234-555566667777` @@ -132,11 +135,8 @@ func (r RSysLogMessage) String() string { r.Hostname, r.ProgramName, r.Message) } -func splitAWSBatch(b LogEventBatch) ([]RSysLogMessage, bool) { +func splitAWSBatch(b LogEventBatch) []RSysLogMessage { matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1) - if len(matches) != 1 { - return nil, false - } env := matches[0][1] app := matches[0][2] task := matches[0][3] @@ -150,13 +150,13 @@ func splitAWSBatch(b LogEventBatch) ([]RSysLogMessage, bool) { Message: event.Message, }) } - return out, true + return out } -func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) { +func splitAWSLambda(b LogEventBatch) []RSysLogMessage { matches := awsLambdaLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) if len(matches) != 1 { - return nil, false + return nil } env := matches[0][1] app := matches[0][2] @@ -177,20 +177,20 @@ func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) { Message: event.Message, }) } - return out, true + return out } -func splitAWSFargate(b LogEventBatch) ([]RSysLogMessage, bool) { +func splitAWSFargate(b LogEventBatch) []RSysLogMessage { matches := awsFargateLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) if len(matches) != 1 { - return nil, false + return nil } env := matches[0][1] app := matches[0][2] streamMatches := awsFargateLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1) if len(streamMatches) != 1 { - return nil, false + return nil } ecsTaskID := 
streamMatches[0][3] @@ -203,7 +203,27 @@ func splitAWSFargate(b LogEventBatch) ([]RSysLogMessage, bool) { Message: event.Message, }) } - return out, true + return out +} + +func splitAWSRDS(b LogEventBatch) []RSysLogMessage { + matches := awsRDSLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) + if len(matches) != 1 { + return nil + } + databaseName := matches[0][1] + + out := []RSysLogMessage{} + for _, event := range b.LogEvents { + out = append(out, RSysLogMessage{ + Timestamp: event.Timestamp.Time(), + ProgramName: databaseName, + PID: 1, + Hostname: "aws-rds", + Message: event.Message, + }) + } + return out } func splitDefault(b LogEventBatch) []RSysLogMessage { @@ -224,14 +244,17 @@ func splitDefault(b LogEventBatch) []RSysLogMessage { // the subsequent decoding logic. func Split(b LogEventBatch) [][]byte { var rsyslogMsgs []RSysLogMessage - var ok bool - if rsyslogMsgs, ok = splitAWSLambda(b); !ok { - if rsyslogMsgs, ok = splitAWSFargate(b); !ok { - rsyslogMsgs, ok = splitAWSBatch(b) - if !ok { - rsyslogMsgs = splitDefault(b) - } - } + + if awsLambdaLogGroupRegex.MatchString(b.LogGroup) { + rsyslogMsgs = splitAWSLambda(b) + } else if awsFargateLogGroupRegex.MatchString(b.LogGroup) { + rsyslogMsgs = splitAWSFargate(b) + } else if awsBatchTaskRegex.MatchString(b.LogStream) { + rsyslogMsgs = splitAWSBatch(b) + } else if awsRDSLogGroupRegex.MatchString(b.LogGroup) { + rsyslogMsgs = splitAWSRDS(b) + } else { + rsyslogMsgs = splitDefault(b) } out := [][]byte{} diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index a087325..c903931 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -224,3 +224,32 @@ func TestSplitDefault(t *testing.T) { } assert.Equal(t, expected, lines) } + +func TestSplitRDS(t *testing.T) { + input := LogEventBatch{ + MessageType: "DATA_MESSAGE", + Owner: "123456789012", + LogGroup: "/aws/rds/cluster/production-aurora-test-db/slowquery", + LogStream: "clever-dev-aurora-test-db", + SubscriptionFilters: 
[]string{"ForwardLogsToKinesis"}, + LogEvents: []LogEvent{ + { + ID: "99999992379011144044923130086453437181614530551221780480", + Timestamp: NewUnixTimestampMillis(1498519943285), + Message: "Slow query: select * from table.", + }, + }, + } + lines := Split(input) + expected := [][]byte{ + []byte(`2017-06-26T23:32:23.285001+00:00 aws-rds production-aurora-test-db[1]: Slow query: select * from table.`), + } + assert.Equal(t, expected, lines) + for _, line := range expected { + enhanced, err := decode.ParseAndEnhance(string(line), "") + require.Nil(t, err) + assert.Equal(t, "aws-rds", enhanced["hostname"]) + assert.Equal(t, "production-aurora-test-db", enhanced["programname"]) + assert.Equal(t, "Slow query: select * from table.", enhanced["rawlog"]) + } +} From df8b175e8645720f64a7f2fe6375fb1748b148cf Mon Sep 17 00:00:00 2001 From: Aaron Stein Date: Fri, 23 Aug 2019 13:28:00 -0700 Subject: [PATCH 3/6] add decode test --- decode/decode_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/decode/decode_test.go b/decode/decode_test.go index 12136c0..301a4e3 100644 --- a/decode/decode_test.go +++ b/decode/decode_test.go @@ -196,6 +196,18 @@ func TestSyslogDecoding(t *testing.T) { }, ExpectedError: nil, }, + Spec{ + Title: "Parses Rsyslog_ FileFormat with simple log body for slowquery", + Input: `2017-06-26T23:32:23.285001+00:00 aws-rds production-aurora-test-db: Slow query: select * from table.`, + ExpectedOutput: map[string]interface{}{ + "timestamp": logTime, + "hostname": "aws-rds", + "programname": "production-aurora-test-db", + "rawlog": "Slow query: select * from table.", + "decoder_msg_type": "syslog", + }, + ExpectedError: nil, + }, Spec{ Title: "Fails to parse non-RSyslog log line", Input: `not rsyslog`, From ea59ce899b3ba6a13edd2b869b6d218d4807c9d0 Mon Sep 17 00:00:00 2001 From: Aaron Stein Date: Fri, 23 Aug 2019 13:28:14 -0700 Subject: [PATCH 4/6] add match check --- splitter/splitter.go | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/splitter/splitter.go b/splitter/splitter.go index ee741ad..2410cd9 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -137,6 +137,9 @@ func (r RSysLogMessage) String() string { func splitAWSBatch(b LogEventBatch) []RSysLogMessage { matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1) + if len(matches) != 1 { + return nil + } env := matches[0][1] app := matches[0][2] task := matches[0][3] From 1261000e3384dafe57e32619e39f7f8ebd2df68b Mon Sep 17 00:00:00 2001 From: Aaron Stein Date: Fri, 23 Aug 2019 14:03:26 -0700 Subject: [PATCH 5/6] re-refactor log classification logic --- decode/decode_test.go | 4 +-- splitter/splitter.go | 62 +++++++++++++++++++++---------------------- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/decode/decode_test.go b/decode/decode_test.go index 301a4e3..7f201a9 100644 --- a/decode/decode_test.go +++ b/decode/decode_test.go @@ -198,9 +198,9 @@ func TestSyslogDecoding(t *testing.T) { }, Spec{ Title: "Parses Rsyslog_ FileFormat with simple log body for slowquery", - Input: `2017-06-26T23:32:23.285001+00:00 aws-rds production-aurora-test-db: Slow query: select * from table.`, + Input: `2017-04-05T21:57:46.794862+00:00 aws-rds production-aurora-test-db: Slow query: select * from table.`, ExpectedOutput: map[string]interface{}{ - "timestamp": logTime, + "timestamp": logTime3, "hostname": "aws-rds", "programname": "production-aurora-test-db", "rawlog": "Slow query: select * from table.", diff --git a/splitter/splitter.go b/splitter/splitter.go index 2410cd9..7880318 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -135,10 +135,10 @@ func (r RSysLogMessage) String() string { r.Hostname, r.ProgramName, r.Message) } -func splitAWSBatch(b LogEventBatch) []RSysLogMessage { +func splitAWSBatch(b LogEventBatch) ([]RSysLogMessage, bool) { matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1) if len(matches) != 1 { - return nil + return nil, false } env := matches[0][1] app := 
matches[0][2] @@ -153,13 +153,13 @@ func splitAWSBatch(b LogEventBatch) []RSysLogMessage { Message: event.Message, }) } - return out + return out, true } -func splitAWSLambda(b LogEventBatch) []RSysLogMessage { +func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) { matches := awsLambdaLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) if len(matches) != 1 { - return nil + return nil, false } env := matches[0][1] app := matches[0][2] @@ -180,20 +180,20 @@ func splitAWSLambda(b LogEventBatch) []RSysLogMessage { Message: event.Message, }) } - return out + return out, true } -func splitAWSFargate(b LogEventBatch) []RSysLogMessage { +func splitAWSFargate(b LogEventBatch) ([]RSysLogMessage, bool) { matches := awsFargateLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) if len(matches) != 1 { - return nil + return nil, false } env := matches[0][1] app := matches[0][2] streamMatches := awsFargateLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1) if len(streamMatches) != 1 { - return nil + return nil, false } ecsTaskID := streamMatches[0][3] @@ -206,13 +206,13 @@ func splitAWSFargate(b LogEventBatch) []RSysLogMessage { Message: event.Message, }) } - return out + return out, true } -func splitAWSRDS(b LogEventBatch) []RSysLogMessage { +func splitAWSRDS(b LogEventBatch) ([]RSysLogMessage, bool) { matches := awsRDSLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) if len(matches) != 1 { - return nil + return nil, false } databaseName := matches[0][1] @@ -226,7 +226,7 @@ func splitAWSRDS(b LogEventBatch) []RSysLogMessage { Message: event.Message, }) } - return out + return out, true } func splitDefault(b LogEventBatch) []RSysLogMessage { @@ -242,28 +242,26 @@ func splitDefault(b LogEventBatch) []RSysLogMessage { return out } +func stringify(rsyslogs []RSysLogMessage) [][]byte { + out := make([][]byte, len(rsyslogs)) + for i := range rsyslogs { + out[i] = []byte(rsyslogs[i].String()) + } + return out +} + // Split takes a LogEventBatch and separates into a slice of 
enriched log lines // Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by // the subsequent decoding logic. func Split(b LogEventBatch) [][]byte { - var rsyslogMsgs []RSysLogMessage - - if awsLambdaLogGroupRegex.MatchString(b.LogGroup) { - rsyslogMsgs = splitAWSLambda(b) - } else if awsFargateLogGroupRegex.MatchString(b.LogGroup) { - rsyslogMsgs = splitAWSFargate(b) - } else if awsBatchTaskRegex.MatchString(b.LogStream) { - rsyslogMsgs = splitAWSBatch(b) - } else if awsRDSLogGroupRegex.MatchString(b.LogGroup) { - rsyslogMsgs = splitAWSRDS(b) - } else { - rsyslogMsgs = splitDefault(b) + if rsyslogMsgs, ok := splitAWSLambda(b); ok { + return stringify(rsyslogMsgs) + } else if rsyslogMsgs, ok := splitAWSFargate(b); ok { + return stringify(rsyslogMsgs) + } else if rsyslogMsgs, ok := splitAWSBatch(b); ok { + return stringify(rsyslogMsgs) + } else if rsyslogMsgs, ok := splitAWSRDS(b); ok { + return stringify(rsyslogMsgs) } - - out := [][]byte{} - for _, rsyslogMsg := range rsyslogMsgs { - out = append(out, []byte(rsyslogMsg.String())) - } - - return out + return stringify(splitDefault(b)) } From 598d57359ff2c6994acc196e311b90c729b0082d Mon Sep 17 00:00:00 2001 From: Aaron Stein Date: Mon, 26 Aug 2019 12:09:06 -0700 Subject: [PATCH 6/6] remove PID from slowquery log --- splitter/splitter.go | 1 - splitter/splitter_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/splitter/splitter.go b/splitter/splitter.go index 7880318..d8c93d6 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -221,7 +221,6 @@ func splitAWSRDS(b LogEventBatch) ([]RSysLogMessage, bool) { out = append(out, RSysLogMessage{ Timestamp: event.Timestamp.Time(), ProgramName: databaseName, - PID: 1, Hostname: "aws-rds", Message: event.Message, }) diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index c903931..6075230 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -242,7 +242,7 @@ func 
TestSplitRDS(t *testing.T) { } lines := Split(input) expected := [][]byte{ - []byte(`2017-06-26T23:32:23.285001+00:00 aws-rds production-aurora-test-db[1]: Slow query: select * from table.`), + []byte(`2017-06-26T23:32:23.285001+00:00 aws-rds production-aurora-test-db: Slow query: select * from table.`), } assert.Equal(t, expected, lines) for _, line := range expected {