From 10750e802b141b0ef6f568481acd7b030b98841f Mon Sep 17 00:00:00 2001 From: Rafael Garcia Date: Thu, 25 Jan 2018 01:23:03 +0000 Subject: [PATCH 1/2] handle logs coming in from lambda functions --- splitter/splitter.go | 158 +++++++++++++++++++++++++++++++------- splitter/splitter_test.go | 36 ++++++++- 2 files changed, 162 insertions(+), 32 deletions(-) diff --git a/splitter/splitter.go b/splitter/splitter.go index ed50863..88f4a12 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -7,14 +7,46 @@ import ( "fmt" "io/ioutil" "regexp" + "strconv" "time" ) // LogEvent is a single log line within a LogEventBatch type LogEvent struct { - ID string `json:"id"` - Timestamp int64 `json:"timestamp"` - Message string `json:"message"` + ID string `json:"id"` + Timestamp UnixTimestampMillis `json:"timestamp"` + Message string `json:"message"` +} + +// UnixTimestampMillis is a time.Time that marshals (unmarshals) to (from) a unix timestamp with millisecond resolution. +type UnixTimestampMillis time.Time + +func NewUnixTimestampMillis(ts int64) UnixTimestampMillis { + return UnixTimestampMillis(time.Unix(ts/millisPerSecond, + (ts%millisPerSecond)*nanosPerMillisecond)) +} + +func (t *UnixTimestampMillis) MarshalJSON() ([]byte, error) { + ts := time.Time(*t).UnixNano() + stamp := fmt.Sprint(ts / nanosPerMillisecond) + + return []byte(stamp), nil +} + +var millisPerSecond = int64(time.Second / time.Millisecond) +var nanosPerMillisecond = int64(time.Millisecond / time.Nanosecond) + +func (t *UnixTimestampMillis) UnmarshalJSON(b []byte) error { + ts, err := strconv.ParseInt(string(b), 10, 64) + if err != nil { + return err + } + *t = NewUnixTimestampMillis(ts) + return nil +} + +func (t *UnixTimestampMillis) Time() time.Time { + return time.Time(*t) } // LogEventBatch is a batch of multiple log lines, read from a KinesisStream with a CWLogs subscription @@ -67,42 +99,112 @@ const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00" // http://docs.aws.amazon.com/batch/latest/userguide/job_states.html // "log stream name format is jobDefinitionName/default/ecs_task_id (this format may change in the future)." -const taskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app +const awsBatchTaskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app `default\/` + `([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // task-id +var awsBatchTaskRegex = regexp.MustCompile(awsBatchTaskMeta) -var taskRegex = regexp.MustCompile(taskMeta) +// lambda log groups are of the form /aws/lambda/-- +var awsLambdaLogGroupRegex = regexp.MustCompile(`^/aws/lambda/([a-z0-9-]+)--([a-z0-9-]+)$`) +var awsLambdaRequestIDRegex = regexp.MustCompile(`[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}`) + +// arn cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425 +const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + +type RSysLogMessage struct { + Timestamp time.Time + Hostname string + ProgramName string + PID int + Message string +} + +func (r RSysLogMessage) String() string { + // Adding an extra Microsecond forces `Format` to include all 6 digits within the micorsecond format. + // Otherwise, time.Format omits trailing zeroes. (https://github.com/golang/go/issues/12472) + return fmt.Sprintf(`%s %s %s[%d]: %s`, + r.Timestamp.Add(time.Microsecond).Format(RFC3339Micro), + r.Hostname, r.ProgramName, r.PID, r.Message) +} + +func splitAWSBatch(b LogEventBatch) ([]RSysLogMessage, bool) { + matches := awsBatchTaskRegex.FindAllStringSubmatch(b.LogStream, 1) + if len(matches) != 1 { + return nil, false + } + env := matches[0][1] + app := matches[0][2] + task := matches[0][3] + + out := []RSysLogMessage{} + for _, event := range b.LogEvents { + out = append(out, RSysLogMessage{ + Timestamp: event.Timestamp.Time(), + ProgramName: env + "--" + app + arnCruft + task, + PID: 1, + Hostname: "aws-batch", + Message: event.Message, + }) + } + return out, true +} + +func splitAWSLambda(b LogEventBatch) ([]RSysLogMessage, bool) { + matches := awsLambdaLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1) + if len(matches) != 1 { + return nil, false + } + env := matches[0][1] + app := matches[0][2] + + out := []RSysLogMessage{} + for _, event := range b.LogEvents { + // find the request ID, e.g. 1f7fcc25-015f-11e8-a728-a1b6168ab9aa, set it as task + var task string + if matches := awsLambdaRequestIDRegex.FindAllString(event.Message, 1); len(matches) == 1 { + task = matches[0] + } + out = append(out, RSysLogMessage{ + Timestamp: event.Timestamp.Time(), + ProgramName: env + "--" + app + arnCruft + task, + PID: 1, + Hostname: "aws-lambda", + Message: event.Message, + }) + } + return out, true +} + +func splitDefault(b LogEventBatch) []RSysLogMessage { + out := []RSysLogMessage{} + for _, event := range b.LogEvents { + out = append(out, RSysLogMessage{ + Timestamp: event.Timestamp.Time(), + Hostname: "unknown", + ProgramName: "unknown", + PID: 1, + Message: event.Message, + }) + } + return out +} // Split takes a LogEventBatch and separates into a slice of enriched log lines // Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by // the subsequent decoding logic. func Split(b LogEventBatch) [][]byte { - env := "unknown" - app := "unknown" - task := "00001111-2222-3333-4444-555566667777" - matches := taskRegex.FindAllStringSubmatch(b.LogStream, 1) - if len(matches) == 1 { - env = matches[0][1] - app = matches[0][2] - task = matches[0][3] + var rsyslogMsgs []RSysLogMessage + var ok bool + if rsyslogMsgs, ok = splitAWSLambda(b); !ok { + rsyslogMsgs, ok = splitAWSBatch(b) + if !ok { + rsyslogMsgs = splitDefault(b) + } } - rsyslogPrefix := `%s %s %s[%d]: %s` - // programName is a mocked ARN in the format expected by our log decoders - programName := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task - mockPid := 1 - hostname := "aws-batch" - out := [][]byte{} - for _, event := range b.LogEvents { - // Adding an extra Microsecond forces `Format` to include all 6 digits within the micorsecond format. - // Otherwise, time.Format omits trailing zeroes. (https://github.com/golang/go/issues/12472) - nsecs := event.Timestamp*int64(time.Millisecond) + int64(time.Microsecond) - logTime := time.Unix(0, nsecs).UTC().Format(RFC3339Micro) - - // Fake an RSyslog prefix, expected by consumers - formatted := fmt.Sprintf(rsyslogPrefix, logTime, hostname, programName, mockPid, event.Message) - out = append(out, []byte(formatted)) + for _, rsyslogMsg := range rsyslogMsgs { + out = append(out, []byte(rsyslogMsg.String())) } return out diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index 20d23c5..4f9e512 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -28,7 +28,7 @@ func TestUnpacking(t *testing.T) { LogEvents: []LogEvent{ { ID: "", - Timestamp: 1498519943285, + Timestamp: NewUnixTimestampMillis(1498519943285), Message: "CWL CONTROL MESSAGE: Checking health of destination Kinesis stream.", }, }, @@ -98,7 +98,7 @@ func TestFullLoop(t *testing.T) { assert.Equal(t, leb, output) } -func TestSplit(t *testing.T) { +func TestSplitBatch(t *testing.T) { input := LogEventBatch{ MessageType: "DATA_MESSAGE", Owner: "123456789012", @@ -108,12 +108,12 @@ func TestSplit(t *testing.T) { LogEvents: []LogEvent{ { ID: "99999992379011144044923130086453437181614530551221780480", - Timestamp: 1498519943285, + Timestamp: NewUnixTimestampMillis(1498519943285), Message: "some log line", }, { ID: "99999992387663833181953011865369295871402094815542181889", - Timestamp: 1498519943285, + Timestamp: NewUnixTimestampMillis(1498519943285), Message: "another log line", }, }, @@ -125,3 +125,31 @@ func TestSplit(t *testing.T) { } assert.Equal(t, expected, lines) } + +func TestSplitLambda(t *testing.T) { + input := LogEventBatch{ + MessageType: "DATA_MESSAGE", + Owner: "123456789012", + LogGroup: "/aws/lambda/env--app", + LogStream: "2018/01/24/[3]62695bfa96de46938f56b156f5235205", + SubscriptionFilters: []string{"ForwardLogsToKinesis"}, + LogEvents: []LogEvent{ + { + ID: "99999992379011144044923130086453437181614530551221780480", + Timestamp: NewUnixTimestampMillis(1498519943285), + Message: "START RequestId: 8edbd53f-64c7-4a3c-bf1e-efeff40f6512 Version: 3", + }, + { + ID: "99999992387663833181953011865369295871402094815542181889", + Timestamp: NewUnixTimestampMillis(1498519943285), + Message: `{"aws_request_id":"8edbd53f-64c7-4a3c-bf1e-efeff40f6512","level":"info","source":"app","title":"some-log-title"}`, + }, + }, + } + lines := Split(input) + expected := [][]byte{ + []byte(`2017-06-26T23:32:23.285001+00:00 aws-lambda env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F8edbd53f-64c7-4a3c-bf1e-efeff40f6512[1]: START RequestId: 8edbd53f-64c7-4a3c-bf1e-efeff40f6512 Version: 3`), + []byte(`2017-06-26T23:32:23.285001+00:00 aws-lambda env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F8edbd53f-64c7-4a3c-bf1e-efeff40f6512[1]: {"aws_request_id":"8edbd53f-64c7-4a3c-bf1e-efeff40f6512","level":"info","source":"app","title":"some-log-title"}`), + } + assert.Equal(t, expected, lines) +} From c255bf87f3f26602f553839d6871109ea694f6e5 Mon Sep 17 00:00:00 2001 From: Rafael Garcia Date: Thu, 25 Jan 2018 15:13:11 +0000 Subject: [PATCH 2/2] add test for default case --- Makefile | 2 +- batchconsumer/writer.go | 2 +- golang.mk | 26 +++++++++++++++++++++++--- splitter/splitter.go | 7 ++++--- splitter/splitter_test.go | 22 ++++++++++++++++++++++ 5 files changed, 51 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index a075b95..7aa2bcf 100644 --- a/Makefile +++ b/Makefile @@ -62,4 +62,4 @@ $(PKGS): golang-test-all-deps install_deps: golang-dep-vendor-deps - $(call golang-dep-vendor) \ No newline at end of file + $(call golang-dep-vendor) diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index d750497..cf5eab4 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -58,7 +58,7 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { // We handle two types of records: // - records emitted from CWLogs Subscription - // - records emiited from KPL + // - records emitted from KPL if !splitter.IsGzipped(record) { // Process a single message, from KPL return [][]byte{record}, nil diff --git a/golang.mk b/golang.mk index 8b0bb8b..7d8ba4c 100644 --- a/golang.mk +++ b/golang.mk @@ -1,7 +1,7 @@ # This is the default Clever Golang Makefile. # It is stored in the dev-handbook repo, github.com/Clever/dev-handbook # Please do not alter this file directly. -GOLANG_MK_VERSION := 0.2.0 +GOLANG_MK_VERSION := 0.3.2 SHELL := /bin/bash SYSTEM := $(shell uname -a | cut -d" " -f1 | tr '[:upper:]' '[:lower:]') @@ -18,7 +18,7 @@ define golang-version-check _ := $(if \ $(shell \ expr >/dev/null \ - `go version | cut -d" " -f3 | cut -c3- | cut -d. -f2` \ + `go version | cut -d" " -f3 | cut -c3- | cut -d. -f2 | sed -E 's/beta[0-9]+//'` \ \>= `echo $(1) | cut -d. -f2` \ \& \ `go version | cut -d" " -f3 | cut -c3- | cut -d. -f1` \ @@ -52,9 +52,16 @@ golang-dep-vendor-deps: bin/dep # golang-godep-vendor is a target for saving dependencies with the dep tool # to the vendor/ directory. All nested vendor/ directories are deleted via # the prune command. +# In CI, -vendor-only is used to avoid updating the lock file. +ifndef CI define golang-dep-vendor -bin/dep ensure +bin/dep ensure -v endef +else +define golang-dep-vendor +bin/dep ensure -v -vendor-only +endef +endif # Golint is a tool for linting Golang code for common errors. GOLINT := $(GOPATH)/bin/golint @@ -147,6 +154,19 @@ $(call golang-vet,$(1)) $(call golang-test-strict,$(1)) endef +# golang-build: builds a golang binary. ensures CGO build is done during CI. This is needed to make a binary that works with a Docker alpine image. +# arg1: pkg path +# arg2: executable name +define golang-build +@echo "BUILDING..." +@if [ -z "$$CI" ]; then \ + go build -o bin/$(2) $(1); \ +else \ + echo "-> Building CGO binary"; \ + CGO_ENABLED=0 go build -installsuffix cgo -o bin/$(2) $(1); \ +fi; +endef + # golang-update-makefile downloads latest version of golang.mk golang-update-makefile: @wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang.mk -O /tmp/golang.mk 2>/dev/null diff --git a/splitter/splitter.go b/splitter/splitter.go index 88f4a12..0a513ef 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -108,8 +108,9 @@ var awsBatchTaskRegex = regexp.MustCompile(awsBatchTaskMeta) var awsLambdaLogGroupRegex = regexp.MustCompile(`^/aws/lambda/([a-z0-9-]+)--([a-z0-9-]+)$`) var awsLambdaRequestIDRegex = regexp.MustCompile(`[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}`) -// arn cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425 +// arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425 const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` +const taskCruft = `12345678-1234-1234-1234-555566667777` type RSysLogMessage struct { Timestamp time.Time @@ -180,8 +181,8 @@ func splitDefault(b LogEventBatch) []RSysLogMessage { for _, event := range b.LogEvents { out = append(out, RSysLogMessage{ Timestamp: event.Timestamp.Time(), - Hostname: "unknown", - ProgramName: "unknown", + Hostname: b.LogStream, + ProgramName: b.LogGroup + "--" + b.LogStream + arnCruft + taskCruft, PID: 1, Message: event.Message, }) diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index 4f9e512..81b2df2 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -153,3 +153,25 @@ func TestSplitLambda(t *testing.T) { } assert.Equal(t, expected, lines) } + +func TestSplitDefault(t *testing.T) { + input := LogEventBatch{ + MessageType: "DATA_MESSAGE", + Owner: "123456789012", + LogGroup: "vpn_flow_logs", + LogStream: "eni-43403819-all", + SubscriptionFilters: []string{"SomeSubscription"}, + LogEvents: []LogEvent{ + { + ID: "99999992379011144044923130086453437181614530551221780480", + Timestamp: NewUnixTimestampMillis(1498519943285), + Message: "2 589690932525 eni-43403819 10.0.0.233 172.217.6.46 64067 443 17 8 3969 1516891809 1516891868 ACCEPT OK", + }, + }, + } + lines := Split(input) + expected := [][]byte{ + []byte(`2017-06-26T23:32:23.285001+00:00 eni-43403819-all vpn_flow_logs--eni-43403819-all/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: 2 589690932525 eni-43403819 10.0.0.233 172.217.6.46 64067 443 17 8 3969 1516891809 1516891868 ACCEPT OK`), + } + assert.Equal(t, expected, lines) +}