// LogEvent is a single log line within a LogEventBatch
type LogEvent struct {
	ID        string              `json:"id"`
	Timestamp UnixTimestampMillis `json:"timestamp"`
	Message   string              `json:"message"`
}

// UnixTimestampMillis is a time.Time that marshals (unmarshals) to (from) a
// unix timestamp with millisecond resolution.
type UnixTimestampMillis time.Time

// Conversion factors between the time units used by the helpers below.
var (
	millisPerSecond     = int64(time.Second / time.Millisecond)
	nanosPerMillisecond = int64(time.Millisecond / time.Nanosecond)
)

// NewUnixTimestampMillis converts ts, a count of milliseconds since the Unix
// epoch, into a UnixTimestampMillis. The result is normalized to UTC so that
// formatting it does not depend on the time zone of the running process.
func NewUnixTimestampMillis(ts int64) UnixTimestampMillis {
	secs := ts / millisPerSecond
	// For negative ts the remainder is negative; time.Unix accepts nanosecond
	// values outside [0, 999999999] and normalizes them.
	nanos := (ts % millisPerSecond) * nanosPerMillisecond
	return UnixTimestampMillis(time.Unix(secs, nanos).UTC())
}

// MarshalJSON encodes the timestamp as a bare JSON number of milliseconds
// since the Unix epoch.
//
// The receiver is a value so that encoding/json also uses this method when
// the timestamp is reached through a non-addressable value (for example a
// LogEvent passed to json.Marshal by value).
func (t UnixTimestampMillis) MarshalJSON() ([]byte, error) {
	tm := time.Time(t)
	// Combine whole seconds with the sub-second part rather than calling
	// UnixNano, whose result is undefined for instants that do not fit in an
	// int64 of nanoseconds.
	millis := tm.Unix()*millisPerSecond + int64(tm.Nanosecond())/nanosPerMillisecond
	return strconv.AppendInt(nil, millis, 10), nil
}

// UnmarshalJSON decodes a JSON number of milliseconds since the Unix epoch
// into t. A JSON null leaves t untouched, following the encoding/json
// convention for Unmarshaler implementations. Any other input that is not a
// base-10 integer yields the error reported by strconv.ParseInt.
func (t *UnixTimestampMillis) UnmarshalJSON(b []byte) error {
	if string(b) == "null" {
		return nil
	}
	ts, err := strconv.ParseInt(string(b), 10, 64)
	if err != nil {
		return err
	}
	*t = NewUnixTimestampMillis(ts)
	return nil
}

// Time returns the timestamp as a plain time.Time.
func (t UnixTimestampMillis) Time() time.Time {
	return time.Time(t)
}

// RFC3339Micro is a time layout in the style of RFC 3339 with up to six
// fractional-second digits and a numeric zone offset.
const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00"

// http://docs.aws.amazon.com/batch/latest/userguide/job_states.html
// "log stream name format is jobDefinitionName/default/ecs_task_id (this format may change in the future)."
-const taskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app +const awsBatchTaskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app `default\/` + `([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // task-id +var awsBatchTaskRegex = regexp.MustCompile(awsBatchTaskMeta) -var taskRegex = regexp.MustCompile(taskMeta) +// lambda log groups are of the form /aws/lambda/-- +var awsLambdaLogGroupRegex = regexp.MustCompile(`^/aws/lambda/([a-z0-9-]+)--([a-z0-9-]+)$`) +var awsLambdaRequestIDRegex = regexp.MustCompile(`[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}`) + +// arn cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425 +const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + +type RSysLogMessage struct { + Timestamp time.Time + Hostname string + ProgramName string + PID int + Message string +} + +func (r RSysLogMessage) String() string { + // Adding an extra Microsecond forces `Format` to include all 6 digits within the micorsecond format. + // Otherwise, time.Format omits trailing zeroes. 
(https://github.com/golang/go/issues/12472) + return fmt.Sprintf(`%s %s %s[%d]: %s`, + r.Timestamp.Add(time.Microsecond).Format(RFC3339Micro), + r.Hostname, r.ProgramName, r.PID, r.Message) +} + +func splitAWSBatch(b LogEventBatch) ([]RSysLogMessage, bool) { + matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1) + if len(matches) != 1 { + return nil, false + } + env := matches[0][1] + app := matches[0][2] + task := matches[0][3] + + out := []RSysLogMessage{} + for _, event := range b.LogEvents { + out = append(out, RSysLogMessage{ + Timestamp: event.Timestamp.Time(), + ProgramName: env + "--" + app + arnCruft + task, + PID: 1, + Hostname: "aws-batch", + Message: event.Message, + }) + } + return out, true +} + +func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) { + matches := awsLambdaLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) + if len(matches) != 1 { + return nil, false + } + env := matches[0][1] + app := matches[0][2] + + out := []RSysLogMessage{} + for _, event := range b.LogEvents { + // find the request ID, e.g. 1f7fcc25-015f-11e8-a728-a1b6168ab9aa, set it as task + var task string + if matches := awsLambdaRequestIDRegex.FindAllString(event.Message, 1); len(matches) == 1 { + task = matches[0] + } + out = append(out, RSysLogMessage{ + Timestamp: event.Timestamp.Time(), + ProgramName: env + "--" + app + arnCruft + task, + PID: 1, + Hostname: "aws-lambda", + Message: event.Message, + }) + } + return out, true +} + +func splitDefault(b LogEventBatch) []RSysLogMessage { + out := []RSysLogMessage{} + for _, event := range b.LogEvents { + out = append(out, RSysLogMessage{ + Timestamp: event.Timestamp.Time(), + Hostname: "unknown", + ProgramName: "unknown", + PID: 1, + Message: event.Message, + }) + } + return out +} // Split takes a LogEventBatch and separates into a slice of enriched log lines // Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by // the subsequent decoding logic. 
func Split(b LogEventBatch) [][]byte { - env := "unknown" - app := "unknown" - task := "00001111-2222-3333-4444-555566667777" - matches := taskRegex.FindAllStringSubmatch(b.LogStream, 1) - if len(matches) == 1 { - env = matches[0][1] - app = matches[0][2] - task = matches[0][3] + var rsyslogMsgs []RSysLogMessage + var ok bool + if rsyslogMsgs, ok = splitAWSLambda(b); !ok { + rsyslogMsgs, ok = splitAWSBatch(b) + if !ok { + rsyslogMsgs = splitDefault(b) + } } - rsyslogPrefix := `%s %s %s[%d]: %s` - // programName is a mocked ARN in the format expected by our log decoders - programName := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task - mockPid := 1 - hostname := "aws-batch" - out := [][]byte{} - for _, event := range b.LogEvents { - // Adding an extra Microsecond forces `Format` to include all 6 digits within the micorsecond format. - // Otherwise, time.Format omits trailing zeroes. (https://github.com/golang/go/issues/12472) - nsecs := event.Timestamp*int64(time.Millisecond) + int64(time.Microsecond) - logTime := time.Unix(0, nsecs).UTC().Format(RFC3339Micro) - - // Fake an RSyslog prefix, expected by consumers - formatted := fmt.Sprintf(rsyslogPrefix, logTime, hostname, programName, mockPid, event.Message) - out = append(out, []byte(formatted)) + for _, rsyslogMsg := range rsyslogMsgs { + out = append(out, []byte(rsyslogMsg.String())) } return out diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index 20d23c5..4f9e512 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -28,7 +28,7 @@ func TestUnpacking(t *testing.T) { LogEvents: []LogEvent{ { ID: "", - Timestamp: 1498519943285, + Timestamp: NewUnixTimestampMillis(1498519943285), Message: "CWL CONTROL MESSAGE: Checking health of destination Kinesis stream.", }, }, @@ -98,7 +98,7 @@ func TestFullLoop(t *testing.T) { assert.Equal(t, leb, output) } -func TestSplit(t *testing.T) { +func TestSplitBatch(t *testing.T) { input := LogEventBatch{ 
MessageType: "DATA_MESSAGE", Owner: "123456789012", @@ -108,12 +108,12 @@ func TestSplit(t *testing.T) { LogEvents: []LogEvent{ { ID: "99999992379011144044923130086453437181614530551221780480", - Timestamp: 1498519943285, + Timestamp: NewUnixTimestampMillis(1498519943285), Message: "some log line", }, { ID: "99999992387663833181953011865369295871402094815542181889", - Timestamp: 1498519943285, + Timestamp: NewUnixTimestampMillis(1498519943285), Message: "another log line", }, }, @@ -125,3 +125,31 @@ func TestSplit(t *testing.T) { } assert.Equal(t, expected, lines) } + +func TestSplitLambda(t *testing.T) { + input := LogEventBatch{ + MessageType: "DATA_MESSAGE", + Owner: "123456789012", + LogGroup: "/aws/lambda/env--app", + LogStream: "2018/01/24/[3]62695bfa96de46938f56b156f5235205", + SubscriptionFilters: []string{"ForwardLogsToKinesis"}, + LogEvents: []LogEvent{ + { + ID: "99999992379011144044923130086453437181614530551221780480", + Timestamp: NewUnixTimestampMillis(1498519943285), + Message: "START RequestId: 8edbd53f-64c7-4a3c-bf1e-efeff40f6512 Version: 3", + }, + { + ID: "99999992387663833181953011865369295871402094815542181889", + Timestamp: NewUnixTimestampMillis(1498519943285), + Message: `{"aws_request_id":"8edbd53f-64c7-4a3c-bf1e-efeff40f6512","level":"info","source":"app","title":"some-log-title"}`, + }, + }, + } + lines := Split(input) + expected := [][]byte{ + []byte(`2017-06-26T23:32:23.285001+00:00 aws-lambda env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F8edbd53f-64c7-4a3c-bf1e-efeff40f6512[1]: START RequestId: 8edbd53f-64c7-4a3c-bf1e-efeff40f6512 Version: 3`), + []byte(`2017-06-26T23:32:23.285001+00:00 aws-lambda env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F8edbd53f-64c7-4a3c-bf1e-efeff40f6512[1]: {"aws_request_id":"8edbd53f-64c7-4a3c-bf1e-efeff40f6512","level":"info","source":"app","title":"some-log-title"}`), + } + assert.Equal(t, expected, lines) +}