amazon-kinesis-client-go/decode/decode.go
Taylor Sutton f5ce6fe4e7 Add decoding of JSON logs in Fluent format.
ParseAndEnhance used to be:
- Try to parse line as a syslog, extracting the log itself and other
  fields from syslog format
  - If that succeeds, try to parse the log as either a Kayvee log or
    an RDS slow query log.
  - Combine all these fields, and add on some "derived"
    fields (container_task|env|app).
- Not a syslog => error

Now it will be:
- Try to parse line as a syslog, same as before, including the
  Kayvee/RDS part
- If syslog parsing failed, try to parse as a Fluent log and extract
  some fields from the Fluent format (the log, timestamp, etc)
  - If that succeeds, try to parse the log itself as a Kayvee log.
  - Combine Kayvee fields (if found) and derived fields.
- If BOTH formats fail, it is an error.

The decoding makes a lot of assumptions:
- The names of the log field and timestamp field (even though,
  theoretically, they are customizable in the fluentbit config).
- The timestamp format (again)
- The format of the Task Definition name (or at least part of it)
- All fluentbit logs should have hostname set to `aws-fargate`.

Perhaps these can be relaxed if necessary; they could probably be
replaced by some kind of config. As there is currently no config, I
wanted to keep things as simple as possible. If we start getting JSON
logs that don't want the same handling for container_task|env|app, we
can re-evaluate.
2020-08-17 15:02:59 -07:00
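
For illustration, a minimal sketch of feeding a Fluent-format record through the
new path (the task ARN, task definition, and log values here are hypothetical;
the field names match the constants defined later in this file):

line := `{"log":"hello world","fluent_ts":"2020-08-17T15:02:59.123-0700","ecs_task_arn":"arn:aws:ecs:us-west-2:123456789012:task/abcdef1234567890abcdef1234567890","ecs_task_definition":"production--my-app--7"}`
fields, err := decode.ParseAndEnhance(line, "production")
if err != nil {
	// syslog and fluentbit parsing both failed
}
// On success, fields contains rawlog, timestamp, hostname ("aws-fargate"),
// decoder_msg_type ("fluentbit"), env, and the derived container_env
// ("production"), container_app ("my-app"), and container_task.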


package decode
import (
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
	"strings"
	"time"

	"github.com/Clever/syslogparser/rfc3164"
)
// reservedFields are automatically set during decoding.
// No field written by a user (e.g. contained in the Kayvee JSON) should overwrite them.
var reservedFields = []string{
	// used by all logs
	"timestamp",
	"hostname",
	"rawlog",
	// used by Kayvee
	"prefix",
	"postfix",
	// set to the deepest step of parsing that succeeded
	"decoder_msg_type",
}
func stringInSlice(s string, slice []string) bool {
	for _, item := range slice {
		if s == item {
			return true
		}
	}
	return false
}
// remapSyslog3164Keys renames fields to match our expectations from heka's syslog parser
// see: https://github.com/mozilla-services/heka/blob/278dd3d5961b9b6e47bb7a912b63ce3faaf8d8bd/sandbox/lua/decoders/rsyslog.lua
var remapSyslog3164Keys = map[string]string{
	"hostname":  "hostname",
	"timestamp": "timestamp",
	"tag":       "programname",
	"content":   "rawlog",
}
// FieldsFromSyslog takes an RSyslog formatted log line and extracts fields from it
//
// Supports two log lines formats:
// - RSYSLOG_TraditionalFileFormat - the "old style" default log file format with low-precision timestamps (RFC3164)
// - RSYSLOG_FileFormat - a modern-style logfile format similar to TraditionalFileFormat, but with high-precision timestamps and timezone information
//
// For more details on Rsylog formats: https://rsyslog-5-8-6-doc.neocities.org/rsyslog_conf_templates.html
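// For example, a line in the modern format might look like (hypothetical values):
//   2020-08-17T15:02:59.123456-07:00 some-host some-program[123]: hello world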
func FieldsFromSyslog(line string) (map[string]interface{}, error) {
	// rfc3164 includes a severity number in front of the Syslog line, but we don't use that
	fakeSeverity := "<12>"
	p3164 := rfc3164.NewParser([]byte(fakeSeverity + line))
	err := p3164.Parse()
	if err != nil {
		return map[string]interface{}{}, err
	}
	out := map[string]interface{}{}
	for k, v := range p3164.Dump() {
		if newKey, ok := remapSyslog3164Keys[k]; ok {
			out[newKey] = v
		}
	}
	out["decoder_msg_type"] = "syslog"
	return out, nil
}
// NonKayveeError occurs when the log line is not Kayvee
type NonKayveeError struct{}
func (e NonKayveeError) Error() string {
	return "Log line is not Kayvee (doesn't have JSON payload)"
}
// FieldsFromKayvee takes a log line and extracts fields from the Kayvee (JSON) part
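// For example, a line like the following (hypothetical values):
//   some prefix {"title":"request_finished","status":200} some postfix
// yields prefix "some prefix ", postfix " some postfix", and the JSON fields.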
func FieldsFromKayvee(line string) (map[string]interface{}, error) {
	m := map[string]interface{}{}
	firstIdx := strings.Index(line, "{")
	lastIdx := strings.LastIndex(line, "}")
	if firstIdx == -1 || lastIdx == -1 || firstIdx > lastIdx {
		return map[string]interface{}{}, &NonKayveeError{}
	}
	m["prefix"] = line[:firstIdx]
	m["postfix"] = line[lastIdx+1:]
	possibleJSON := line[firstIdx : lastIdx+1]
	var fields map[string]interface{}
	if err := json.Unmarshal([]byte(possibleJSON), &fields); err != nil {
		return map[string]interface{}{}, &NonKayveeError{}
	}
	if len(fields) == 0 { // Some logs have a superfluous "{}" in them. They're not Kayvee.
		return map[string]interface{}{}, &NonKayveeError{}
	}
	// TODO: consider also filtering on whether they have source and title
	for k, v := range fields {
		if !stringInSlice(k, reservedFields) {
			m[k] = v
		}
	}
	m["decoder_msg_type"] = "Kayvee"
	return m, nil
}
var userPattern = `#\sUser@Host:\s(?P<user>[a-zA-Z]+\[[a-zA-Z]+\])\s@\s.*Id:\s+(?P<id>[0-9]+)`
var userPatternRegex = regexp.MustCompile(userPattern)
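// FieldsFromRDSSlowquery extracts the user and connection id from an RDS slow-query log.
// A matching header line looks like (hypothetical values):
//   # User@Host: admin[admin] @ localhost []  Id:   123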
func FieldsFromRDSSlowquery(rawlog string) map[string]interface{} {
	out := map[string]interface{}{}
	match := userPatternRegex.FindStringSubmatch(rawlog)
	if len(match) == 3 {
		out["user"] = match[1]
		out["user_id"] = match[2]
	}
	return out
}
// MetricsRoute represents a metrics kv log route
type MetricsRoute struct {
	Series     string
	Dimensions []string
	ValueField string
	RuleName   string
}
// AnalyticsRoute represents an analytics kv log route
type AnalyticsRoute struct {
	Series   string
	RuleName string
}
// NotificationRoute represents a notification kv log route
type NotificationRoute struct {
	Channel  string
	Icon     string
	Message  string
	User     string
	RuleName string
}
// AlertRoute represents an alert kv log route
type AlertRoute struct {
	Series     string
	Dimensions []string
	StatType   string
	ValueField string
	RuleName   string
}
func getStringValue(json map[string]interface{}, key string) string {
	val, ok := json[key]
	if !ok {
		return ""
	}
	str, ok := val.(string)
	if !ok {
		return ""
	}
	return str
}
func getStringArray(json map[string]interface{}, key string) []string {
	val, ok := json[key]
	if !ok {
		return []string{}
	}
	iArray, ok := val.([]interface{})
	if !ok {
		return []string{}
	}
	strArray := []string{}
	for _, item := range iArray {
		s, ok := item.(string)
		if !ok {
			return []string{}
		}
		strArray = append(strArray, s)
	}
	return strArray
}
// LogRoutes is a named slice type that makes it easier to add route-specific filter functions
type LogRoutes []map[string]interface{}
// RuleNames returns a list of the names of the rules
func (l LogRoutes) RuleNames() []string {
	names := []string{}
	for _, route := range l {
		name := getStringValue(route, "rule")
		names = append(names, name)
	}
	return names
}
// MetricsRoutes filters the LogRoutes and returns a list of MetricsRoutes structs
func (l LogRoutes) MetricsRoutes() []MetricsRoute {
	routes := []MetricsRoute{}
	for _, route := range l {
		tipe := getStringValue(route, "type")
		if tipe != "metrics" {
			continue
		}
		series := getStringValue(route, "series")
		dimensions := getStringArray(route, "dimensions")
		valueField := getStringValue(route, "value_field")
		ruleName := getStringValue(route, "rule")
		if series == "" { // TODO: log error
			continue
		}
		if valueField == "" {
			valueField = "value"
		}
		routes = append(routes, MetricsRoute{
			Series:     series,
			Dimensions: dimensions,
			ValueField: valueField,
			RuleName:   ruleName,
		})
	}
	return routes
}
// AnalyticsRoutes filters the LogRoutes and returns a list of AnalyticsRoutes structs
func (l LogRoutes) AnalyticsRoutes() []AnalyticsRoute {
	routes := []AnalyticsRoute{}
	for _, route := range l {
		tipe := getStringValue(route, "type")
		if tipe != "analytics" {
			continue
		}
		series := getStringValue(route, "series")
		ruleName := getStringValue(route, "rule")
		if series == "" { // TODO: log error
			continue
		}
		routes = append(routes, AnalyticsRoute{
			Series:   series,
			RuleName: ruleName,
		})
	}
	return routes
}
// NotificationRoutes filters the LogRoutes and returns a list of NotificationRoutes structs
func (l LogRoutes) NotificationRoutes() []NotificationRoute {
	routes := []NotificationRoute{}
	for _, route := range l {
		tipe := getStringValue(route, "type")
		if tipe != "notifications" {
			continue
		}
		channel := getStringValue(route, "channel")
		icon := getStringValue(route, "icon")
		message := getStringValue(route, "message")
		user := getStringValue(route, "user")
		rule := getStringValue(route, "rule")
		if channel == "" || message == "" { // TODO: log error
			continue
		}
		if icon == "" {
			icon = ":ghost:"
		}
		if user == "" {
			user = "logging-pipeline"
		}
		routes = append(routes, NotificationRoute{
			Channel:  channel,
			Icon:     icon,
			Message:  message,
			User:     user,
			RuleName: rule,
		})
	}
	return routes
}
// AlertRoutes filters the LogRoutes and returns a list of AlertRoutes structs
func (l LogRoutes) AlertRoutes() []AlertRoute {
	routes := []AlertRoute{}
	for _, route := range l {
		tipe := getStringValue(route, "type")
		if tipe != "alerts" {
			continue
		}
		series := getStringValue(route, "series")
		dimensions := getStringArray(route, "dimensions")
		statType := getStringValue(route, "stat_type")
		valueField := getStringValue(route, "value_field")
		ruleName := getStringValue(route, "rule")
		if series == "" { // TODO: log error
			continue
		}
		if statType == "" {
			statType = "counter"
		}
		if valueField == "" {
			valueField = "value"
		}
		routes = append(routes, AlertRoute{
			Series:     series,
			Dimensions: dimensions,
			StatType:   statType,
			ValueField: valueField,
			RuleName:   ruleName,
		})
	}
	return routes
}
// KVMeta is a struct that represents kv-meta data
type KVMeta struct {
	Team     string
	Version  string
	Language string
	Routes   LogRoutes
}
// ExtractKVMeta returns a struct with available kv-meta data
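// The _kvmeta payload is expected to look something like (hypothetical values):
//   "_kvmeta": {"team":"some-team","kv_version":"1.0.0","kv_language":"go",
//     "routes":[{"type":"metrics","rule":"some-rule","series":"some-series"}]}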
func ExtractKVMeta(kvlog map[string]interface{}) KVMeta {
	tmp, ok := kvlog["_kvmeta"]
	if !ok {
		return KVMeta{}
	}
	kvmeta, ok := tmp.(map[string]interface{})
	if !ok {
		return KVMeta{}
	}
	kvRoutes := []map[string]interface{}{}
	tmp, ok = kvmeta["routes"]
	if ok {
		routes, ok := tmp.([]interface{})
		if ok {
			for _, route := range routes {
				rule, ok := route.(map[string]interface{})
				if ok { // TODO: log error
					kvRoutes = append(kvRoutes, rule)
				}
			}
		}
	}
	return KVMeta{
		Team:     getStringValue(kvmeta, "team"),
		Version:  getStringValue(kvmeta, "kv_version"),
		Language: getStringValue(kvmeta, "kv_language"),
		Routes:   kvRoutes,
	}
}
// ParseAndEnhance extracts fields from a log line, and does some post-processing to rename/add fields
func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
	syslog, syslogErr := FieldsFromSyslog(line)
	if syslogErr == nil {
		return decodeSyslog(syslog, env)
	}
	fluentLog, fluentErr := FieldsFromFluentbitLog(line)
	if fluentErr == nil {
		return decodeFluent(fluentLog, env)
	}
	return nil, fmt.Errorf("unable to parse log line with errors `%v` and `%v`", syslogErr, fluentErr)
}
func decodeSyslog(syslog map[string]interface{}, env string) (map[string]interface{}, error) {
	rawlog := syslog["rawlog"].(string)
	// Try pulling Kayvee fields out of message
	kvFields, err := FieldsFromKayvee(rawlog)
	if err != nil {
		if _, ok := err.(*NonKayveeError); !ok {
			return map[string]interface{}{}, err
		}
	} else {
		for k, v := range kvFields {
			syslog[k] = v
		}
	}
	// Try pulling RDS slow-query log fields out of message
	if syslog["hostname"] == "aws-rds" {
		slowQueryFields := FieldsFromRDSSlowquery(rawlog)
		for k, v := range slowQueryFields {
			syslog[k] = v
		}
	}
	// Inject additional fields that are useful in log-searching and other business logic
	syslog["env"] = env
	// This can error, which indicates an inability to extract container meta; that's fine, we can ignore it.
	addContainerMetaToSyslog(syslog)
	return syslog, nil
}
const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
	`arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft
	`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32}|jr_[a-z0-9-]+)` // task-id (ECS, both EC2 and Fargate), Glue Job ID
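// For example, a programname like the following would match (hypothetical account and task IDs):
//   production--my-app/arn%3Aaws%3Aecs%3Aus-west-2%3A123456789012%3Atask%2Fabcdef1234567890abcdef1234567890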
var containerMetaRegex = regexp.MustCompile(containerMeta)
var errBadProgramname = errors.New("invalid or missing programname")
func addContainerMetaToSyslog(syslog map[string]interface{}) (map[string]interface{}, error) {
	programname, ok := syslog["programname"].(string)
	if !ok || programname == "" {
		return syslog, errBadProgramname
	}
	env := ""
	app := ""
	task := ""
	matches := containerMetaRegex.FindAllStringSubmatch(programname, 1)
	if len(matches) == 1 {
		env = matches[0][1]
		app = matches[0][2]
		task = matches[0][4]
	} else {
		return syslog, errBadProgramname
	}
	// Sometimes it's useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
	// A separate container monitors for start/stop events, but we set the container values in such a way that
	// the logs for these events will appear in context for the app that the user is looking at instead of the
	// docker-events app.
	if existingEnv, ok := syslog["container_env"].(string); !ok || existingEnv == "" {
		syslog["container_env"] = env
	}
	if existingApp, ok := syslog["container_app"].(string); !ok || existingApp == "" {
		syslog["container_app"] = app
	}
	if existingTask, ok := syslog["container_task"].(string); !ok || existingTask == "" {
		syslog["container_task"] = task
	}
	return syslog, nil
}
const (
	// These are the fields in the incoming fluentbit JSON where we expect to find the timestamp and the log.
	// They are referenced below by the FluentLog type; those are the ones that matter.
	fluentbitTimestampField = "fluent_ts"
	fluentbitLogField       = "log"
	// This is what we get from the fluentbit config: `time_key_format: "%Y-%m-%dT%H:%M:%S.%L%z"`
	fluentTimeFormat = "2006-01-02T15:04:05.999-0700"
)
// FluentLog is the set of fields extracted from an incoming fluentbit log.
// The struct tags must line up with the JSON schema in the fluentbit configuration; see the comment for FieldsFromFluentbitLog.
type FluentLog struct {
	Log            *string `json:"log"`
	Timestamp      *string `json:"fluent_ts"`
	TaskArn        string  `json:"ecs_task_arn"`
	TaskDefinition string  `json:"ecs_task_definition"`
}
// FieldsFromFluentbitLog parses a JSON object with fields indicating that it's coming from FluentBit.
// Its return value shares a common interface with the syslog output, carrying the same key fields.
// In theory, the format we receive is highly customizable, so we make the following assumptions:
// - All logs are coming from aws-fargate with the ecs-metadata fields (ecs_cluster, ecs_task_arn, ecs_task_definition) enabled
// - The timestamp is in the field given by the fluentbitTimestampField constant in this package
// - The timestamp is of the format of the constant fluentTimeFormat
// - The log is in the field given by the fluentbitLogField constant in this package
// - The ecs_task_definition is of the form {environment}--{application}--.*
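// An example input line (hypothetical values):
//   {"log":"hello world","fluent_ts":"2020-08-17T15:02:59.123-0700",
//    "ecs_task_arn":"arn:aws:ecs:us-west-2:123456789012:task/abc123","ecs_task_definition":"production--my-app--7"}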
func FieldsFromFluentbitLog(line string) (*FluentLog, error) {
	fluentFields := FluentLog{}
	if err := json.Unmarshal([]byte(line), &fluentFields); err != nil {
		return nil, BadLogFormatError{Format: "fluentbit", DecodingError: err}
	}
	return &fluentFields, nil
}
func decodeFluent(fluentLog *FluentLog, env string) (map[string]interface{}, error) {
	out := map[string]interface{}{}
	if fluentLog.Timestamp == nil || *fluentLog.Timestamp == "" {
		return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("no timestamp found in input field %s", fluentbitTimestampField)}
	}
	if timeValue, err := time.Parse(fluentTimeFormat, *fluentLog.Timestamp); err == nil {
		out["timestamp"] = timeValue
	} else {
		return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("timestamp has bad format: %s", *fluentLog.Timestamp)}
	}
	if fluentLog.Log == nil {
		return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("no log found in input field %s", fluentbitLogField)}
	}
	log := *fluentLog.Log
	out["rawlog"] = log
	out["decoder_msg_type"] = "fluentbit"
	out["hostname"] = "aws-fargate"
	// best effort to add container_env|app|task
	if parts := strings.SplitN(fluentLog.TaskDefinition, "--", 3); len(parts) == 3 {
		out["container_env"] = parts[0]
		out["container_app"] = parts[1]
	}
	if idx := strings.LastIndex(fluentLog.TaskArn, "/"); idx != -1 {
		out["container_task"] = fluentLog.TaskArn[idx+1:]
	}
	kvFields, err := FieldsFromKayvee(log)
	if err == nil {
		for k, v := range kvFields {
			out[k] = v
		}
	}
	// Inject additional fields that are useful in log-searching and other business logic; mimicking syslog behavior
	out["env"] = env
	return out, nil
}
// BadLogFormatError indicates that a log line could not be decoded as the named format.
type BadLogFormatError struct {
	Format        string
	DecodingError error
}
func (b BadLogFormatError) Error() string {
	return fmt.Sprintf("trying to decode log as format %s: %v", b.Format, b.DecodingError)
}
func (b BadLogFormatError) Unwrap() error {
	return b.DecodingError
}