Merge pull request #23 from Clever/INFRA-2764

handle logs coming in from lambda functions
This commit is contained in:
Rafael 2018-01-25 15:15:10 -08:00 committed by GitHub
commit bfe2ac0f6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 210 additions and 37 deletions

View file

@ -58,7 +58,7 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer
func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
// We handle two types of records:
// - records emitted from CWLogs Subscription
// - records emiited from KPL
// - records emitted from KPL
if !splitter.IsGzipped(record) {
// Process a single message, from KPL
return [][]byte{record}, nil

View file

@ -1,7 +1,7 @@
# This is the default Clever Golang Makefile.
# It is stored in the dev-handbook repo, github.com/Clever/dev-handbook
# Please do not alter this file directly.
GOLANG_MK_VERSION := 0.2.0
GOLANG_MK_VERSION := 0.3.2
SHELL := /bin/bash
SYSTEM := $(shell uname -a | cut -d" " -f1 | tr '[:upper:]' '[:lower:]')
@ -18,7 +18,7 @@ define golang-version-check
_ := $(if \
$(shell \
expr >/dev/null \
`go version | cut -d" " -f3 | cut -c3- | cut -d. -f2` \
`go version | cut -d" " -f3 | cut -c3- | cut -d. -f2 | sed -E 's/beta[0-9]+//'` \
\>= `echo $(1) | cut -d. -f2` \
\& \
`go version | cut -d" " -f3 | cut -c3- | cut -d. -f1` \
@ -52,9 +52,16 @@ golang-dep-vendor-deps: bin/dep
# golang-godep-vendor is a target for saving dependencies with the dep tool
# to the vendor/ directory. All nested vendor/ directories are deleted via
# the prune command.
# In CI, -vendor-only is used to avoid updating the lock file.
ifndef CI
define golang-dep-vendor
bin/dep ensure
bin/dep ensure -v
endef
else
define golang-dep-vendor
bin/dep ensure -v -vendor-only
endef
endif
# Golint is a tool for linting Golang code for common errors.
GOLINT := $(GOPATH)/bin/golint
@ -147,6 +154,19 @@ $(call golang-vet,$(1))
$(call golang-test-strict,$(1))
endef
# golang-build: builds a golang binary. ensures CGO build is done during CI. This is needed to make a binary that works with a Docker alpine image.
# arg1: pkg path
# arg2: executable name
define golang-build
@echo "BUILDING..."
@if [ -z "$$CI" ]; then \
go build -o bin/$(2) $(1); \
else \
echo "-> Building CGO binary"; \
CGO_ENABLED=0 go build -installsuffix cgo -o bin/$(2) $(1); \
fi;
endef
# golang-update-makefile downloads latest version of golang.mk
golang-update-makefile:
@wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang.mk -O /tmp/golang.mk 2>/dev/null

View file

@ -7,16 +7,48 @@ import (
"fmt"
"io/ioutil"
"regexp"
"strconv"
"time"
)
// LogEvent is a single log line within a LogEventBatch
type LogEvent struct {
ID string `json:"id"`
Timestamp int64 `json:"timestamp"`
Timestamp UnixTimestampMillis `json:"timestamp"`
Message string `json:"message"`
}
// UnixTimestampMillis is a time.Time that marshals (unmarshals) to (from) a unix timestamp with millisecond resolution.
type UnixTimestampMillis time.Time
func NewUnixTimestampMillis(ts int64) UnixTimestampMillis {
return UnixTimestampMillis(time.Unix(ts/millisPerSecond,
(ts%millisPerSecond)*nanosPerMillisecond))
}
func (t *UnixTimestampMillis) MarshalJSON() ([]byte, error) {
ts := time.Time(*t).UnixNano()
stamp := fmt.Sprint(ts / nanosPerMillisecond)
return []byte(stamp), nil
}
var millisPerSecond = int64(time.Second / time.Millisecond)
var nanosPerMillisecond = int64(time.Millisecond / time.Nanosecond)
func (t *UnixTimestampMillis) UnmarshalJSON(b []byte) error {
ts, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return err
}
*t = NewUnixTimestampMillis(ts)
return nil
}
func (t *UnixTimestampMillis) Time() time.Time {
return time.Time(*t)
}
// LogEventBatch is a batch of multiple log lines, read from a KinesisStream with a CWLogs subscription
type LogEventBatch struct {
MessageType string `json:"messageType"`
@ -67,42 +99,113 @@ const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00"
// http://docs.aws.amazon.com/batch/latest/userguide/job_states.html
// "log stream name format is jobDefinitionName/default/ecs_task_id (this format may change in the future)."
const taskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
const awsBatchTaskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
`default\/` +
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // task-id
var awsBatchTaskRegex = regexp.MustCompile(awsBatchTaskMeta)
var taskRegex = regexp.MustCompile(taskMeta)
// lambda log groups are of the form /aws/lambda/<env>--<app>
var awsLambdaLogGroupRegex = regexp.MustCompile(`^/aws/lambda/([a-z0-9-]+)--([a-z0-9-]+)$`)
var awsLambdaRequestIDRegex = regexp.MustCompile(`[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}`)
// arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425
const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`
const taskCruft = `12345678-1234-1234-1234-555566667777`
type RSysLogMessage struct {
Timestamp time.Time
Hostname string
ProgramName string
PID int
Message string
}
func (r RSysLogMessage) String() string {
// Adding an extra Microsecond forces `Format` to include all 6 digits within the micorsecond format.
// Otherwise, time.Format omits trailing zeroes. (https://github.com/golang/go/issues/12472)
return fmt.Sprintf(`%s %s %s[%d]: %s`,
r.Timestamp.Add(time.Microsecond).Format(RFC3339Micro),
r.Hostname, r.ProgramName, r.PID, r.Message)
}
func splitAWSBatch(b LogEventBatch) ([]RSysLogMessage, bool) {
matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1)
if len(matches) != 1 {
return nil, false
}
env := matches[0][1]
app := matches[0][2]
task := matches[0][3]
out := []RSysLogMessage{}
for _, event := range b.LogEvents {
out = append(out, RSysLogMessage{
Timestamp: event.Timestamp.Time(),
ProgramName: env + "--" + app + arnCruft + task,
PID: 1,
Hostname: "aws-batch",
Message: event.Message,
})
}
return out, true
}
func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) {
matches := awsLambdaLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
if len(matches) != 1 {
return nil, false
}
env := matches[0][1]
app := matches[0][2]
out := []RSysLogMessage{}
for _, event := range b.LogEvents {
// find the request ID, e.g. 1f7fcc25-015f-11e8-a728-a1b6168ab9aa, set it as task
var task string
if matches := awsLambdaRequestIDRegex.FindAllString(event.Message, 1); len(matches) == 1 {
task = matches[0]
}
out = append(out, RSysLogMessage{
Timestamp: event.Timestamp.Time(),
ProgramName: env + "--" + app + arnCruft + task,
PID: 1,
Hostname: "aws-lambda",
Message: event.Message,
})
}
return out, true
}
func splitDefault(b LogEventBatch) []RSysLogMessage {
out := []RSysLogMessage{}
for _, event := range b.LogEvents {
out = append(out, RSysLogMessage{
Timestamp: event.Timestamp.Time(),
Hostname: b.LogStream,
ProgramName: b.LogGroup + "--" + b.LogStream + arnCruft + taskCruft,
PID: 1,
Message: event.Message,
})
}
return out
}
// Split takes a LogEventBatch and separates into a slice of enriched log lines
// Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by
// the subsequent decoding logic.
func Split(b LogEventBatch) [][]byte {
env := "unknown"
app := "unknown"
task := "00001111-2222-3333-4444-555566667777"
matches := taskRegex.FindAllStringSubmatch(b.LogStream, 1)
if len(matches) == 1 {
env = matches[0][1]
app = matches[0][2]
task = matches[0][3]
var rsyslogMsgs []RSysLogMessage
var ok bool
if rsyslogMsgs, ok = splitAWSLambda(b); !ok {
rsyslogMsgs, ok = splitAWSBatch(b)
if !ok {
rsyslogMsgs = splitDefault(b)
}
}
rsyslogPrefix := `%s %s %s[%d]: %s`
// programName is a mocked ARN in the format expected by our log decoders
programName := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task
mockPid := 1
hostname := "aws-batch"
out := [][]byte{}
for _, event := range b.LogEvents {
// Adding an extra Microsecond forces `Format` to include all 6 digits within the micorsecond format.
// Otherwise, time.Format omits trailing zeroes. (https://github.com/golang/go/issues/12472)
nsecs := event.Timestamp*int64(time.Millisecond) + int64(time.Microsecond)
logTime := time.Unix(0, nsecs).UTC().Format(RFC3339Micro)
// Fake an RSyslog prefix, expected by consumers
formatted := fmt.Sprintf(rsyslogPrefix, logTime, hostname, programName, mockPid, event.Message)
out = append(out, []byte(formatted))
for _, rsyslogMsg := range rsyslogMsgs {
out = append(out, []byte(rsyslogMsg.String()))
}
return out

View file

@ -28,7 +28,7 @@ func TestUnpacking(t *testing.T) {
LogEvents: []LogEvent{
{
ID: "",
Timestamp: 1498519943285,
Timestamp: NewUnixTimestampMillis(1498519943285),
Message: "CWL CONTROL MESSAGE: Checking health of destination Kinesis stream.",
},
},
@ -98,7 +98,7 @@ func TestFullLoop(t *testing.T) {
assert.Equal(t, leb, output)
}
func TestSplit(t *testing.T) {
func TestSplitBatch(t *testing.T) {
input := LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
@ -108,12 +108,12 @@ func TestSplit(t *testing.T) {
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: 1498519943285,
Timestamp: NewUnixTimestampMillis(1498519943285),
Message: "some log line",
},
{
ID: "99999992387663833181953011865369295871402094815542181889",
Timestamp: 1498519943285,
Timestamp: NewUnixTimestampMillis(1498519943285),
Message: "another log line",
},
},
@ -125,3 +125,53 @@ func TestSplit(t *testing.T) {
}
assert.Equal(t, expected, lines)
}
func TestSplitLambda(t *testing.T) {
input := LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "/aws/lambda/env--app",
LogStream: "2018/01/24/[3]62695bfa96de46938f56b156f5235205",
SubscriptionFilters: []string{"ForwardLogsToKinesis"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: NewUnixTimestampMillis(1498519943285),
Message: "START RequestId: 8edbd53f-64c7-4a3c-bf1e-efeff40f6512 Version: 3",
},
{
ID: "99999992387663833181953011865369295871402094815542181889",
Timestamp: NewUnixTimestampMillis(1498519943285),
Message: `{"aws_request_id":"8edbd53f-64c7-4a3c-bf1e-efeff40f6512","level":"info","source":"app","title":"some-log-title"}`,
},
},
}
lines := Split(input)
expected := [][]byte{
[]byte(`2017-06-26T23:32:23.285001+00:00 aws-lambda env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F8edbd53f-64c7-4a3c-bf1e-efeff40f6512[1]: START RequestId: 8edbd53f-64c7-4a3c-bf1e-efeff40f6512 Version: 3`),
[]byte(`2017-06-26T23:32:23.285001+00:00 aws-lambda env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F8edbd53f-64c7-4a3c-bf1e-efeff40f6512[1]: {"aws_request_id":"8edbd53f-64c7-4a3c-bf1e-efeff40f6512","level":"info","source":"app","title":"some-log-title"}`),
}
assert.Equal(t, expected, lines)
}
func TestSplitDefault(t *testing.T) {
input := LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "vpn_flow_logs",
LogStream: "eni-43403819-all",
SubscriptionFilters: []string{"SomeSubscription"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: NewUnixTimestampMillis(1498519943285),
Message: "2 589690932525 eni-43403819 10.0.0.233 172.217.6.46 64067 443 17 8 3969 1516891809 1516891868 ACCEPT OK",
},
},
}
lines := Split(input)
expected := [][]byte{
[]byte(`2017-06-26T23:32:23.285001+00:00 eni-43403819-all vpn_flow_logs--eni-43403819-all/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: 2 589690932525 eni-43403819 10.0.0.233 172.217.6.46 64067 443 17 8 3969 1516891809 1516891868 ACCEPT OK`),
}
assert.Equal(t, expected, lines)
}