handle logs coming in from lambda functions
This commit is contained in:
parent
7d863db172
commit
10750e802b
2 changed files with 162 additions and 32 deletions
|
|
@ -7,14 +7,46 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LogEvent is a single log line within a LogEventBatch
|
// LogEvent is a single log line within a LogEventBatch
|
||||||
type LogEvent struct {
|
type LogEvent struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
Timestamp int64 `json:"timestamp"`
|
Timestamp UnixTimestampMillis `json:"timestamp"`
|
||||||
Message string `json:"message"`
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnixTimestampMillis is a time.Time that marshals (unmarshals) to (from) a unix timestamp with millisecond resolution.
type UnixTimestampMillis time.Time

// Conversion factors between the time units used by UnixTimestampMillis.
const (
	millisPerSecond     = int64(time.Second / time.Millisecond)
	nanosPerMillisecond = int64(time.Millisecond / time.Nanosecond)
)

// NewUnixTimestampMillis converts ts, a number of milliseconds relative to the
// Unix epoch, into a UnixTimestampMillis. Negative values denote instants
// before the epoch.
func NewUnixTimestampMillis(ts int64) UnixTimestampMillis {
	return UnixTimestampMillis(time.Unix(ts/millisPerSecond,
		(ts%millisPerSecond)*nanosPerMillisecond))
}

// MarshalJSON encodes the timestamp as a bare JSON number holding the
// milliseconds elapsed since the Unix epoch.
//
// The receiver is a value (not a pointer) so that encoding/json also invokes
// this method for non-addressable values, e.g. a LogEvent marshalled by value.
func (t UnixTimestampMillis) MarshalJSON() ([]byte, error) {
	tt := time.Time(t)
	// Combine whole seconds with the sub-second part instead of dividing
	// UnixNano(): UnixNano overflows int64 for instants far from the epoch
	// (including the zero time.Time).
	ms := tt.Unix()*millisPerSecond + int64(tt.Nanosecond())/nanosPerMillisecond
	return strconv.AppendInt(nil, ms, 10), nil
}

// UnmarshalJSON decodes a JSON number of milliseconds since the Unix epoch
// into t. A JSON null leaves t untouched, following the encoding/json
// convention for Unmarshalers. Any other input that is not a base-10 integer
// yields the error reported by strconv.ParseInt.
func (t *UnixTimestampMillis) UnmarshalJSON(b []byte) error {
	if string(b) == "null" {
		return nil
	}
	ts, err := strconv.ParseInt(string(b), 10, 64)
	if err != nil {
		return err
	}
	*t = NewUnixTimestampMillis(ts)
	return nil
}

// Time returns the timestamp as a plain time.Time.
func (t UnixTimestampMillis) Time() time.Time {
	return time.Time(t)
}
|
||||||
|
|
||||||
// LogEventBatch is a batch of multiple log lines, read from a KinesisStream with a CWLogs subscription
|
// LogEventBatch is a batch of multiple log lines, read from a KinesisStream with a CWLogs subscription
|
||||||
|
|
@ -67,42 +99,112 @@ const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00"
|
||||||
|
|
||||||
// http://docs.aws.amazon.com/batch/latest/userguide/job_states.html
|
// http://docs.aws.amazon.com/batch/latest/userguide/job_states.html
|
||||||
// "log stream name format is jobDefinitionName/default/ecs_task_id (this format may change in the future)."
|
// "log stream name format is jobDefinitionName/default/ecs_task_id (this format may change in the future)."
|
||||||
const taskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
|
const awsBatchTaskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
|
||||||
`default\/` +
|
`default\/` +
|
||||||
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // task-id
|
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // task-id
|
||||||
|
var awsBatchTaskRegex = regexp.MustCompile(awsBatchTaskMeta)
|
||||||
|
|
||||||
var taskRegex = regexp.MustCompile(taskMeta)
|
// lambda log groups are of the form /aws/lambda/<env>--<app>
|
||||||
|
var awsLambdaLogGroupRegex = regexp.MustCompile(`^/aws/lambda/([a-z0-9-]+)--([a-z0-9-]+)$`)
|
||||||
|
var awsLambdaRequestIDRegex = regexp.MustCompile(`[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}`)
|
||||||
|
|
||||||
|
// arn cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425
|
||||||
|
const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`
|
||||||
|
|
||||||
|
type RSysLogMessage struct {
|
||||||
|
Timestamp time.Time
|
||||||
|
Hostname string
|
||||||
|
ProgramName string
|
||||||
|
PID int
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r RSysLogMessage) String() string {
|
||||||
|
// Adding an extra Microsecond forces `Format` to include all 6 digits within the micorsecond format.
|
||||||
|
// Otherwise, time.Format omits trailing zeroes. (https://github.com/golang/go/issues/12472)
|
||||||
|
return fmt.Sprintf(`%s %s %s[%d]: %s`,
|
||||||
|
r.Timestamp.Add(time.Microsecond).Format(RFC3339Micro),
|
||||||
|
r.Hostname, r.ProgramName, r.PID, r.Message)
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitAWSBatch(b LogEventBatch) ([]RSysLogMessage, bool) {
|
||||||
|
matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1)
|
||||||
|
if len(matches) != 1 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
env := matches[0][1]
|
||||||
|
app := matches[0][2]
|
||||||
|
task := matches[0][3]
|
||||||
|
|
||||||
|
out := []RSysLogMessage{}
|
||||||
|
for _, event := range b.LogEvents {
|
||||||
|
out = append(out, RSysLogMessage{
|
||||||
|
Timestamp: event.Timestamp.Time(),
|
||||||
|
ProgramName: env + "--" + app + arnCruft + task,
|
||||||
|
PID: 1,
|
||||||
|
Hostname: "aws-batch",
|
||||||
|
Message: event.Message,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return out, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) {
|
||||||
|
matches := awsLambdaLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
|
||||||
|
if len(matches) != 1 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
env := matches[0][1]
|
||||||
|
app := matches[0][2]
|
||||||
|
|
||||||
|
out := []RSysLogMessage{}
|
||||||
|
for _, event := range b.LogEvents {
|
||||||
|
// find the request ID, e.g. 1f7fcc25-015f-11e8-a728-a1b6168ab9aa, set it as task
|
||||||
|
var task string
|
||||||
|
if matches := awsLambdaRequestIDRegex.FindAllString(event.Message, 1); len(matches) == 1 {
|
||||||
|
task = matches[0]
|
||||||
|
}
|
||||||
|
out = append(out, RSysLogMessage{
|
||||||
|
Timestamp: event.Timestamp.Time(),
|
||||||
|
ProgramName: env + "--" + app + arnCruft + task,
|
||||||
|
PID: 1,
|
||||||
|
Hostname: "aws-lambda",
|
||||||
|
Message: event.Message,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return out, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitDefault(b LogEventBatch) []RSysLogMessage {
|
||||||
|
out := []RSysLogMessage{}
|
||||||
|
for _, event := range b.LogEvents {
|
||||||
|
out = append(out, RSysLogMessage{
|
||||||
|
Timestamp: event.Timestamp.Time(),
|
||||||
|
Hostname: "unknown",
|
||||||
|
ProgramName: "unknown",
|
||||||
|
PID: 1,
|
||||||
|
Message: event.Message,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
// Split takes a LogEventBatch and separates into a slice of enriched log lines
|
// Split takes a LogEventBatch and separates into a slice of enriched log lines
|
||||||
// Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by
|
// Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by
|
||||||
// the subsequent decoding logic.
|
// the subsequent decoding logic.
|
||||||
func Split(b LogEventBatch) [][]byte {
|
func Split(b LogEventBatch) [][]byte {
|
||||||
env := "unknown"
|
var rsyslogMsgs []RSysLogMessage
|
||||||
app := "unknown"
|
var ok bool
|
||||||
task := "00001111-2222-3333-4444-555566667777"
|
if rsyslogMsgs, ok = splitAWSLambda(b); !ok {
|
||||||
matches := taskRegex.FindAllStringSubmatch(b.LogStream, 1)
|
rsyslogMsgs, ok = splitAWSBatch(b)
|
||||||
if len(matches) == 1 {
|
if !ok {
|
||||||
env = matches[0][1]
|
rsyslogMsgs = splitDefault(b)
|
||||||
app = matches[0][2]
|
}
|
||||||
task = matches[0][3]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rsyslogPrefix := `%s %s %s[%d]: %s`
|
|
||||||
// programName is a mocked ARN in the format expected by our log decoders
|
|
||||||
programName := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task
|
|
||||||
mockPid := 1
|
|
||||||
hostname := "aws-batch"
|
|
||||||
|
|
||||||
out := [][]byte{}
|
out := [][]byte{}
|
||||||
for _, event := range b.LogEvents {
|
for _, rsyslogMsg := range rsyslogMsgs {
|
||||||
// Adding an extra Microsecond forces `Format` to include all 6 digits within the microsecond format.
|
out = append(out, []byte(rsyslogMsg.String()))
|
||||||
// Otherwise, time.Format omits trailing zeroes. (https://github.com/golang/go/issues/12472)
|
|
||||||
nsecs := event.Timestamp*int64(time.Millisecond) + int64(time.Microsecond)
|
|
||||||
logTime := time.Unix(0, nsecs).UTC().Format(RFC3339Micro)
|
|
||||||
|
|
||||||
// Fake an RSyslog prefix, expected by consumers
|
|
||||||
formatted := fmt.Sprintf(rsyslogPrefix, logTime, hostname, programName, mockPid, event.Message)
|
|
||||||
out = append(out, []byte(formatted))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return out
|
return out
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ func TestUnpacking(t *testing.T) {
|
||||||
LogEvents: []LogEvent{
|
LogEvents: []LogEvent{
|
||||||
{
|
{
|
||||||
ID: "",
|
ID: "",
|
||||||
Timestamp: 1498519943285,
|
Timestamp: NewUnixTimestampMillis(1498519943285),
|
||||||
Message: "CWL CONTROL MESSAGE: Checking health of destination Kinesis stream.",
|
Message: "CWL CONTROL MESSAGE: Checking health of destination Kinesis stream.",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
@ -98,7 +98,7 @@ func TestFullLoop(t *testing.T) {
|
||||||
assert.Equal(t, leb, output)
|
assert.Equal(t, leb, output)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSplit(t *testing.T) {
|
func TestSplitBatch(t *testing.T) {
|
||||||
input := LogEventBatch{
|
input := LogEventBatch{
|
||||||
MessageType: "DATA_MESSAGE",
|
MessageType: "DATA_MESSAGE",
|
||||||
Owner: "123456789012",
|
Owner: "123456789012",
|
||||||
|
|
@ -108,12 +108,12 @@ func TestSplit(t *testing.T) {
|
||||||
LogEvents: []LogEvent{
|
LogEvents: []LogEvent{
|
||||||
{
|
{
|
||||||
ID: "99999992379011144044923130086453437181614530551221780480",
|
ID: "99999992379011144044923130086453437181614530551221780480",
|
||||||
Timestamp: 1498519943285,
|
Timestamp: NewUnixTimestampMillis(1498519943285),
|
||||||
Message: "some log line",
|
Message: "some log line",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "99999992387663833181953011865369295871402094815542181889",
|
ID: "99999992387663833181953011865369295871402094815542181889",
|
||||||
Timestamp: 1498519943285,
|
Timestamp: NewUnixTimestampMillis(1498519943285),
|
||||||
Message: "another log line",
|
Message: "another log line",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
@ -125,3 +125,31 @@ func TestSplit(t *testing.T) {
|
||||||
}
|
}
|
||||||
assert.Equal(t, expected, lines)
|
assert.Equal(t, expected, lines)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSplitLambda(t *testing.T) {
|
||||||
|
input := LogEventBatch{
|
||||||
|
MessageType: "DATA_MESSAGE",
|
||||||
|
Owner: "123456789012",
|
||||||
|
LogGroup: "/aws/lambda/env--app",
|
||||||
|
LogStream: "2018/01/24/[3]62695bfa96de46938f56b156f5235205",
|
||||||
|
SubscriptionFilters: []string{"ForwardLogsToKinesis"},
|
||||||
|
LogEvents: []LogEvent{
|
||||||
|
{
|
||||||
|
ID: "99999992379011144044923130086453437181614530551221780480",
|
||||||
|
Timestamp: NewUnixTimestampMillis(1498519943285),
|
||||||
|
Message: "START RequestId: 8edbd53f-64c7-4a3c-bf1e-efeff40f6512 Version: 3",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "99999992387663833181953011865369295871402094815542181889",
|
||||||
|
Timestamp: NewUnixTimestampMillis(1498519943285),
|
||||||
|
Message: `{"aws_request_id":"8edbd53f-64c7-4a3c-bf1e-efeff40f6512","level":"info","source":"app","title":"some-log-title"}`,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
lines := Split(input)
|
||||||
|
expected := [][]byte{
|
||||||
|
[]byte(`2017-06-26T23:32:23.285001+00:00 aws-lambda env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F8edbd53f-64c7-4a3c-bf1e-efeff40f6512[1]: START RequestId: 8edbd53f-64c7-4a3c-bf1e-efeff40f6512 Version: 3`),
|
||||||
|
[]byte(`2017-06-26T23:32:23.285001+00:00 aws-lambda env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F8edbd53f-64c7-4a3c-bf1e-efeff40f6512[1]: {"aws_request_id":"8edbd53f-64c7-4a3c-bf1e-efeff40f6512","level":"info","source":"app","title":"some-log-title"}`),
|
||||||
|
}
|
||||||
|
assert.Equal(t, expected, lines)
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue