Add decoding of JSON logs in Fluent format.
ParseAndEnhance used to be:
- Try to parse line as a syslog, extracting the log itself and other
fields from syslog format
- If that succeeds, try to parse the log as either a Kayvee log or
an RDS slow query log.
- Combine all these fields, and add on some "derived"
fields (container_task|env|app).
- Not a syslog => error
Now it will be:
- Try to parse line as a syslog, same as before, including the
Kayvee/RDS part
- If syslog parsing failed, try to parse as a Fluent log and extract
some fields from the Fluent format (the log, timestamp, etc)
- If that succeeds, try to parse the log itself as a Kayvee log.
- Combine Kayvee fields (if found) and derived fields
- If BOTH formats fail to parse, it is an error.
The decoding makes a lot of assumptions:
- The names of the log field and timestamp field (even though,
theoretically, they are customizable in the fluentbit config).
- The timestamp format (again)
- The format of the Task Definition name (or at least part of it)
- All fluentbit logs should have hostname set to `aws-fargate`.
Perhaps these can be relaxed if necessary. They could probably be
replaced by some kind of config. As there is currently no config, I
wanted to keep things as simple as possible. If circumstances change
(for example, if we start getting JSON logs that shouldn't use the
same handling for container_task|env|app), we can re-evaluate.
This commit is contained in:
parent
b16af062fb
commit
f5ce6fe4e7
2 changed files with 407 additions and 115 deletions
203
decode/decode.go
203
decode/decode.go
|
|
@ -2,9 +2,11 @@ package decode
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Clever/syslogparser/rfc3164"
|
||||
)
|
||||
|
|
@ -15,9 +17,18 @@ var reservedFields = []string{
|
|||
"prefix",
|
||||
"postfix",
|
||||
"decoder_msg_type",
|
||||
|
||||
// used by all logs
|
||||
"timestamp",
|
||||
"hostname",
|
||||
"rawlog",
|
||||
|
||||
// Used by Kayvee
|
||||
"prefix",
|
||||
"postfix",
|
||||
|
||||
// Set to the deepest step of parsing that succeeded
|
||||
"decoder_msg_type",
|
||||
}
|
||||
|
||||
func stringInSlice(s string, slice []string) bool {
|
||||
|
|
@ -385,17 +396,21 @@ func ExtractKVMeta(kvlog map[string]interface{}) KVMeta {
|
|||
|
||||
// ParseAndEnhance extracts fields from a log line, and does some post-processing to rename/add fields
|
||||
func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
|
||||
out := map[string]interface{}{}
|
||||
syslog, syslogErr := FieldsFromSyslog(line)
|
||||
if syslogErr == nil {
|
||||
return decodeSyslog(syslog, env)
|
||||
}
|
||||
|
||||
syslogFields, err := FieldsFromSyslog(line)
|
||||
if err != nil {
|
||||
return map[string]interface{}{}, err
|
||||
fluentLog, fluentErr := FieldsFromFluentbitLog(line)
|
||||
if fluentErr == nil {
|
||||
return decodeFluent(fluentLog, env)
|
||||
}
|
||||
for k, v := range syslogFields {
|
||||
out[k] = v
|
||||
}
|
||||
rawlog := syslogFields["rawlog"].(string)
|
||||
programname := syslogFields["programname"].(string)
|
||||
|
||||
return nil, fmt.Errorf("unable to parse log line with errors `%v` and `%v`", syslogErr, fluentErr)
|
||||
}
|
||||
|
||||
func decodeSyslog(syslog map[string]interface{}, env string) (map[string]interface{}, error) {
|
||||
rawlog := syslog["rawlog"].(string)
|
||||
|
||||
// Try pulling Kayvee fields out of message
|
||||
kvFields, err := FieldsFromKayvee(rawlog)
|
||||
|
|
@ -405,45 +420,25 @@ func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
|
|||
}
|
||||
} else {
|
||||
for k, v := range kvFields {
|
||||
out[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Inject additional fields that are useful in log-searching and other business logic
|
||||
out["env"] = env
|
||||
|
||||
// Sometimes its useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
|
||||
// A separate container monitors for start/stop events, but we set the container values in such a way that
|
||||
// the logs for these events will appear in context for the app that the user is looking at instead of the
|
||||
// docker-events app.
|
||||
forceEnv := ""
|
||||
forceApp := ""
|
||||
forceTask := ""
|
||||
if cEnv, ok := out["container_env"]; ok {
|
||||
forceEnv = cEnv.(string)
|
||||
}
|
||||
if cApp, ok := out["container_app"]; ok {
|
||||
forceApp = cApp.(string)
|
||||
}
|
||||
if cTask, ok := out["container_task"]; ok {
|
||||
forceTask = cTask.(string)
|
||||
}
|
||||
meta, err := getContainerMeta(programname, forceEnv, forceApp, forceTask)
|
||||
if err == nil {
|
||||
for k, v := range meta {
|
||||
out[k] = v
|
||||
syslog[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Try pulling RDS slowquery logs fields out of message
|
||||
if out["hostname"] == "aws-rds" {
|
||||
if syslog["hostname"] == "aws-rds" {
|
||||
slowQueryFields := FieldsFromRDSSlowquery(rawlog)
|
||||
for k, v := range slowQueryFields {
|
||||
out[k] = v
|
||||
syslog[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
// Inject additional fields that are useful in log-searching and other business logic
|
||||
syslog["env"] = env
|
||||
|
||||
// this can error, which indicates inability to extract container meta. That's fine, we can ignore that.
|
||||
addContainterMetaToSyslog(syslog)
|
||||
|
||||
return syslog, nil
|
||||
}
|
||||
|
||||
const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
|
||||
|
|
@ -452,9 +447,12 @@ const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
|
|||
|
||||
var containerMetaRegex = regexp.MustCompile(containerMeta)
|
||||
|
||||
func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[string]string, error) {
|
||||
if programname == "" {
|
||||
return map[string]string{}, fmt.Errorf("no programname")
|
||||
var errBadProgramname = errors.New("invalid or missing programname")
|
||||
|
||||
func addContainterMetaToSyslog(syslog map[string]interface{}) (map[string]interface{}, error) {
|
||||
programname, ok := syslog["programname"].(string)
|
||||
if !ok || programname == "" {
|
||||
return syslog, errBadProgramname
|
||||
}
|
||||
|
||||
env := ""
|
||||
|
|
@ -465,25 +463,116 @@ func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[st
|
|||
env = matches[0][1]
|
||||
app = matches[0][2]
|
||||
task = matches[0][4]
|
||||
} else {
|
||||
return syslog, errBadProgramname
|
||||
}
|
||||
|
||||
if forceEnv != "" {
|
||||
env = forceEnv
|
||||
// Sometimes its useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
|
||||
// A separate container monitors for start/stop events, but we set the container values in such a way that
|
||||
// the logs for these events will appear in context for the app that the user is looking at instead of the
|
||||
// docker-events app.
|
||||
if existingEnv, ok := syslog["container_env"].(string); !ok || existingEnv == "" {
|
||||
syslog["container_env"] = env
|
||||
}
|
||||
if forceApp != "" {
|
||||
app = forceApp
|
||||
if existingApp, ok := syslog["container_app"].(string); !ok || existingApp == "" {
|
||||
syslog["container_app"] = app
|
||||
}
|
||||
if forceTask != "" {
|
||||
task = forceTask
|
||||
if existingTask, ok := syslog["container_task"].(string); !ok || existingTask == "" {
|
||||
syslog["container_task"] = task
|
||||
}
|
||||
|
||||
if env == "" || app == "" || task == "" {
|
||||
return map[string]string{}, fmt.Errorf("unable to get one or more of env/app/task")
|
||||
}
|
||||
|
||||
return map[string]string{
|
||||
"container_env": env,
|
||||
"container_app": app,
|
||||
"container_task": task,
|
||||
}, nil
|
||||
return syslog, nil
|
||||
}
|
||||
|
||||
const (
|
||||
// These are the fields in the incoming fluentbit JSON where we expect to find the timestamp and the log.
|
||||
// The struct tags on the FluentLog type below are what actually drive decoding, so these names must be kept in sync with them.
|
||||
fluentbitTimestampField = "fluent_ts"
|
||||
fluentbitLogField = "log"
|
||||
|
||||
// This is what we get from the fluentbit config: `time_key_format: "%Y-%m-%dT%H:%M:%S.%L%z"`
|
||||
fluentTimeFormat = "2006-01-02T15:04:05.999-0700"
|
||||
)
|
||||
|
||||
// FluentLog represents the set of fields extracted from an incoming fluentbit log.
|
||||
// The struct tags must line up with the JSON schema in the fluentbit configuration; see the comment for FieldsFromFluentbitLog.
|
||||
type FluentLog struct {
|
||||
Log *string `json:"log"`
|
||||
Timestamp *string `json:"fluent_ts"`
|
||||
TaskArn string `json:"ecs_task_arn"`
|
||||
TaskDefinition string `json:"ecs_task_definition"`
|
||||
}
|
||||
|
||||
// FieldsFromFluentbitLog parses JSON object with fields indicating that it's coming from FluentBit
|
||||
// Its return value shares a common interface with the Syslog output - with the same four key fields
|
||||
// Unlike FieldsFromSyslog, it accepts its argument as bytes, since it will be used as bytes immediately and bytes is what comes off the firehose
|
||||
// In theory, the format we recieve is highly customizable, so we'll be making the following assumptions:
|
||||
// - All logs are coming from aws-fargate with the ecs-metadata fields (ecs_cluster, ecs_task_arn, ecs_task_definition) enabled
|
||||
// - The timestamp is in the field given by the FluentBitTimestampField constant in this package
|
||||
// - The timestamp is of the format of the constant fluentTimestampFormat
|
||||
// - The log is in the field given by the FluentBitlogField constant in this package
|
||||
// - The ecs_task_definition is of the from {environment}--{application}--.*
|
||||
func FieldsFromFluentbitLog(line string) (*FluentLog, error) {
|
||||
fluentFields := FluentLog{}
|
||||
if err := json.Unmarshal([]byte(line), &fluentFields); err != nil {
|
||||
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: err}
|
||||
}
|
||||
|
||||
return &fluentFields, nil
|
||||
}
|
||||
|
||||
func decodeFluent(fluentLog *FluentLog, env string) (map[string]interface{}, error) {
|
||||
out := map[string]interface{}{}
|
||||
if fluentLog.Timestamp == nil || *fluentLog.Timestamp == "" {
|
||||
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("no timestamp found in input field %s", fluentbitTimestampField)}
|
||||
}
|
||||
|
||||
if timeValue, err := time.Parse(fluentTimeFormat, *fluentLog.Timestamp); err == nil {
|
||||
out["timestamp"] = timeValue
|
||||
} else {
|
||||
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("timestamp has bad format: %s", *fluentLog.Timestamp)}
|
||||
}
|
||||
|
||||
if fluentLog.Log == nil {
|
||||
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("no log found in input field %s", fluentbitLogField)}
|
||||
}
|
||||
log := *fluentLog.Log
|
||||
out["rawlog"] = log
|
||||
|
||||
out["decoder_msg_type"] = "fluentbit"
|
||||
out["hostname"] = "aws-fargate"
|
||||
|
||||
// best effort to add container_env|app|task
|
||||
if parts := strings.SplitN(fluentLog.TaskDefinition, "--", 3); len(parts) == 3 {
|
||||
out["container_env"] = parts[0]
|
||||
out["container_app"] = parts[1]
|
||||
}
|
||||
if idx := strings.LastIndex(fluentLog.TaskArn, "/"); idx != -1 {
|
||||
out["container_task"] = fluentLog.TaskArn[idx+1:]
|
||||
}
|
||||
|
||||
kvFields, err := FieldsFromKayvee(log)
|
||||
if err == nil {
|
||||
for k, v := range kvFields {
|
||||
out[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Inject additional fields that are useful in log-searching and other business logic; mimicking syslog behavior
|
||||
out["env"] = env
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// BadLogFormatError reports that a log line could not be decoded as a
// particular log format.
type BadLogFormatError struct {
	// Format names the log format that was being attempted, e.g. "fluentbit".
	Format string
	// DecodingError is the underlying cause; it is exposed through Unwrap.
	DecodingError error
}

// Error implements the error interface, naming the attempted format followed
// by the underlying decoding error.
func (e BadLogFormatError) Error() string {
	return fmt.Sprintf("trying to decode log as format %s: %v", e.Format, e.DecodingError)
}

// Unwrap returns the underlying decoding error so that errors.Is and errors.As
// can inspect the cause.
func (e BadLogFormatError) Unwrap() error {
	return e.DecodingError
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package decode
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"testing"
|
||||
|
|
@ -221,10 +222,10 @@ func TestSyslogDecoding(t *testing.T) {
|
|||
fields, err := FieldsFromSyslog(spec.Input)
|
||||
if spec.ExpectedError != nil {
|
||||
assert.Error(err)
|
||||
assert.IsType(spec.ExpectedError, err)
|
||||
} else {
|
||||
assert.NoError(err)
|
||||
assert.True(errors.As(err, &spec.ExpectedError))
|
||||
return
|
||||
}
|
||||
assert.NoError(err)
|
||||
assert.Equal(spec.ExpectedOutput, fields)
|
||||
})
|
||||
}
|
||||
|
|
@ -251,6 +252,11 @@ func TestParseAndEnhance(t *testing.T) {
|
|||
}
|
||||
logTime3 = logTime3.UTC()
|
||||
|
||||
logTime4, err := time.Parse(fluentTimeFormat, "2020-08-13T21:10:57.000+0000")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
specs := []ParseAndEnhanceSpec{
|
||||
ParseAndEnhanceSpec{
|
||||
Title: "Parses a Kayvee log line from an ECS app",
|
||||
|
|
@ -307,10 +313,10 @@ func TestParseAndEnhance(t *testing.T) {
|
|||
ExpectedError: nil,
|
||||
},
|
||||
ParseAndEnhanceSpec{
|
||||
Title: "Fails to parse non-RSyslog log line",
|
||||
Title: "Fails to parse non-RSyslog, non-Fluent log line",
|
||||
Line: `not rsyslog`,
|
||||
ExpectedOutput: map[string]interface{}{},
|
||||
ExpectedError: &syslogparser.ParserError{},
|
||||
ExpectedError: fmt.Errorf(""),
|
||||
},
|
||||
ParseAndEnhanceSpec{
|
||||
Title: "Parses JSON values",
|
||||
|
|
@ -379,6 +385,161 @@ select sleep(2);`,
|
|||
"user_id": "868",
|
||||
},
|
||||
},
|
||||
{
|
||||
Title: "Valid fluent log",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"ecs_task_definition": "env--app--d--2:2",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime4,
|
||||
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"hostname": "aws-fargate",
|
||||
"decoder_msg_type": "Kayvee",
|
||||
"title": "request-finished",
|
||||
"container_app": "app",
|
||||
"container_env": "env",
|
||||
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"prefix": "2020/08/13 21:10:57 ",
|
||||
"postfix": "",
|
||||
"env": "deploy-env",
|
||||
},
|
||||
ExpectedError: nil,
|
||||
},
|
||||
{
|
||||
Title: "fluent log missing ecs_task_definition still parses and has container_task",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime4,
|
||||
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"hostname": "aws-fargate",
|
||||
"decoder_msg_type": "Kayvee",
|
||||
"title": "request-finished",
|
||||
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"prefix": "2020/08/13 21:10:57 ",
|
||||
"postfix": "",
|
||||
"env": "deploy-env",
|
||||
},
|
||||
},
|
||||
{
|
||||
Title: "fluent log with bad ecs_task_definition format still parses and has container_task",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"ecs_task_definition": "env--app",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime4,
|
||||
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"hostname": "aws-fargate",
|
||||
"decoder_msg_type": "Kayvee",
|
||||
"title": "request-finished",
|
||||
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"prefix": "2020/08/13 21:10:57 ",
|
||||
"postfix": "",
|
||||
"env": "deploy-env",
|
||||
},
|
||||
},
|
||||
{
|
||||
Title: "fluent log missing ecs_task_arn still parses and has containter_env and app",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_definition": "env--app--d--2:2",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime4, // time.Date(2020, 8, 13, 21, 10, 57, 0, time.UTC),
|
||||
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"hostname": "aws-fargate",
|
||||
"decoder_msg_type": "Kayvee",
|
||||
"title": "request-finished",
|
||||
"container_env": "env",
|
||||
"container_app": "app",
|
||||
"prefix": "2020/08/13 21:10:57 ",
|
||||
"postfix": "",
|
||||
"env": "deploy-env",
|
||||
},
|
||||
},
|
||||
{
|
||||
Title: "fluent log missing timestamp is an error",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"ecs_task_definition": "env--app--d--2:2",
|
||||
"ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"deploy_env\":\"env\",\"level\":\"info\",\"pod-account\":\"589690932525\",\"pod-id\":\"2870425d-e4b1-4869-b2b7-a6d9fade9be1\",\"pod-region\":\"us-east-1\",\"pod-shortname\":\"us-east-1-dev-canary-2870425d\",\"source\":\"app-service\",\"title\":\"NumFDs\",\"type\":\"gauge\",\"value\":33,\"via\":\"process-metrics\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: nil,
|
||||
ExpectedError: BadLogFormatError{},
|
||||
},
|
||||
{
|
||||
Title: "fluent log with bad timestamp format is an error",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"ecs_task_definition": "env--app--d--2:2",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000Z",
|
||||
"log": "2020/08/13 21:10:57 {\"deploy_env\":\"env\",\"level\":\"info\",\"pod-account\":\"589690932525\",\"pod-id\":\"2870425d-e4b1-4869-b2b7-a6d9fade9be1\",\"pod-region\":\"us-east-1\",\"pod-shortname\":\"us-east-1-dev-canary-2870425d\",\"source\":\"app-service\",\"title\":\"NumFDs\",\"type\":\"gauge\",\"value\":33,\"via\":\"process-metrics\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: nil,
|
||||
ExpectedError: BadLogFormatError{},
|
||||
},
|
||||
{
|
||||
Title: "Valid fluent log with overrides for container_env, app, and task",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"ecs_task_definition": "env--app--d--2:2",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\", \"container_env\": \"env2\", \"container_task\": \"task\", \"container_app\": \"app2\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime4,
|
||||
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\", \"container_env\": \"env2\", \"container_task\": \"task\", \"container_app\": \"app2\"}",
|
||||
"hostname": "aws-fargate",
|
||||
"decoder_msg_type": "Kayvee",
|
||||
"title": "request-finished",
|
||||
"container_app": "app2",
|
||||
"container_env": "env2",
|
||||
"container_task": "task",
|
||||
"prefix": "2020/08/13 21:10:57 ",
|
||||
"postfix": "",
|
||||
"env": "deploy-env",
|
||||
},
|
||||
ExpectedError: nil,
|
||||
},
|
||||
}
|
||||
for _, spec := range specs {
|
||||
t.Run(fmt.Sprintf(spec.Title), func(t *testing.T) {
|
||||
|
|
@ -386,72 +547,114 @@ select sleep(2);`,
|
|||
fields, err := ParseAndEnhance(spec.Line, "deploy-env")
|
||||
if spec.ExpectedError != nil {
|
||||
assert.Error(err)
|
||||
assert.IsType(spec.ExpectedError, err)
|
||||
} else {
|
||||
assert.NoError(err)
|
||||
assert.True(errors.As(err, &spec.ExpectedError))
|
||||
return
|
||||
}
|
||||
assert.NoError(err)
|
||||
assert.Equal(spec.ExpectedOutput, fields)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetContainerMeta(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
t.Log("Must have a programname to get container meta")
|
||||
programname := ""
|
||||
_, err := getContainerMeta(programname, "", "", "")
|
||||
assert.Error(err)
|
||||
type containerMetaSpec struct {
|
||||
description string
|
||||
input map[string]interface{}
|
||||
wantErr bool
|
||||
wantContainerEnv string
|
||||
wantContainerApp string
|
||||
wantContainerTask string
|
||||
}
|
||||
|
||||
t.Log("Can parse a programname")
|
||||
programname = `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
|
||||
meta, err := getContainerMeta(programname, "", "", "")
|
||||
assert.NoError(err)
|
||||
assert.Equal(map[string]string{
|
||||
"container_env": "env1",
|
||||
"container_app": "app2",
|
||||
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
}, meta)
|
||||
tests := []containerMetaSpec{
|
||||
{
|
||||
description: "Must have a programname to get container meta",
|
||||
input: map[string]interface{}{
|
||||
"programname": "",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
description: "Can parse a programname",
|
||||
input: map[string]interface{}{
|
||||
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||
},
|
||||
wantContainerEnv: "env1",
|
||||
wantContainerApp: "app2",
|
||||
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
},
|
||||
{
|
||||
description: "Can override just 'env'",
|
||||
input: map[string]interface{}{
|
||||
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||
"container_env": "force-env",
|
||||
},
|
||||
wantContainerEnv: "force-env",
|
||||
wantContainerApp: "app2",
|
||||
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
},
|
||||
{
|
||||
description: "Can override just 'app'",
|
||||
input: map[string]interface{}{
|
||||
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||
"container_app": "force-app",
|
||||
},
|
||||
wantContainerEnv: "env1",
|
||||
wantContainerApp: "force-app",
|
||||
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
},
|
||||
{
|
||||
description: "Can override just 'task'",
|
||||
input: map[string]interface{}{
|
||||
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||
"container_task": "force-task",
|
||||
},
|
||||
wantContainerEnv: "env1",
|
||||
wantContainerApp: "app2",
|
||||
wantContainerTask: "force-task",
|
||||
},
|
||||
{
|
||||
description: "Can override all fields",
|
||||
input: map[string]interface{}{
|
||||
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||
"container_task": "force-task",
|
||||
"container_app": "force-app",
|
||||
"container_env": "force-env",
|
||||
},
|
||||
wantContainerEnv: "force-env",
|
||||
wantContainerApp: "force-app",
|
||||
wantContainerTask: "force-task",
|
||||
},
|
||||
}
|
||||
|
||||
t.Log("Can override just 'env'")
|
||||
overrideEnv := "force-env"
|
||||
meta, err = getContainerMeta(programname, overrideEnv, "", "")
|
||||
assert.NoError(err)
|
||||
assert.Equal(map[string]string{
|
||||
"container_env": overrideEnv,
|
||||
"container_app": "app2",
|
||||
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
}, meta)
|
||||
for _, tcase := range tests {
|
||||
assert := assert.New(t)
|
||||
|
||||
t.Log("Can override just 'app'")
|
||||
overrideApp := "force-app"
|
||||
meta, err = getContainerMeta(programname, "", overrideApp, "")
|
||||
assert.NoError(err)
|
||||
assert.Equal(map[string]string{
|
||||
"container_env": "env1",
|
||||
"container_app": overrideApp,
|
||||
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
}, meta)
|
||||
syslog := map[string]interface{}{}
|
||||
for k, v := range tcase.input {
|
||||
syslog[k] = v
|
||||
}
|
||||
|
||||
t.Log("Can override just 'task'")
|
||||
overrideTask := "force-task"
|
||||
meta, err = getContainerMeta(programname, "", "", overrideTask)
|
||||
assert.NoError(err)
|
||||
assert.Equal(map[string]string{
|
||||
"container_env": "env1",
|
||||
"container_app": "app2",
|
||||
"container_task": overrideTask,
|
||||
}, meta)
|
||||
newSyslog, err := addContainterMetaToSyslog(syslog)
|
||||
if tcase.wantErr {
|
||||
assert.Error(err)
|
||||
continue
|
||||
}
|
||||
for k, v := range newSyslog {
|
||||
switch k {
|
||||
case "container_env":
|
||||
assert.Equal(v, tcase.wantContainerEnv)
|
||||
case "container_app":
|
||||
assert.Equal(v, tcase.wantContainerApp)
|
||||
case "container_task":
|
||||
assert.Equal(v, tcase.wantContainerTask)
|
||||
default:
|
||||
assert.Equal(v, tcase.input[k])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
t.Log("Can override all fields")
|
||||
programname = `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
|
||||
meta, err = getContainerMeta(programname, overrideEnv, overrideApp, overrideTask)
|
||||
assert.NoError(err)
|
||||
assert.Equal(map[string]string{
|
||||
"container_env": overrideEnv,
|
||||
"container_app": overrideApp,
|
||||
"container_task": overrideTask,
|
||||
}, meta)
|
||||
}
|
||||
|
||||
func TestExtractKVMeta(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue