Compare commits
51 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f78d1c6b9b | ||
|
|
a8a4e207fe | ||
|
|
e66f1922b7 | ||
|
|
dc3e5e097c | ||
|
|
9a1371d965 | ||
|
|
cf19a05ced | ||
|
|
3a42d32e53 | ||
|
|
d461ebebb6 | ||
|
|
c11acd24ec | ||
|
|
ad3e33c7d6 | ||
|
|
fa54b6ee39 | ||
|
|
c164f1d895 | ||
|
|
6ac018712d | ||
|
|
7351a3d3f7 | ||
|
|
358de6589c | ||
|
|
fe41fdaac3 | ||
|
|
102de58470 | ||
|
|
54f91f4073 | ||
|
|
29f86e9986 | ||
|
|
42c3480288 | ||
|
|
cc4f7716b3 | ||
|
|
6e8a99c50d | ||
|
|
4dd769ffca | ||
|
|
f8e9c34641 | ||
|
|
2a7e96157e | ||
|
|
4fe27d0d39 | ||
|
|
6b2d1f8a56 | ||
|
|
611f0c0a60 | ||
|
|
1112894639 | ||
|
|
484b54bfe0 | ||
|
|
f5ce6fe4e7 | ||
|
|
9b32d93d1c | ||
|
|
3bedc65483 | ||
|
|
b16af062fb | ||
|
|
30d3925119 | ||
|
|
ebd1e0e39c | ||
|
|
bfe1a6ffb9 | ||
|
|
35b07d7aed | ||
|
|
8a8b44112d | ||
|
|
8be774bc09 | ||
|
|
504ebfad60 | ||
|
|
420ad243a4 | ||
|
|
337d2063f5 | ||
|
|
eb11747434 | ||
|
|
d489b5039b | ||
|
|
e7db80a35a | ||
|
|
5132214945 | ||
|
|
e265494afb | ||
|
|
69d31088f3 | ||
|
|
7446627fae | ||
|
|
9cbcf1096a |
18 changed files with 1046 additions and 315 deletions
|
|
@ -3,9 +3,10 @@ jobs:
|
||||||
build:
|
build:
|
||||||
working_directory: /go/src/github.com/Clever/amazon-kinesis-client-go
|
working_directory: /go/src/github.com/Clever/amazon-kinesis-client-go
|
||||||
docker:
|
docker:
|
||||||
- image: circleci/golang:1.12-stretch
|
- image: circleci/golang:1.13-stretch
|
||||||
- image: circleci/mongo:3.2.20-jessie-ram
|
- image: circleci/mongo:3.2.20-jessie-ram
|
||||||
environment:
|
environment:
|
||||||
|
GOPRIVATE: github.com/Clever/*
|
||||||
CIRCLE_ARTIFACTS: /tmp/circleci-artifacts
|
CIRCLE_ARTIFACTS: /tmp/circleci-artifacts
|
||||||
CIRCLE_TEST_REPORTS: /tmp/circleci-test-results
|
CIRCLE_TEST_REPORTS: /tmp/circleci-test-results
|
||||||
steps:
|
steps:
|
||||||
|
|
@ -17,7 +18,13 @@ jobs:
|
||||||
- run:
|
- run:
|
||||||
command: mkdir -p $CIRCLE_ARTIFACTS $CIRCLE_TEST_REPORTS
|
command: mkdir -p $CIRCLE_ARTIFACTS $CIRCLE_TEST_REPORTS
|
||||||
name: Set up CircleCI artifacts directories
|
name: Set up CircleCI artifacts directories
|
||||||
|
- run:
|
||||||
|
command: git config --global "url.ssh://git@github.com/Clever".insteadOf "https://github.com/Clever"
|
||||||
|
- run:
|
||||||
|
name: Add github.com to known hosts
|
||||||
|
command: mkdir -p ~/.ssh && touch ~/.ssh/known_hosts && echo 'github.com ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCj7ndNxQowgcQnjshcLrqPEiiphnt+VTTvDP6mHBL9j1aNUkY4Ue1gvwnGLVlOhGeYrnZaMgRK6+PKCUXaDbC7qtbW8gIkhL7aGCsOr/C56SJMy/BCZfxd1nWzAOxSDPgVsmerOBYfNqltV9/hWCqBywINIR+5dIg6JTJ72pcEpEjcYgXkE2YEFXV1JHnsKgbLWNlhScqb2UmyRkQyytRLtL+38TGxkxCflmO+5Z8CSSNY7GidjMIZ7Q4zMjA2n1nGrlTDkzwDCsw+wqFPGQA179cnfGWOWRVruj16z6XyvxvjJwbz0wQZ75XK5tKSb7FNyeIEs4TT4jk+S4dhPeAUC5y+bDYirYgM4GC7uEnztnZyaVWQ7B381AK4Qdrwt51ZqExKbQpTUNn+EjqoTwvqNj4kqx5QUCI0ThS/YkOxJCXmPUWZbhjpCg56i+2aB6CmK2JGhn57K5mj0MNdBXA4/WnwH6XoPWJzK5Nyu2zB3nAZp+S5hpQs+p1vN1/wsjk=' >> ~/.ssh/known_hosts
|
||||||
- run: make install_deps
|
- run: make install_deps
|
||||||
- run: make build
|
- run: make build
|
||||||
- run: make bench
|
- run: make bench
|
||||||
- run: make test
|
- run: make test
|
||||||
|
- run: if [ "${CIRCLE_BRANCH}" == "master" ]; then $HOME/ci-scripts/circleci/github-release $GH_RELEASE_TOKEN; fi;
|
||||||
|
|
|
||||||
1
.github/CODEOWNERS
vendored
Normal file
1
.github/CODEOWNERS
vendored
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
* @Clever/eng-infra
|
||||||
20
.github/workflows/notify-ci-status.yml
vendored
Normal file
20
.github/workflows/notify-ci-status.yml
vendored
Normal file
|
|
@ -0,0 +1,20 @@
|
||||||
|
name: Notify CI status
|
||||||
|
|
||||||
|
on:
|
||||||
|
check_suite:
|
||||||
|
types: [completed]
|
||||||
|
status:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
call-workflow:
|
||||||
|
if: >-
|
||||||
|
(github.event.branches[0].name == github.event.repository.default_branch &&
|
||||||
|
(github.event.state == 'error' || github.event.state == 'failure')) ||
|
||||||
|
(github.event.check_suite.head_branch == github.event.repository.default_branch &&
|
||||||
|
github.event.check_suite.conclusion != 'success')
|
||||||
|
uses: Clever/ci-scripts/.github/workflows/reusable-notify-ci-status.yml@master
|
||||||
|
secrets:
|
||||||
|
CIRCLE_CI_INTEGRATIONS_URL: ${{ secrets.CIRCLE_CI_INTEGRATIONS_URL }}
|
||||||
|
CIRCLE_CI_INTEGRATIONS_USERNAME: ${{ secrets.CIRCLE_CI_INTEGRATIONS_USERNAME }}
|
||||||
|
CIRCLE_CI_INTEGRATIONS_PASSWORD: ${{ secrets.CIRCLE_CI_INTEGRATIONS_PASSWORD }}
|
||||||
|
SLACK_BOT_TOKEN: ${{ secrets.DAPPLE_BOT_TOKEN }}
|
||||||
84
Gopkg.lock
generated
84
Gopkg.lock
generated
|
|
@ -1,84 +0,0 @@
|
||||||
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
|
|
||||||
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
branch = "master"
|
|
||||||
name = "github.com/Clever/syslogparser"
|
|
||||||
packages = [
|
|
||||||
".",
|
|
||||||
"rfc3164"
|
|
||||||
]
|
|
||||||
revision = "fb28ad3e4340c046323b7beba685a72fd12ecbe8"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "github.com/davecgh/go-spew"
|
|
||||||
packages = ["spew"]
|
|
||||||
revision = "6d212800a42e8ab5c146b8ace3490ee17e5225f9"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
branch = "master"
|
|
||||||
name = "github.com/jeromer/syslogparser"
|
|
||||||
packages = ["."]
|
|
||||||
revision = "0e4ae46ea3f08de351074b643d649d5d00661a3c"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "github.com/pmezard/go-difflib"
|
|
||||||
packages = ["difflib"]
|
|
||||||
revision = "d8ed2627bdf02c080bf22230dbb337003b7aba2d"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "github.com/stretchr/testify"
|
|
||||||
packages = [
|
|
||||||
"assert",
|
|
||||||
"require"
|
|
||||||
]
|
|
||||||
revision = "69483b4bd14f5845b5a1e55bca19e954e827f1d0"
|
|
||||||
version = "v1.1.4"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "github.com/xeipuuv/gojsonpointer"
|
|
||||||
packages = ["."]
|
|
||||||
revision = "e0fe6f68307607d540ed8eac07a342c33fa1b54a"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
branch = "master"
|
|
||||||
name = "github.com/xeipuuv/gojsonreference"
|
|
||||||
packages = ["."]
|
|
||||||
revision = "e02fc20de94c78484cd5ffb007f8af96be030a45"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "github.com/xeipuuv/gojsonschema"
|
|
||||||
packages = ["."]
|
|
||||||
revision = "e18f0065e8c148fcf567ac43a3f8f5b66ac0720b"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "golang.org/x/net"
|
|
||||||
packages = ["context"]
|
|
||||||
revision = "a6577fac2d73be281a500b310739095313165611"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "golang.org/x/time"
|
|
||||||
packages = ["rate"]
|
|
||||||
revision = "f51c12702a4d776e4c1fa9b0fabab841babae631"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "gopkg.in/Clever/kayvee-go.v6"
|
|
||||||
packages = [
|
|
||||||
".",
|
|
||||||
"logger",
|
|
||||||
"router"
|
|
||||||
]
|
|
||||||
revision = "096364e316a52652d3493be702d8105d8d01db84"
|
|
||||||
version = "v6.6.0"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "gopkg.in/yaml.v2"
|
|
||||||
packages = ["."]
|
|
||||||
revision = "a5b47d31c556af34a302ce5d659e6fea44d90de0"
|
|
||||||
|
|
||||||
[solve-meta]
|
|
||||||
analyzer-name = "dep"
|
|
||||||
analyzer-version = 1
|
|
||||||
inputs-digest = "4699293f3632dd38561ff60477aa7cc1ecaadc5808b974d017099e2189679286"
|
|
||||||
solver-name = "gps-cdcl"
|
|
||||||
solver-version = 1
|
|
||||||
36
Gopkg.toml
36
Gopkg.toml
|
|
@ -1,36 +0,0 @@
|
||||||
|
|
||||||
# Gopkg.toml example
|
|
||||||
#
|
|
||||||
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
|
|
||||||
# for detailed Gopkg.toml documentation.
|
|
||||||
#
|
|
||||||
# required = ["github.com/user/thing/cmd/thing"]
|
|
||||||
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
|
|
||||||
#
|
|
||||||
# [[constraint]]
|
|
||||||
# name = "github.com/user/project"
|
|
||||||
# version = "1.0.0"
|
|
||||||
#
|
|
||||||
# [[constraint]]
|
|
||||||
# name = "github.com/user/project2"
|
|
||||||
# branch = "dev"
|
|
||||||
# source = "github.com/myfork/project2"
|
|
||||||
#
|
|
||||||
# [[override]]
|
|
||||||
# name = "github.com/x/y"
|
|
||||||
# version = "2.4.0"
|
|
||||||
|
|
||||||
|
|
||||||
[[constraint]]
|
|
||||||
branch = "master"
|
|
||||||
name = "github.com/Clever/syslogparser"
|
|
||||||
|
|
||||||
[[constraint]]
|
|
||||||
name = "github.com/stretchr/testify"
|
|
||||||
|
|
||||||
[[constraint]]
|
|
||||||
name = "golang.org/x/time"
|
|
||||||
|
|
||||||
[[constraint]]
|
|
||||||
name = "gopkg.in/Clever/kayvee-go.v6"
|
|
||||||
version = "6.0.0"
|
|
||||||
8
Makefile
8
Makefile
|
|
@ -5,7 +5,7 @@ SHELL := /bin/bash
|
||||||
PKG := github.com/Clever/amazon-kinesis-client-go
|
PKG := github.com/Clever/amazon-kinesis-client-go
|
||||||
PKGS := $(shell go list ./... | grep -v /vendor )
|
PKGS := $(shell go list ./... | grep -v /vendor )
|
||||||
.PHONY: download_jars run build
|
.PHONY: download_jars run build
|
||||||
$(eval $(call golang-version-check,1.12))
|
$(eval $(call golang-version-check,1.13))
|
||||||
|
|
||||||
CONSUMER ?= consumer
|
CONSUMER ?= consumer
|
||||||
TMP_DIR := ./tmp-jars
|
TMP_DIR := ./tmp-jars
|
||||||
|
|
@ -37,7 +37,7 @@ download_jars:
|
||||||
mv $(TMP_DIR)/target/dependency/* $(JAR_DIR)/
|
mv $(TMP_DIR)/target/dependency/* $(JAR_DIR)/
|
||||||
# Download the STS jar file for supporting IAM Roles
|
# Download the STS jar file for supporting IAM Roles
|
||||||
ls $(JAR_DIR)/aws-java-sdk-core-*.jar | sed -e "s/.*-sdk-core-//g" | sed -e "s/\.jar//g" > /tmp/version.txt
|
ls $(JAR_DIR)/aws-java-sdk-core-*.jar | sed -e "s/.*-sdk-core-//g" | sed -e "s/\.jar//g" > /tmp/version.txt
|
||||||
curl -o $(JAR_DIR)/aws-java-sdk-sts-`cat /tmp/version.txt`.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-sts/`cat /tmp/version.txt`/aws-java-sdk-sts-`cat /tmp/version.txt`.jar
|
curl -o $(JAR_DIR)/aws-java-sdk-sts-`cat /tmp/version.txt`.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-sts/`cat /tmp/version.txt`/aws-java-sdk-sts-`cat /tmp/version.txt`.jar
|
||||||
rm -r $(TMP_DIR)
|
rm -r $(TMP_DIR)
|
||||||
|
|
||||||
all: test build
|
all: test build
|
||||||
|
|
@ -61,5 +61,5 @@ $(PKGS): golang-test-all-deps
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
install_deps: golang-dep-vendor-deps
|
install_deps:
|
||||||
$(call golang-dep-vendor)
|
go mod vendor
|
||||||
|
|
|
||||||
1
VERSION
Normal file
1
VERSION
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
1.0.0
|
||||||
|
|
@ -30,16 +30,17 @@ type datum struct {
|
||||||
var queue = make(chan datum, 1000)
|
var queue = make(chan datum, 1000)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
data := map[string]int{}
|
countData := map[string]int{}
|
||||||
|
gaugeData := map[string]int{}
|
||||||
tick := time.Tick(time.Minute)
|
tick := time.Tick(time.Minute)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case d := <-queue:
|
case d := <-queue:
|
||||||
if d.category == "counter" {
|
if d.category == "counter" {
|
||||||
data[d.name] = data[d.name] + d.value
|
countData[d.name] = countData[d.name] + d.value
|
||||||
} else if d.category == "gauge" {
|
} else if d.category == "gauge" {
|
||||||
data[d.name] = d.value
|
gaugeData[d.name] = d.value
|
||||||
} else {
|
} else {
|
||||||
log.ErrorD("unknown-stat-category", logger.M{"category": d.category})
|
log.ErrorD("unknown-stat-category", logger.M{"category": d.category})
|
||||||
}
|
}
|
||||||
|
|
@ -48,10 +49,14 @@ func init() {
|
||||||
for _, k := range DefaultCounters {
|
for _, k := range DefaultCounters {
|
||||||
tmp[k] = 0
|
tmp[k] = 0
|
||||||
}
|
}
|
||||||
for k, v := range data {
|
for k, v := range countData {
|
||||||
|
tmp[k] = v
|
||||||
|
}
|
||||||
|
for k, v := range gaugeData {
|
||||||
tmp[k] = v
|
tmp[k] = v
|
||||||
}
|
}
|
||||||
log.InfoD("stats", tmp)
|
log.InfoD("stats", tmp)
|
||||||
|
countData = map[string]int{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
||||||
|
|
@ -57,19 +57,6 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
|
|
||||||
// We handle two types of records:
|
|
||||||
// - records emitted from CWLogs Subscription
|
|
||||||
// - records emitted from KPL
|
|
||||||
if !splitter.IsGzipped(record) {
|
|
||||||
// Process a single message, from KPL
|
|
||||||
return [][]byte{record}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process a batch of messages from a CWLogs Subscription
|
|
||||||
return splitter.GetMessagesFromGzippedInput(record)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
||||||
var pair kcl.SequencePair
|
var pair kcl.SequencePair
|
||||||
prevPair := b.lastProcessedSeq
|
prevPair := b.lastProcessedSeq
|
||||||
|
|
@ -93,7 +80,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
messages, err := b.splitMessageIfNecessary(data)
|
messages, err := splitter.SplitMessageIfNecessary(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -352,7 +352,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) {
|
||||||
assert.Equal("tag3", string(mocksender.batches["tag3"][0][2]))
|
assert.Equal("tag3", string(mocksender.batches["tag3"][0][2]))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStaggeredCheckpionting(t *testing.T) {
|
func TestStaggeredCheckpointing(t *testing.T) {
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
|
||||||
mockFailedLogsFile := logger.New("testing")
|
mockFailedLogsFile := logger.New("testing")
|
||||||
|
|
|
||||||
BIN
bin/dep
Executable file
BIN
bin/dep
Executable file
Binary file not shown.
207
decode/decode.go
207
decode/decode.go
|
|
@ -2,9 +2,11 @@ package decode
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/Clever/syslogparser/rfc3164"
|
"github.com/Clever/syslogparser/rfc3164"
|
||||||
)
|
)
|
||||||
|
|
@ -15,9 +17,18 @@ var reservedFields = []string{
|
||||||
"prefix",
|
"prefix",
|
||||||
"postfix",
|
"postfix",
|
||||||
"decoder_msg_type",
|
"decoder_msg_type",
|
||||||
|
|
||||||
|
// used by all logs
|
||||||
"timestamp",
|
"timestamp",
|
||||||
"hostname",
|
"hostname",
|
||||||
"rawlog",
|
"rawlog",
|
||||||
|
|
||||||
|
// Used by Kayvee
|
||||||
|
"prefix",
|
||||||
|
"postfix",
|
||||||
|
|
||||||
|
// Set to the deepest step of parsing that succeeded
|
||||||
|
"decoder_msg_type",
|
||||||
}
|
}
|
||||||
|
|
||||||
func stringInSlice(s string, slice []string) bool {
|
func stringInSlice(s string, slice []string) bool {
|
||||||
|
|
@ -107,7 +118,7 @@ func FieldsFromKayvee(line string) (map[string]interface{}, error) {
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var userPattern = `#\sUser@Host:\s(?P<user>[a-zA-Z]+\[[a-zA-Z]+\])\s@\s[a-zA-Z]+.*Id:\s+(?P<id>[0-9]+)`
|
var userPattern = `#\sUser@Host:\s(?P<user>[a-zA-Z]+\[[a-zA-Z]+\])\s@\s.*Id:\s+(?P<id>[0-9]+)`
|
||||||
var userPatternRegex = regexp.MustCompile(userPattern)
|
var userPatternRegex = regexp.MustCompile(userPattern)
|
||||||
|
|
||||||
func FieldsFromRDSSlowquery(rawlog string) map[string]interface{} {
|
func FieldsFromRDSSlowquery(rawlog string) map[string]interface{} {
|
||||||
|
|
@ -385,17 +396,21 @@ func ExtractKVMeta(kvlog map[string]interface{}) KVMeta {
|
||||||
|
|
||||||
// ParseAndEnhance extracts fields from a log line, and does some post-processing to rename/add fields
|
// ParseAndEnhance extracts fields from a log line, and does some post-processing to rename/add fields
|
||||||
func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
|
func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
|
||||||
out := map[string]interface{}{}
|
syslog, syslogErr := FieldsFromSyslog(line)
|
||||||
|
if syslogErr == nil {
|
||||||
|
return decodeSyslog(syslog, env)
|
||||||
|
}
|
||||||
|
|
||||||
syslogFields, err := FieldsFromSyslog(line)
|
fluentLog, fluentErr := FieldsFromFluentbitLog(line)
|
||||||
if err != nil {
|
if fluentErr == nil {
|
||||||
return map[string]interface{}{}, err
|
return decodeFluent(fluentLog, env)
|
||||||
}
|
}
|
||||||
for k, v := range syslogFields {
|
|
||||||
out[k] = v
|
return nil, fmt.Errorf("unable to parse log line with errors `%v` and `%v`", syslogErr, fluentErr)
|
||||||
}
|
}
|
||||||
rawlog := syslogFields["rawlog"].(string)
|
|
||||||
programname := syslogFields["programname"].(string)
|
func decodeSyslog(syslog map[string]interface{}, env string) (map[string]interface{}, error) {
|
||||||
|
rawlog := syslog["rawlog"].(string)
|
||||||
|
|
||||||
// Try pulling Kayvee fields out of message
|
// Try pulling Kayvee fields out of message
|
||||||
kvFields, err := FieldsFromKayvee(rawlog)
|
kvFields, err := FieldsFromKayvee(rawlog)
|
||||||
|
|
@ -405,56 +420,39 @@ func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for k, v := range kvFields {
|
for k, v := range kvFields {
|
||||||
out[k] = v
|
syslog[k] = v
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Inject additional fields that are useful in log-searching and other business logic
|
|
||||||
out["env"] = env
|
|
||||||
|
|
||||||
// Sometimes its useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
|
|
||||||
// A separate container monitors for start/stop events, but we set the container values in such a way that
|
|
||||||
// the logs for these events will appear in context for the app that the user is looking at instead of the
|
|
||||||
// docker-events app.
|
|
||||||
forceEnv := ""
|
|
||||||
forceApp := ""
|
|
||||||
forceTask := ""
|
|
||||||
if cEnv, ok := out["container_env"]; ok {
|
|
||||||
forceEnv = cEnv.(string)
|
|
||||||
}
|
|
||||||
if cApp, ok := out["container_app"]; ok {
|
|
||||||
forceApp = cApp.(string)
|
|
||||||
}
|
|
||||||
if cTask, ok := out["container_task"]; ok {
|
|
||||||
forceTask = cTask.(string)
|
|
||||||
}
|
|
||||||
meta, err := getContainerMeta(programname, forceEnv, forceApp, forceTask)
|
|
||||||
if err == nil {
|
|
||||||
for k, v := range meta {
|
|
||||||
out[k] = v
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try pulling RDS slowquery logs fields out of message
|
// Try pulling RDS slowquery logs fields out of message
|
||||||
if out["hostname"] == "aws-rds" {
|
if syslog["hostname"] == "aws-rds" {
|
||||||
slowQueryFields := FieldsFromRDSSlowquery(rawlog)
|
slowQueryFields := FieldsFromRDSSlowquery(rawlog)
|
||||||
for k, v := range slowQueryFields {
|
for k, v := range slowQueryFields {
|
||||||
out[k] = v
|
syslog[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return out, nil
|
// Inject additional fields that are useful in log-searching and other business logic
|
||||||
|
syslog["env"] = env
|
||||||
|
|
||||||
|
// this can error, which indicates inability to extract container meta. That's fine, we can ignore that.
|
||||||
|
addContainterMetaToSyslog(syslog)
|
||||||
|
|
||||||
|
return syslog, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
|
const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
|
||||||
`arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft
|
`arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft
|
||||||
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32})` // task-id (ECS, both EC2 and Fargate)
|
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32}|jr_[a-z0-9-]+)` // task-id (ECS, both EC2 and Fargate), Glue Job ID
|
||||||
|
|
||||||
var containerMetaRegex = regexp.MustCompile(containerMeta)
|
var containerMetaRegex = regexp.MustCompile(containerMeta)
|
||||||
|
|
||||||
func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[string]string, error) {
|
var errBadProgramname = errors.New("invalid or missing programname")
|
||||||
if programname == "" {
|
|
||||||
return map[string]string{}, fmt.Errorf("no programname")
|
func addContainterMetaToSyslog(syslog map[string]interface{}) (map[string]interface{}, error) {
|
||||||
|
programname, ok := syslog["programname"].(string)
|
||||||
|
if !ok || programname == "" {
|
||||||
|
return syslog, errBadProgramname
|
||||||
}
|
}
|
||||||
|
|
||||||
env := ""
|
env := ""
|
||||||
|
|
@ -465,25 +463,116 @@ func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[st
|
||||||
env = matches[0][1]
|
env = matches[0][1]
|
||||||
app = matches[0][2]
|
app = matches[0][2]
|
||||||
task = matches[0][4]
|
task = matches[0][4]
|
||||||
|
} else {
|
||||||
|
return syslog, errBadProgramname
|
||||||
}
|
}
|
||||||
|
|
||||||
if forceEnv != "" {
|
// Sometimes its useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
|
||||||
env = forceEnv
|
// A separate container monitors for start/stop events, but we set the container values in such a way that
|
||||||
|
// the logs for these events will appear in context for the app that the user is looking at instead of the
|
||||||
|
// docker-events app.
|
||||||
|
if existingEnv, ok := syslog["container_env"].(string); !ok || existingEnv == "" {
|
||||||
|
syslog["container_env"] = env
|
||||||
}
|
}
|
||||||
if forceApp != "" {
|
if existingApp, ok := syslog["container_app"].(string); !ok || existingApp == "" {
|
||||||
app = forceApp
|
syslog["container_app"] = app
|
||||||
}
|
}
|
||||||
if forceTask != "" {
|
if existingTask, ok := syslog["container_task"].(string); !ok || existingTask == "" {
|
||||||
task = forceTask
|
syslog["container_task"] = task
|
||||||
}
|
}
|
||||||
|
|
||||||
if env == "" || app == "" || task == "" {
|
return syslog, nil
|
||||||
return map[string]string{}, fmt.Errorf("unable to get one or more of env/app/task")
|
}
|
||||||
}
|
|
||||||
|
const (
|
||||||
return map[string]string{
|
// These are the fields in the incoming fluentbit JSON that we expect the timestamp and log
|
||||||
"container_env": env,
|
// They are referenced below by the FluentLog type; those are the ones that matter.
|
||||||
"container_app": app,
|
fluentbitTimestampField = "fluent_ts"
|
||||||
"container_task": task,
|
fluentbitLogField = "log"
|
||||||
}, nil
|
|
||||||
|
// This is what we get from the fluentbig config: `time_key_format: "%Y-%m-%dT%H:%M:%S.%L%z"`
|
||||||
|
fluentTimeFormat = "2006-01-02T15:04:05.999-0700"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FluentLog represents is the set of fields extracted from an incoming fluentbit log
|
||||||
|
// The struct tags must line up with the JSON schema in the fluentbit configuration, see the comment for FieldsFromFluentBitlog
|
||||||
|
type FluentLog struct {
|
||||||
|
Log *string `json:"log"`
|
||||||
|
Timestamp *string `json:"fluent_ts"`
|
||||||
|
TaskArn string `json:"ecs_task_arn"`
|
||||||
|
TaskDefinition string `json:"ecs_task_definition"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// FieldsFromFluentbitLog parses JSON object with fields indicating that it's coming from FluentBit
|
||||||
|
// Its return value shares a common interface with the Syslog output - with the same four key fields
|
||||||
|
// Unlike FieldsFromSyslog, it accepts its argument as bytes, since it will be used as bytes immediately and bytes is what comes off the firehose
|
||||||
|
// In theory, the format we recieve is highly customizable, so we'll be making the following assumptions:
|
||||||
|
// - All logs are coming from aws-fargate with the ecs-metadata fields (ecs_cluster, ecs_task_arn, ecs_task_definition) enabled
|
||||||
|
// - The timestamp is in the field given by the FluentBitTimestampField constant in this package
|
||||||
|
// - The timestamp is of the format of the constant fluentTimestampFormat
|
||||||
|
// - The log is in the field given by the FluentBitlogField constant in this package
|
||||||
|
// - The ecs_task_definition is of the from {environment}--{application}--.*
|
||||||
|
func FieldsFromFluentbitLog(line string) (*FluentLog, error) {
|
||||||
|
fluentFields := FluentLog{}
|
||||||
|
if err := json.Unmarshal([]byte(line), &fluentFields); err != nil {
|
||||||
|
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &fluentFields, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeFluent(fluentLog *FluentLog, env string) (map[string]interface{}, error) {
|
||||||
|
out := map[string]interface{}{}
|
||||||
|
if fluentLog.Timestamp == nil || *fluentLog.Timestamp == "" {
|
||||||
|
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("no timestamp found in input field %s", fluentbitTimestampField)}
|
||||||
|
}
|
||||||
|
|
||||||
|
if timeValue, err := time.Parse(fluentTimeFormat, *fluentLog.Timestamp); err == nil {
|
||||||
|
out["timestamp"] = timeValue
|
||||||
|
} else {
|
||||||
|
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("timestamp has bad format: %s", *fluentLog.Timestamp)}
|
||||||
|
}
|
||||||
|
|
||||||
|
if fluentLog.Log == nil {
|
||||||
|
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("no log found in input field %s", fluentbitLogField)}
|
||||||
|
}
|
||||||
|
log := *fluentLog.Log
|
||||||
|
out["rawlog"] = log
|
||||||
|
|
||||||
|
out["decoder_msg_type"] = "fluentbit"
|
||||||
|
out["hostname"] = "aws-fargate"
|
||||||
|
|
||||||
|
// best effort to add container_env|app|task
|
||||||
|
if parts := strings.SplitN(fluentLog.TaskDefinition, "--", 3); len(parts) == 3 {
|
||||||
|
out["container_env"] = parts[0]
|
||||||
|
out["container_app"] = parts[1]
|
||||||
|
}
|
||||||
|
if idx := strings.LastIndex(fluentLog.TaskArn, "/"); idx != -1 {
|
||||||
|
out["container_task"] = fluentLog.TaskArn[idx+1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
kvFields, err := FieldsFromKayvee(log)
|
||||||
|
if err == nil {
|
||||||
|
for k, v := range kvFields {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inject additional fields that are useful in log-searching and other business logic; mimicking syslog behavior
|
||||||
|
out["env"] = env
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type BadLogFormatError struct {
|
||||||
|
Format string
|
||||||
|
DecodingError error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b BadLogFormatError) Error() string {
|
||||||
|
return fmt.Sprintf("trying to decode log as format %s: %v", b.Format, b.DecodingError)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b BadLogFormatError) Unwrap() error {
|
||||||
|
return b.DecodingError
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package decode
|
package decode
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
@ -221,10 +222,10 @@ func TestSyslogDecoding(t *testing.T) {
|
||||||
fields, err := FieldsFromSyslog(spec.Input)
|
fields, err := FieldsFromSyslog(spec.Input)
|
||||||
if spec.ExpectedError != nil {
|
if spec.ExpectedError != nil {
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
assert.IsType(spec.ExpectedError, err)
|
assert.True(errors.As(err, &spec.ExpectedError))
|
||||||
} else {
|
return
|
||||||
assert.NoError(err)
|
|
||||||
}
|
}
|
||||||
|
assert.NoError(err)
|
||||||
assert.Equal(spec.ExpectedOutput, fields)
|
assert.Equal(spec.ExpectedOutput, fields)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
@ -251,6 +252,11 @@ func TestParseAndEnhance(t *testing.T) {
|
||||||
}
|
}
|
||||||
logTime3 = logTime3.UTC()
|
logTime3 = logTime3.UTC()
|
||||||
|
|
||||||
|
logTime4, err := time.Parse(fluentTimeFormat, "2020-08-13T21:10:57.000+0000")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
specs := []ParseAndEnhanceSpec{
|
specs := []ParseAndEnhanceSpec{
|
||||||
ParseAndEnhanceSpec{
|
ParseAndEnhanceSpec{
|
||||||
Title: "Parses a Kayvee log line from an ECS app",
|
Title: "Parses a Kayvee log line from an ECS app",
|
||||||
|
|
@ -307,10 +313,10 @@ func TestParseAndEnhance(t *testing.T) {
|
||||||
ExpectedError: nil,
|
ExpectedError: nil,
|
||||||
},
|
},
|
||||||
ParseAndEnhanceSpec{
|
ParseAndEnhanceSpec{
|
||||||
Title: "Fails to parse non-RSyslog log line",
|
Title: "Fails to parse non-RSyslog, non-Fluent log line",
|
||||||
Line: `not rsyslog`,
|
Line: `not rsyslog`,
|
||||||
ExpectedOutput: map[string]interface{}{},
|
ExpectedOutput: map[string]interface{}{},
|
||||||
ExpectedError: &syslogparser.ParserError{},
|
ExpectedError: fmt.Errorf(""),
|
||||||
},
|
},
|
||||||
ParseAndEnhanceSpec{
|
ParseAndEnhanceSpec{
|
||||||
Title: "Parses JSON values",
|
Title: "Parses JSON values",
|
||||||
|
|
@ -345,7 +351,7 @@ func TestParseAndEnhance(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
ParseAndEnhanceSpec{
|
ParseAndEnhanceSpec{
|
||||||
Title: "RDS Slowquery Log",
|
Title: "RDS Slowquery Log rdsadmin",
|
||||||
Line: `2017-04-05T21:57:46+00:00 aws-rds production-aurora-test-db: Slow query: # Time: 190921 16:02:59
|
Line: `2017-04-05T21:57:46+00:00 aws-rds production-aurora-test-db: Slow query: # Time: 190921 16:02:59
|
||||||
# User@Host: rdsadmin[rdsadmin] @ localhost [] Id: 1
|
# User@Host: rdsadmin[rdsadmin] @ localhost [] Id: 1
|
||||||
# Query_time: 22.741550 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0SET timestamp=1569081779;call action start_seamless_scaling('AQEAAB1P/PAIqvTHEQFJAEkojZUoH176FGJttZ62JF5QmRehaf0S0VFTa+5MPJdYQ9k0/sekBlnMi8U=', 300000, 2);
|
# Query_time: 22.741550 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0SET timestamp=1569081779;call action start_seamless_scaling('AQEAAB1P/PAIqvTHEQFJAEkojZUoH176FGJttZ62JF5QmRehaf0S0VFTa+5MPJdYQ9k0/sekBlnMi8U=', 300000, 2);
|
||||||
|
|
@ -361,6 +367,179 @@ SET timestamp=1569862702;`,
|
||||||
"user_id": "1",
|
"user_id": "1",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
ParseAndEnhanceSpec{
|
||||||
|
Title: "RDS Slowquery Log clever user",
|
||||||
|
Line: `2017-04-05T21:57:46+00:00 aws-rds production-aurora-test-db: Slow query: # Time: 190921 16:02:59
|
||||||
|
# User@Host: clever[clever] @ [10.1.11.112] Id: 868
|
||||||
|
# Query_time: 2.000270 Lock_time: 0.000000 Rows_sent: 1 Rows_examined: 0
|
||||||
|
SET timestamp=1571090308;
|
||||||
|
select sleep(2);`,
|
||||||
|
ExpectedOutput: map[string]interface{}{
|
||||||
|
"env": "deploy-env",
|
||||||
|
"hostname": "aws-rds",
|
||||||
|
"programname": "production-aurora-test-db",
|
||||||
|
"decoder_msg_type": "syslog",
|
||||||
|
"rawlog": "Slow query: # Time: 190921 16:02:59\n# User@Host: clever[clever] @ [10.1.11.112] Id: 868\n# Query_time: 2.000270 Lock_time: 0.000000 Rows_sent: 1 Rows_examined: 0\nSET timestamp=1571090308;\nselect sleep(2);",
|
||||||
|
"timestamp": logTime2,
|
||||||
|
"user": "clever[clever]",
|
||||||
|
"user_id": "868",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Title: "Valid fluent log",
|
||||||
|
Line: `{
|
||||||
|
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||||
|
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||||
|
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||||
|
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||||
|
"ecs_task_definition": "env--app--d--2:2",
|
||||||
|
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||||
|
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||||
|
"source": "stderr"
|
||||||
|
}`,
|
||||||
|
ExpectedOutput: map[string]interface{}{
|
||||||
|
"timestamp": logTime4,
|
||||||
|
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||||
|
"hostname": "aws-fargate",
|
||||||
|
"decoder_msg_type": "Kayvee",
|
||||||
|
"title": "request-finished",
|
||||||
|
"container_app": "app",
|
||||||
|
"container_env": "env",
|
||||||
|
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
|
||||||
|
"prefix": "2020/08/13 21:10:57 ",
|
||||||
|
"postfix": "",
|
||||||
|
"env": "deploy-env",
|
||||||
|
},
|
||||||
|
ExpectedError: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Title: "fluent log missing ecs_task_definition still parses and has container_task",
|
||||||
|
Line: `{
|
||||||
|
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||||
|
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||||
|
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||||
|
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||||
|
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||||
|
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||||
|
"source": "stderr"
|
||||||
|
}`,
|
||||||
|
ExpectedOutput: map[string]interface{}{
|
||||||
|
"timestamp": logTime4,
|
||||||
|
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||||
|
"hostname": "aws-fargate",
|
||||||
|
"decoder_msg_type": "Kayvee",
|
||||||
|
"title": "request-finished",
|
||||||
|
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
|
||||||
|
"prefix": "2020/08/13 21:10:57 ",
|
||||||
|
"postfix": "",
|
||||||
|
"env": "deploy-env",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Title: "fluent log with bad ecs_task_definition format still parses and has container_task",
|
||||||
|
Line: `{
|
||||||
|
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||||
|
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||||
|
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||||
|
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||||
|
"ecs_task_definition": "env--app",
|
||||||
|
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||||
|
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||||
|
"source": "stderr"
|
||||||
|
}`,
|
||||||
|
ExpectedOutput: map[string]interface{}{
|
||||||
|
"timestamp": logTime4,
|
||||||
|
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||||
|
"hostname": "aws-fargate",
|
||||||
|
"decoder_msg_type": "Kayvee",
|
||||||
|
"title": "request-finished",
|
||||||
|
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
|
||||||
|
"prefix": "2020/08/13 21:10:57 ",
|
||||||
|
"postfix": "",
|
||||||
|
"env": "deploy-env",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Title: "fluent log missing ecs_task_arn still parses and has containter_env and app",
|
||||||
|
Line: `{
|
||||||
|
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||||
|
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||||
|
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||||
|
"ecs_task_definition": "env--app--d--2:2",
|
||||||
|
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||||
|
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||||
|
"source": "stderr"
|
||||||
|
}`,
|
||||||
|
ExpectedOutput: map[string]interface{}{
|
||||||
|
"timestamp": logTime4, // time.Date(2020, 8, 13, 21, 10, 57, 0, time.UTC),
|
||||||
|
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||||
|
"hostname": "aws-fargate",
|
||||||
|
"decoder_msg_type": "Kayvee",
|
||||||
|
"title": "request-finished",
|
||||||
|
"container_env": "env",
|
||||||
|
"container_app": "app",
|
||||||
|
"prefix": "2020/08/13 21:10:57 ",
|
||||||
|
"postfix": "",
|
||||||
|
"env": "deploy-env",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Title: "fluent log missing timestamp is an error",
|
||||||
|
Line: `{
|
||||||
|
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||||
|
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||||
|
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||||
|
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||||
|
"ecs_task_definition": "env--app--d--2:2",
|
||||||
|
"ts": "2020-08-13T21:10:57.000+0000",
|
||||||
|
"log": "2020/08/13 21:10:57 {\"deploy_env\":\"env\",\"level\":\"info\",\"pod-account\":\"589690932525\",\"pod-id\":\"2870425d-e4b1-4869-b2b7-a6d9fade9be1\",\"pod-region\":\"us-east-1\",\"pod-shortname\":\"us-east-1-dev-canary-2870425d\",\"source\":\"app-service\",\"title\":\"NumFDs\",\"type\":\"gauge\",\"value\":33,\"via\":\"process-metrics\"}",
|
||||||
|
"source": "stderr"
|
||||||
|
}`,
|
||||||
|
ExpectedOutput: nil,
|
||||||
|
ExpectedError: BadLogFormatError{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Title: "fluent log with bad timestamp format is an error",
|
||||||
|
Line: `{
|
||||||
|
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||||
|
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||||
|
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||||
|
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||||
|
"ecs_task_definition": "env--app--d--2:2",
|
||||||
|
"fluent_ts": "2020-08-13T21:10:57.000Z",
|
||||||
|
"log": "2020/08/13 21:10:57 {\"deploy_env\":\"env\",\"level\":\"info\",\"pod-account\":\"589690932525\",\"pod-id\":\"2870425d-e4b1-4869-b2b7-a6d9fade9be1\",\"pod-region\":\"us-east-1\",\"pod-shortname\":\"us-east-1-dev-canary-2870425d\",\"source\":\"app-service\",\"title\":\"NumFDs\",\"type\":\"gauge\",\"value\":33,\"via\":\"process-metrics\"}",
|
||||||
|
"source": "stderr"
|
||||||
|
}`,
|
||||||
|
ExpectedOutput: nil,
|
||||||
|
ExpectedError: BadLogFormatError{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Title: "Valid fluent log with overrides for container_env, app, and task",
|
||||||
|
Line: `{
|
||||||
|
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||||
|
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||||
|
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||||
|
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||||
|
"ecs_task_definition": "env--app--d--2:2",
|
||||||
|
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||||
|
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\", \"container_env\": \"env2\", \"container_task\": \"task\", \"container_app\": \"app2\"}",
|
||||||
|
"source": "stderr"
|
||||||
|
}`,
|
||||||
|
ExpectedOutput: map[string]interface{}{
|
||||||
|
"timestamp": logTime4,
|
||||||
|
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\", \"container_env\": \"env2\", \"container_task\": \"task\", \"container_app\": \"app2\"}",
|
||||||
|
"hostname": "aws-fargate",
|
||||||
|
"decoder_msg_type": "Kayvee",
|
||||||
|
"title": "request-finished",
|
||||||
|
"container_app": "app2",
|
||||||
|
"container_env": "env2",
|
||||||
|
"container_task": "task",
|
||||||
|
"prefix": "2020/08/13 21:10:57 ",
|
||||||
|
"postfix": "",
|
||||||
|
"env": "deploy-env",
|
||||||
|
},
|
||||||
|
ExpectedError: nil,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, spec := range specs {
|
for _, spec := range specs {
|
||||||
t.Run(fmt.Sprintf(spec.Title), func(t *testing.T) {
|
t.Run(fmt.Sprintf(spec.Title), func(t *testing.T) {
|
||||||
|
|
@ -368,72 +547,114 @@ SET timestamp=1569862702;`,
|
||||||
fields, err := ParseAndEnhance(spec.Line, "deploy-env")
|
fields, err := ParseAndEnhance(spec.Line, "deploy-env")
|
||||||
if spec.ExpectedError != nil {
|
if spec.ExpectedError != nil {
|
||||||
assert.Error(err)
|
assert.Error(err)
|
||||||
assert.IsType(spec.ExpectedError, err)
|
assert.True(errors.As(err, &spec.ExpectedError))
|
||||||
} else {
|
return
|
||||||
assert.NoError(err)
|
|
||||||
}
|
}
|
||||||
|
assert.NoError(err)
|
||||||
assert.Equal(spec.ExpectedOutput, fields)
|
assert.Equal(spec.ExpectedOutput, fields)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetContainerMeta(t *testing.T) {
|
func TestGetContainerMeta(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
t.Log("Must have a programname to get container meta")
|
type containerMetaSpec struct {
|
||||||
programname := ""
|
description string
|
||||||
_, err := getContainerMeta(programname, "", "", "")
|
input map[string]interface{}
|
||||||
assert.Error(err)
|
wantErr bool
|
||||||
|
wantContainerEnv string
|
||||||
|
wantContainerApp string
|
||||||
|
wantContainerTask string
|
||||||
|
}
|
||||||
|
|
||||||
t.Log("Can parse a programname")
|
tests := []containerMetaSpec{
|
||||||
programname = `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
|
{
|
||||||
meta, err := getContainerMeta(programname, "", "", "")
|
description: "Must have a programname to get container meta",
|
||||||
assert.NoError(err)
|
input: map[string]interface{}{
|
||||||
assert.Equal(map[string]string{
|
"programname": "",
|
||||||
"container_env": "env1",
|
},
|
||||||
"container_app": "app2",
|
wantErr: true,
|
||||||
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
},
|
||||||
}, meta)
|
{
|
||||||
|
description: "Can parse a programname",
|
||||||
|
input: map[string]interface{}{
|
||||||
|
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||||
|
},
|
||||||
|
wantContainerEnv: "env1",
|
||||||
|
wantContainerApp: "app2",
|
||||||
|
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Can override just 'env'",
|
||||||
|
input: map[string]interface{}{
|
||||||
|
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||||
|
"container_env": "force-env",
|
||||||
|
},
|
||||||
|
wantContainerEnv: "force-env",
|
||||||
|
wantContainerApp: "app2",
|
||||||
|
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Can override just 'app'",
|
||||||
|
input: map[string]interface{}{
|
||||||
|
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||||
|
"container_app": "force-app",
|
||||||
|
},
|
||||||
|
wantContainerEnv: "env1",
|
||||||
|
wantContainerApp: "force-app",
|
||||||
|
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Can override just 'task'",
|
||||||
|
input: map[string]interface{}{
|
||||||
|
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||||
|
"container_task": "force-task",
|
||||||
|
},
|
||||||
|
wantContainerEnv: "env1",
|
||||||
|
wantContainerApp: "app2",
|
||||||
|
wantContainerTask: "force-task",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Can override all fields",
|
||||||
|
input: map[string]interface{}{
|
||||||
|
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||||
|
"container_task": "force-task",
|
||||||
|
"container_app": "force-app",
|
||||||
|
"container_env": "force-env",
|
||||||
|
},
|
||||||
|
wantContainerEnv: "force-env",
|
||||||
|
wantContainerApp: "force-app",
|
||||||
|
wantContainerTask: "force-task",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
t.Log("Can override just 'env'")
|
for _, tcase := range tests {
|
||||||
overrideEnv := "force-env"
|
assert := assert.New(t)
|
||||||
meta, err = getContainerMeta(programname, overrideEnv, "", "")
|
|
||||||
assert.NoError(err)
|
|
||||||
assert.Equal(map[string]string{
|
|
||||||
"container_env": overrideEnv,
|
|
||||||
"container_app": "app2",
|
|
||||||
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
|
||||||
}, meta)
|
|
||||||
|
|
||||||
t.Log("Can override just 'app'")
|
syslog := map[string]interface{}{}
|
||||||
overrideApp := "force-app"
|
for k, v := range tcase.input {
|
||||||
meta, err = getContainerMeta(programname, "", overrideApp, "")
|
syslog[k] = v
|
||||||
assert.NoError(err)
|
}
|
||||||
assert.Equal(map[string]string{
|
|
||||||
"container_env": "env1",
|
|
||||||
"container_app": overrideApp,
|
|
||||||
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
|
||||||
}, meta)
|
|
||||||
|
|
||||||
t.Log("Can override just 'task'")
|
newSyslog, err := addContainterMetaToSyslog(syslog)
|
||||||
overrideTask := "force-task"
|
if tcase.wantErr {
|
||||||
meta, err = getContainerMeta(programname, "", "", overrideTask)
|
assert.Error(err)
|
||||||
assert.NoError(err)
|
continue
|
||||||
assert.Equal(map[string]string{
|
}
|
||||||
"container_env": "env1",
|
for k, v := range newSyslog {
|
||||||
"container_app": "app2",
|
switch k {
|
||||||
"container_task": overrideTask,
|
case "container_env":
|
||||||
}, meta)
|
assert.Equal(v, tcase.wantContainerEnv)
|
||||||
|
case "container_app":
|
||||||
|
assert.Equal(v, tcase.wantContainerApp)
|
||||||
|
case "container_task":
|
||||||
|
assert.Equal(v, tcase.wantContainerTask)
|
||||||
|
default:
|
||||||
|
assert.Equal(v, tcase.input[k])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
t.Log("Can override all fields")
|
|
||||||
programname = `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
|
|
||||||
meta, err = getContainerMeta(programname, overrideEnv, overrideApp, overrideTask)
|
|
||||||
assert.NoError(err)
|
|
||||||
assert.Equal(map[string]string{
|
|
||||||
"container_env": overrideEnv,
|
|
||||||
"container_app": overrideApp,
|
|
||||||
"container_task": overrideTask,
|
|
||||||
}, meta)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestExtractKVMeta(t *testing.T) {
|
func TestExtractKVMeta(t *testing.T) {
|
||||||
|
|
|
||||||
22
go.mod
Normal file
22
go.mod
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
module github.com/amazon-kinesis-client-go
|
||||||
|
|
||||||
|
go 1.13
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/Clever/amazon-kinesis-client-go v1.0.0
|
||||||
|
github.com/Clever/syslogparser v0.0.0-20170816194131-fb28ad3e4340
|
||||||
|
github.com/a8m/kinesis-producer v0.2.0
|
||||||
|
github.com/aws/aws-sdk-go v1.35.28 // indirect
|
||||||
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
|
github.com/golang/protobuf v1.5.2
|
||||||
|
github.com/jeromer/syslogparser v0.0.0-20190429161531-5fbaaf06d9e7 // indirect
|
||||||
|
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d // indirect
|
||||||
|
github.com/stretchr/testify v1.7.0
|
||||||
|
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
|
||||||
|
github.com/xeipuuv/gojsonschema v1.2.1-0.20200424115421-065759f9c3d7 // indirect
|
||||||
|
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
|
||||||
|
gopkg.in/Clever/kayvee-go.v6 v6.24.1
|
||||||
|
gopkg.in/yaml.v2 v2.3.1-0.20200602174213-b893565b90ca // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
|
||||||
|
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect
|
||||||
|
)
|
||||||
63
go.sum
Normal file
63
go.sum
Normal file
|
|
@ -0,0 +1,63 @@
|
||||||
|
github.com/Clever/amazon-kinesis-client-go v1.0.0 h1:rG+hkHCSe+xMGk3asESg18tiEj8X6To8vj0TRa1pKvQ=
|
||||||
|
github.com/Clever/amazon-kinesis-client-go v1.0.0/go.mod h1:jLfi8QusUjdsYazmEH0DlSO4gE1WRzVE7cs4c/fOFdI=
|
||||||
|
github.com/Clever/syslogparser v0.0.0-20170816194131-fb28ad3e4340 h1:wr8lTqPJZQpZkZgllQtrD96SecXkAu5MxzY3yJgsuCg=
|
||||||
|
github.com/Clever/syslogparser v0.0.0-20170816194131-fb28ad3e4340/go.mod h1:e7Yy7RTiIMU9pZ+dcSviX3cpod8e0CEeTUPTgBbKlRE=
|
||||||
|
github.com/a8m/kinesis-producer v0.2.0 h1:Bd5Oi4dczbTLPIZwVbm02464LIFgBqmViFj//b098xc=
|
||||||
|
github.com/a8m/kinesis-producer v0.2.0/go.mod h1:CxoFe0Y49udKMnQPkC5S5VmZZy6a+Bef9otuoH96Pv0=
|
||||||
|
github.com/aws/aws-sdk-go v1.35.28 h1:S2LuRnfC8X05zgZLC8gy/Sb82TGv2Cpytzbzz7tkeHc=
|
||||||
|
github.com/aws/aws-sdk-go v1.35.28/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
|
||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||||
|
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||||
|
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||||
|
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||||
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/jeromer/syslogparser v0.0.0-20190429161531-5fbaaf06d9e7 h1:FRDp/rRU2sThfssdHIUATCKJOms8R6A9rrhdC1sLqFI=
|
||||||
|
github.com/jeromer/syslogparser v0.0.0-20190429161531-5fbaaf06d9e7/go.mod h1:mQyv/QAgjs9+PTi/iXveno+U86nKGsltjqf3ilYx4Bg=
|
||||||
|
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
|
||||||
|
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||||
|
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
|
||||||
|
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||||
|
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d h1:ix3WmphUvN0GDd0DO9MH0v6/5xTv+Xm1bPN+1UJn58k=
|
||||||
|
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
|
||||||
|
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||||
|
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||||
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
|
||||||
|
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
|
||||||
|
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
|
||||||
|
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
|
||||||
|
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
|
||||||
|
github.com/xeipuuv/gojsonschema v1.2.1-0.20200424115421-065759f9c3d7 h1:tdnG+ZILOafvA29+pYKP3gauEbigHll3PHsf1GQa2ms=
|
||||||
|
github.com/xeipuuv/gojsonschema v1.2.1-0.20200424115421-065759f9c3d7/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
|
||||||
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
|
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
|
||||||
|
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||||
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
|
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e h1:EHBhcS0mlXEAVwNyO2dLfjToGsyY4j24pTs2ScHnX7s=
|
||||||
|
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||||
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||||
|
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
|
||||||
|
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
|
gopkg.in/Clever/kayvee-go.v6 v6.24.1 h1:lHxjlTFj57mfrq06PiCKIAJ0QuHHEqsyvSYvzb+gSFg=
|
||||||
|
gopkg.in/Clever/kayvee-go.v6 v6.24.1/go.mod h1:G0m6nBZj7Kdz+w2hiIaawmhXl5zp7E/K0ashol3Kb2A=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
gopkg.in/yaml.v2 v2.3.1-0.20200602174213-b893565b90ca h1:oivFrl3Vo+KfpUmTDJvz91I+BWzDPOQ+0CNR5jwTHcg=
|
||||||
|
gopkg.in/yaml.v2 v2.3.1-0.20200602174213-b893565b90ca/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54=
|
||||||
|
launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM=
|
||||||
73
golang.mk
73
golang.mk
|
|
@ -1,15 +1,18 @@
|
||||||
# This is the default Clever Golang Makefile.
|
# This is the default Clever Golang Makefile.
|
||||||
# It is stored in the dev-handbook repo, github.com/Clever/dev-handbook
|
# It is stored in the dev-handbook repo, github.com/Clever/dev-handbook
|
||||||
# Please do not alter this file directly.
|
# Please do not alter this file directly.
|
||||||
GOLANG_MK_VERSION := 0.4.0
|
GOLANG_MK_VERSION := 1.0.0
|
||||||
|
|
||||||
SHELL := /bin/bash
|
SHELL := /bin/bash
|
||||||
SYSTEM := $(shell uname -a | cut -d" " -f1 | tr '[:upper:]' '[:lower:]')
|
SYSTEM := $(shell uname -a | cut -d" " -f1 | tr '[:upper:]' '[:lower:]')
|
||||||
.PHONY: golang-test-deps bin/dep golang-ensure-curl-installed
|
.PHONY: golang-test-deps golang-ensure-curl-installed
|
||||||
|
|
||||||
# set timezone to UTC for golang to match circle and deploys
|
# set timezone to UTC for golang to match circle and deploys
|
||||||
export TZ=UTC
|
export TZ=UTC
|
||||||
|
|
||||||
|
# go build flags for use across all commands which accept them
|
||||||
|
GO_BUILD_FLAGS := "-mod=vendor"
|
||||||
|
|
||||||
# if the gopath includes several directories, use only the first
|
# if the gopath includes several directories, use only the first
|
||||||
GOPATH=$(shell echo $$GOPATH | cut -d: -f1)
|
GOPATH=$(shell echo $$GOPATH | cut -d: -f1)
|
||||||
|
|
||||||
|
|
@ -32,57 +35,21 @@ _ := $(if \
|
||||||
endef
|
endef
|
||||||
|
|
||||||
# FGT is a utility that exits with 1 whenever any stderr/stdout output is recieved.
|
# FGT is a utility that exits with 1 whenever any stderr/stdout output is recieved.
|
||||||
|
# We pin its version since its a simple tool that does its job as-is;
|
||||||
|
# so we're defended against it breaking or changing in the future.
|
||||||
FGT := $(GOPATH)/bin/fgt
|
FGT := $(GOPATH)/bin/fgt
|
||||||
$(FGT):
|
$(FGT):
|
||||||
go get github.com/GeertJohan/fgt
|
go get github.com/GeertJohan/fgt@262f7b11eec07dc7b147c44641236f3212fee89d
|
||||||
|
|
||||||
golang-ensure-curl-installed:
|
golang-ensure-curl-installed:
|
||||||
@command -v curl >/dev/null 2>&1 || { echo >&2 "curl not installed. Please install curl."; exit 1; }
|
@command -v curl >/dev/null 2>&1 || { echo >&2 "curl not installed. Please install curl."; exit 1; }
|
||||||
|
|
||||||
DEP_VERSION = v0.4.1
|
|
||||||
DEP_INSTALLED := $(shell [[ -e "bin/dep" ]] && bin/dep version | grep version | grep -v go | cut -d: -f2 | tr -d '[:space:]')
|
|
||||||
# Dep is a tool used to manage Golang dependencies. It is the offical vendoring experiment, but
|
|
||||||
# not yet the official tool for Golang.
|
|
||||||
ifeq ($(DEP_VERSION),$(DEP_INSTALLED))
|
|
||||||
bin/dep: # nothing to do, dep is already up-to-date
|
|
||||||
else
|
|
||||||
CACHED_DEP = /tmp/dep-$(DEP_VERSION)
|
|
||||||
bin/dep: golang-ensure-curl-installed
|
|
||||||
@echo "Updating dep..."
|
|
||||||
@mkdir -p bin
|
|
||||||
@if [ ! -f $(CACHED_DEP) ]; then curl -o $(CACHED_DEP) -sL https://github.com/golang/dep/releases/download/$(DEP_VERSION)/dep-$(SYSTEM)-amd64; fi;
|
|
||||||
@cp $(CACHED_DEP) bin/dep
|
|
||||||
@chmod +x bin/dep || true
|
|
||||||
endif
|
|
||||||
|
|
||||||
# figure out "github.com/<org>/<repo>"
|
|
||||||
# `go list` will fail if there are no .go files in the directory
|
|
||||||
# if this is the case, fall back to assuming github.com/Clever
|
|
||||||
REF = $(shell go list || echo github.com/Clever/$(notdir $(shell pwd)))
|
|
||||||
golang-verify-no-self-references:
|
|
||||||
@if grep -q -i "$(REF)" Gopkg.lock; then echo "Error: Gopkg.lock includes a self-reference ($(REF)), which is not allowed. See: https://github.com/golang/dep/issues/1690" && exit 1; fi;
|
|
||||||
@if grep -q -i "$(REF)" Gopkg.toml; then echo "Error: Gopkg.toml includes a self-reference ($(REF)), which is not allowed. See: https://github.com/golang/dep/issues/1690" && exit 1; fi;
|
|
||||||
|
|
||||||
golang-dep-vendor-deps: bin/dep golang-verify-no-self-references
|
|
||||||
|
|
||||||
# golang-godep-vendor is a target for saving dependencies with the dep tool
|
|
||||||
# to the vendor/ directory. All nested vendor/ directories are deleted via
|
|
||||||
# the prune command.
|
|
||||||
# In CI, -vendor-only is used to avoid updating the lock file.
|
|
||||||
ifndef CI
|
|
||||||
define golang-dep-vendor
|
|
||||||
bin/dep ensure -v
|
|
||||||
endef
|
|
||||||
else
|
|
||||||
define golang-dep-vendor
|
|
||||||
bin/dep ensure -v -vendor-only
|
|
||||||
endef
|
|
||||||
endif
|
|
||||||
|
|
||||||
# Golint is a tool for linting Golang code for common errors.
|
# Golint is a tool for linting Golang code for common errors.
|
||||||
|
# We pin its version because an update could add a new lint check which would make
|
||||||
|
# previously passing tests start failing without changing our code.
|
||||||
GOLINT := $(GOPATH)/bin/golint
|
GOLINT := $(GOPATH)/bin/golint
|
||||||
$(GOLINT):
|
$(GOLINT):
|
||||||
go get golang.org/x/lint/golint
|
go get golang.org/x/lint/golint@738671d3881b9731cc63024d5d88cf28db875626
|
||||||
|
|
||||||
# golang-fmt-deps requires the FGT tool for checking output
|
# golang-fmt-deps requires the FGT tool for checking output
|
||||||
golang-fmt-deps: $(FGT)
|
golang-fmt-deps: $(FGT)
|
||||||
|
|
@ -91,7 +58,7 @@ golang-fmt-deps: $(FGT)
|
||||||
# arg1: pkg path
|
# arg1: pkg path
|
||||||
define golang-fmt
|
define golang-fmt
|
||||||
@echo "FORMATTING $(1)..."
|
@echo "FORMATTING $(1)..."
|
||||||
@$(FGT) gofmt -l=true $(GOPATH)/src/$(1)/*.go
|
@PKG_PATH=$$(go list -f '{{.Dir}}' $(1)); $(FGT) gofmt -l=true $${PKG_PATH}/*.go
|
||||||
endef
|
endef
|
||||||
|
|
||||||
# golang-lint-deps requires the golint tool for golang linting.
|
# golang-lint-deps requires the golint tool for golang linting.
|
||||||
|
|
@ -101,7 +68,7 @@ golang-lint-deps: $(GOLINT)
|
||||||
# arg1: pkg path
|
# arg1: pkg path
|
||||||
define golang-lint
|
define golang-lint
|
||||||
@echo "LINTING $(1)..."
|
@echo "LINTING $(1)..."
|
||||||
@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(GOLINT)
|
@PKG_PATH=$$(go list -f '{{.Dir}}' $(1)); find $${PKG_PATH}/*.go -type f | grep -v gen_ | xargs $(GOLINT)
|
||||||
endef
|
endef
|
||||||
|
|
||||||
# golang-lint-deps-strict requires the golint tool for golang linting.
|
# golang-lint-deps-strict requires the golint tool for golang linting.
|
||||||
|
|
@ -112,7 +79,7 @@ golang-lint-deps-strict: $(GOLINT) $(FGT)
|
||||||
# arg1: pkg path
|
# arg1: pkg path
|
||||||
define golang-lint-strict
|
define golang-lint-strict
|
||||||
@echo "LINTING $(1)..."
|
@echo "LINTING $(1)..."
|
||||||
@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(FGT) $(GOLINT)
|
@PKG_PATH=$$(go list -f '{{.Dir}}' $(1)); find $${PKG_PATH}/*.go -type f | grep -v gen_ | xargs $(FGT) $(GOLINT)
|
||||||
endef
|
endef
|
||||||
|
|
||||||
# golang-test-deps is here for consistency
|
# golang-test-deps is here for consistency
|
||||||
|
|
@ -122,7 +89,7 @@ golang-test-deps:
|
||||||
# arg1: pkg path
|
# arg1: pkg path
|
||||||
define golang-test
|
define golang-test
|
||||||
@echo "TESTING $(1)..."
|
@echo "TESTING $(1)..."
|
||||||
@go test -v $(1)
|
@go test $(GO_BUILD_FLAGS) -v $(1)
|
||||||
endef
|
endef
|
||||||
|
|
||||||
# golang-test-strict-deps is here for consistency
|
# golang-test-strict-deps is here for consistency
|
||||||
|
|
@ -132,7 +99,7 @@ golang-test-strict-deps:
|
||||||
# arg1: pkg path
|
# arg1: pkg path
|
||||||
define golang-test-strict
|
define golang-test-strict
|
||||||
@echo "TESTING $(1)..."
|
@echo "TESTING $(1)..."
|
||||||
@go test -v -race $(1)
|
@go test -v $(GO_BUILD_FLAGS) -race $(1)
|
||||||
endef
|
endef
|
||||||
|
|
||||||
# golang-vet-deps is here for consistency
|
# golang-vet-deps is here for consistency
|
||||||
|
|
@ -142,7 +109,7 @@ golang-vet-deps:
|
||||||
# arg1: pkg path
|
# arg1: pkg path
|
||||||
define golang-vet
|
define golang-vet
|
||||||
@echo "VETTING $(1)..."
|
@echo "VETTING $(1)..."
|
||||||
@go vet $(GOPATH)/src/$(1)/*.go
|
@go vet $(GO_BUILD_FLAGS) $(1)
|
||||||
endef
|
endef
|
||||||
|
|
||||||
# golang-test-all-deps installs all dependencies needed for different test cases.
|
# golang-test-all-deps installs all dependencies needed for different test cases.
|
||||||
|
|
@ -176,14 +143,14 @@ endef
|
||||||
define golang-build
|
define golang-build
|
||||||
@echo "BUILDING..."
|
@echo "BUILDING..."
|
||||||
@if [ -z "$$CI" ]; then \
|
@if [ -z "$$CI" ]; then \
|
||||||
go build -o bin/$(2) $(1); \
|
go build $(GO_BUILD_FLAGS) -o bin/$(2) $(1); \
|
||||||
else \
|
else \
|
||||||
echo "-> Building CGO binary"; \
|
echo "-> Building CGO binary"; \
|
||||||
CGO_ENABLED=0 go build -installsuffix cgo -o bin/$(2) $(1); \
|
CGO_ENABLED=0 go build $(GO_BUILD_FLAGS) -installsuffix cgo -o bin/$(2) $(1); \
|
||||||
fi;
|
fi;
|
||||||
endef
|
endef
|
||||||
|
|
||||||
# golang-update-makefile downloads latest version of golang.mk
|
# golang-update-makefile downloads latest version of golang.mk
|
||||||
golang-update-makefile:
|
golang-update-makefile:
|
||||||
@wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang.mk -O /tmp/golang.mk 2>/dev/null
|
@wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang-v1.mk -O /tmp/golang.mk 2>/dev/null
|
||||||
@if ! grep -q $(GOLANG_MK_VERSION) /tmp/golang.mk; then cp /tmp/golang.mk golang.mk && echo "golang.mk updated"; else echo "golang.mk is up-to-date"; fi
|
@if ! grep -q $(GOLANG_MK_VERSION) /tmp/golang.mk; then cp /tmp/golang.mk golang.mk && echo "golang.mk updated"; else echo "golang.mk is up-to-date"; fi
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,150 @@
|
||||||
|
// Package splitter provides functions for decoding various kinds of records that might come off of a kinesis stream.
|
||||||
|
// It is equipped to with the functions to unbundle KPL aggregates and CloudWatch log bundles,
|
||||||
|
// as well as apply appropriate decompression.
|
||||||
|
// KCL applications would be most interested in `SplitMessageIfNecessary` which can handle zlibbed records as well as
|
||||||
|
// CloudWatch bundles. KCL automatically unbundles KPL aggregates before passing the records to the consumer.
|
||||||
|
// Non-KCL applications (such as Lambdas consuming KPL-produced aggregates) should either use
|
||||||
|
// - KPLDeaggregate if the consumer purely wants to unbundle KPL aggregates, but will handle the raw records themselves.
|
||||||
|
// - Deaggregate if the consumer wants to apply the same decompress and split logic as SplitMessageIfNecessary
|
||||||
|
// in addition to the KPL splitting.
|
||||||
package splitter
|
package splitter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
|
"compress/zlib"
|
||||||
|
"crypto/md5"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
kpl "github.com/a8m/kinesis-producer"
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// The Amazon Kinesis Producer Library (KPL) aggregates multiple logical user records into a single
|
||||||
|
// Amazon Kinesis record for efficient puts.
|
||||||
|
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
|
||||||
|
var kplMagicNumber = []byte{0xF3, 0x89, 0x9A, 0xC2}
|
||||||
|
|
||||||
|
// IsKPLAggregate checks a record for a KPL aggregate prefix.
|
||||||
|
// It is not necessary to call this before calling KPLDeaggregate.
|
||||||
|
func IsKPLAggregate(data []byte) bool {
|
||||||
|
return bytes.HasPrefix(data, kplMagicNumber)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KPLDeaggregate takes a Kinesis record and converts it to one or more user records by applying KPL deaggregation.
|
||||||
|
// If the record begins with the 4-byte magic prefix that KPL uses, the single Kinesis record is split into its component user records.
|
||||||
|
// Otherwise, the return value is a singleton containing the original record.
|
||||||
|
func KPLDeaggregate(kinesisRecord []byte) ([][]byte, error) {
|
||||||
|
if !IsKPLAggregate(kinesisRecord) {
|
||||||
|
return [][]byte{kinesisRecord}, nil
|
||||||
|
}
|
||||||
|
src := kinesisRecord[len(kplMagicNumber) : len(kinesisRecord)-md5.Size]
|
||||||
|
checksum := kinesisRecord[len(kinesisRecord)-md5.Size:]
|
||||||
|
recordSum := md5.Sum(src)
|
||||||
|
for i, b := range checksum {
|
||||||
|
if b != recordSum[i] {
|
||||||
|
// either the data is corrupted or this is not a KPL aggregate
|
||||||
|
// either way, return the data as-is
|
||||||
|
return [][]byte{kinesisRecord}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dest := new(kpl.AggregatedRecord)
|
||||||
|
err := proto.Unmarshal(src, dest)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshalling proto: %v", err)
|
||||||
|
}
|
||||||
|
var records [][]byte
|
||||||
|
for _, userRecord := range dest.GetRecords() {
|
||||||
|
records = append(records, userRecord.Data)
|
||||||
|
}
|
||||||
|
return records, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deaggregate is a combination of KPLDeaggregate and SplitMessageIfNecessary
|
||||||
|
// First it tries to KPL-deaggregate. If unsuccessful, it calls SplitIfNecessary on the original record.
|
||||||
|
// If successful, it iterates over the individual user records and attempts to unzlib them.
|
||||||
|
// If a record inside an aggregate is in zlib format, the output will contain the unzlibbed version.
|
||||||
|
// If it is not zlibbed, the output will contain the record verbatim
|
||||||
|
// A similar result can be optained by calling KPLDeaggregate, then iterating over the results and callin SplitMessageIfNecessary.
|
||||||
|
// This function makes the assumption that after KPL-deaggregating, the results are not CloudWatch aggregates, so it doesn't need to check them for a gzip header.
|
||||||
|
// Also it lets us iterate over the user records one less time, since KPLDeaggregate loops over the records and we would need to loop again to unzlib.
|
||||||
|
//
|
||||||
|
// See the SplitMessageIfNecessary documentation for the format of output for CloudWatch log bundles.
|
||||||
|
func Deaggregate(kinesisRecord []byte) ([][]byte, error) {
|
||||||
|
if !IsKPLAggregate(kinesisRecord) {
|
||||||
|
return SplitMessageIfNecessary(kinesisRecord)
|
||||||
|
}
|
||||||
|
src := kinesisRecord[len(kplMagicNumber) : len(kinesisRecord)-md5.Size]
|
||||||
|
checksum := kinesisRecord[len(kinesisRecord)-md5.Size:]
|
||||||
|
recordSum := md5.Sum(src)
|
||||||
|
for i, b := range checksum {
|
||||||
|
if b != recordSum[i] {
|
||||||
|
// either the data is corrupted or this is not a KPL aggregate
|
||||||
|
// either way, return the data as-is
|
||||||
|
return [][]byte{kinesisRecord}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dest := new(kpl.AggregatedRecord)
|
||||||
|
err := proto.Unmarshal(src, dest)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshalling proto: %v", err)
|
||||||
|
}
|
||||||
|
var records [][]byte
|
||||||
|
for _, userRecord := range dest.GetRecords() {
|
||||||
|
record, err := unzlib(userRecord.Data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unzlibbing record: %w", err)
|
||||||
|
}
|
||||||
|
records = append(records, record)
|
||||||
|
}
|
||||||
|
return records, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SplitMessageIfNecessary recieves a user-record and returns a slice of one or more records.
|
||||||
|
// if the record is coming off of a kinesis stream and might be KPL aggregated, it needs to be deaggregated before calling this.
|
||||||
|
// This function handles three types of records:
|
||||||
|
// - records emitted from CWLogs Subscription (which are gzip compressed)
|
||||||
|
// - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bit
|
||||||
|
// - any other record (left unchanged)
|
||||||
|
//
|
||||||
|
// CloudWatch logs come as structured JSON. In the process of splitting, they are converted
|
||||||
|
// into an rsyslog format that allows fairly uniform parsing of the result across the
|
||||||
|
// AWS services that might emit logs to CloudWatch.
|
||||||
|
// Note that these timezone used in these syslog records is guessed based on the local env.
|
||||||
|
// If you need consistent timezones, set TZ=UTC in your environment.
|
||||||
|
func SplitMessageIfNecessary(userRecord []byte) ([][]byte, error) {
|
||||||
|
// First try the record as a CWLogs record
|
||||||
|
if IsGzipped(userRecord) {
|
||||||
|
return GetMessagesFromGzippedInput(userRecord)
|
||||||
|
}
|
||||||
|
|
||||||
|
unzlibRecord, err := unzlib(userRecord)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process a single message, from KPL
|
||||||
|
return [][]byte{unzlibRecord}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func unzlib(input []byte) ([]byte, error) {
|
||||||
|
zlibReader, err := zlib.NewReader(bytes.NewReader(input))
|
||||||
|
if err == nil {
|
||||||
|
unzlibRecord, err := ioutil.ReadAll(zlibReader)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("reading zlib-compressed record: %v", err)
|
||||||
|
}
|
||||||
|
return unzlibRecord, nil
|
||||||
|
}
|
||||||
|
return input, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// LogEvent is a single log line within a LogEventBatch
|
// LogEvent is a single log line within a LogEventBatch
|
||||||
type LogEvent struct {
|
type LogEvent struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
|
|
@ -116,6 +250,11 @@ var awsFargateLogStreamRegex = regexp.MustCompile(`^fargate/([a-z0-9-]+)--([a-z0
|
||||||
// RDS slowquery log groups are in the form of /aws/rds/cluster/<database name>/slowquery
|
// RDS slowquery log groups are in the form of /aws/rds/cluster/<database name>/slowquery
|
||||||
var awsRDSLogGroupRegex = regexp.MustCompile(`^/aws/rds/cluster/([a-z0-9-]+)/slowquery$`)
|
var awsRDSLogGroupRegex = regexp.MustCompile(`^/aws/rds/cluster/([a-z0-9-]+)/slowquery$`)
|
||||||
|
|
||||||
|
// glue log groups are of the form /aws-glue/jobs/<env>/<app>/<pod-id>
|
||||||
|
// glue log streams are of the form <job id>-<"driver" | "1" | "progress-bar">
|
||||||
|
var awsGlueLogGroupRegex = regexp.MustCompile(`^/aws-glue/jobs/([a-z0-9-]+)/([a-z0-9-]+)/([a-z0-9-]+)$`)
|
||||||
|
var awsGlueLogStreamRegex = regexp.MustCompile(`^(jr_[a-z0-9-]+)-.*$`)
|
||||||
|
|
||||||
// arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425
|
// arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425
|
||||||
const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`
|
const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`
|
||||||
const taskCruft = `12345678-1234-1234-1234-555566667777`
|
const taskCruft = `12345678-1234-1234-1234-555566667777`
|
||||||
|
|
@ -228,6 +367,32 @@ func splitAWSRDS(b LogEventBatch) ([]RSysLogMessage, bool) {
|
||||||
return out, true
|
return out, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func splitAWSGlue(b LogEventBatch) ([]RSysLogMessage, bool) {
|
||||||
|
matches := awsGlueLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
|
||||||
|
if len(matches) != 1 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
env := matches[0][1]
|
||||||
|
app := matches[0][2]
|
||||||
|
|
||||||
|
streamMatches := awsGlueLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1)
|
||||||
|
if len(streamMatches) != 1 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
jobID := streamMatches[0][1]
|
||||||
|
|
||||||
|
out := []RSysLogMessage{}
|
||||||
|
for _, event := range b.LogEvents {
|
||||||
|
out = append(out, RSysLogMessage{
|
||||||
|
Timestamp: event.Timestamp.Time(),
|
||||||
|
ProgramName: env + "--" + app + arnCruft + jobID,
|
||||||
|
Hostname: "aws-glue",
|
||||||
|
Message: event.Message,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return out, true
|
||||||
|
}
|
||||||
|
|
||||||
func splitDefault(b LogEventBatch) []RSysLogMessage {
|
func splitDefault(b LogEventBatch) []RSysLogMessage {
|
||||||
out := []RSysLogMessage{}
|
out := []RSysLogMessage{}
|
||||||
for _, event := range b.LogEvents {
|
for _, event := range b.LogEvents {
|
||||||
|
|
@ -261,6 +426,8 @@ func Split(b LogEventBatch) [][]byte {
|
||||||
return stringify(rsyslogMsgs)
|
return stringify(rsyslogMsgs)
|
||||||
} else if rsyslogMsgs, ok := splitAWSRDS(b); ok {
|
} else if rsyslogMsgs, ok := splitAWSRDS(b); ok {
|
||||||
return stringify(rsyslogMsgs)
|
return stringify(rsyslogMsgs)
|
||||||
|
} else if rsyslogMsgs, ok := splitAWSGlue(b); ok {
|
||||||
|
return stringify(rsyslogMsgs)
|
||||||
}
|
}
|
||||||
return stringify(splitDefault(b))
|
return stringify(splitDefault(b))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,15 +3,29 @@ package splitter
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
|
"compress/zlib"
|
||||||
|
"crypto/md5"
|
||||||
b64 "encoding/base64"
|
b64 "encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/Clever/amazon-kinesis-client-go/decode"
|
"github.com/Clever/amazon-kinesis-client-go/decode"
|
||||||
|
kpl "github.com/a8m/kinesis-producer"
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
// In the conversion of CloudWatch LogEvent struct to an RSyslog struct to a string,
|
||||||
|
// the timezone used in the final string depends on the locally set timezone.
|
||||||
|
// in order for tests to pass, we set TZ to UTC
|
||||||
|
os.Setenv("TZ", "UTC")
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
||||||
|
|
||||||
func TestUnpacking(t *testing.T) {
|
func TestUnpacking(t *testing.T) {
|
||||||
input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA"
|
input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA"
|
||||||
|
|
||||||
|
|
@ -253,3 +267,290 @@ func TestSplitRDS(t *testing.T) {
|
||||||
assert.Equal(t, "Slow query: select * from table.", enhanced["rawlog"])
|
assert.Equal(t, "Slow query: select * from table.", enhanced["rawlog"])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSplitGlue(t *testing.T) {
|
||||||
|
input := LogEventBatch{
|
||||||
|
MessageType: "DATA_MESSAGE",
|
||||||
|
Owner: "123456789012",
|
||||||
|
LogGroup: "/aws-glue/jobs/clever-dev/analytics-district-participation/aae75f00",
|
||||||
|
LogStream: "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28-1",
|
||||||
|
SubscriptionFilters: []string{"ForwardLogsToKinesis"},
|
||||||
|
LogEvents: []LogEvent{
|
||||||
|
{
|
||||||
|
ID: "99999992379011144044923130086453437181614530551221780480",
|
||||||
|
Timestamp: NewUnixTimestampMillis(1498519943285),
|
||||||
|
Message: "foo bar.",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
lines := Split(input)
|
||||||
|
expected := [][]byte{
|
||||||
|
[]byte(`2017-06-26T23:32:23.285001+00:00 aws-glue clever-dev--analytics-district-participation/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2Fjr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28: foo bar.`),
|
||||||
|
}
|
||||||
|
assert.Equal(t, expected, lines)
|
||||||
|
for _, line := range expected {
|
||||||
|
enhanced, err := decode.ParseAndEnhance(string(line), "")
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, "aws-glue", enhanced["hostname"])
|
||||||
|
assert.Equal(t, "clever-dev", enhanced["container_env"])
|
||||||
|
assert.Equal(t, "analytics-district-participation", enhanced["container_app"])
|
||||||
|
assert.Equal(t, "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28", enhanced["container_task"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSplitIfNecesary(t *testing.T) {
|
||||||
|
|
||||||
|
// We provide three different inputs to batchedWriter.splitMessageIfNecessary
|
||||||
|
// plain text
|
||||||
|
// zlib compressed text
|
||||||
|
// gzip compressed CloudWatch logs batch
|
||||||
|
// we verify that the split function matches the input against the correct splitter
|
||||||
|
// and decodes it.
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
plainTextInput := []byte("hello, world!")
|
||||||
|
|
||||||
|
records, err := SplitMessageIfNecessary(plainTextInput)
|
||||||
|
assert.NoError(err)
|
||||||
|
assert.Equal(
|
||||||
|
records,
|
||||||
|
[][]byte{[]byte("hello, world!")},
|
||||||
|
)
|
||||||
|
|
||||||
|
var z bytes.Buffer
|
||||||
|
zbuf := zlib.NewWriter(&z)
|
||||||
|
zbuf.Write([]byte("hello, world!"))
|
||||||
|
zbuf.Close()
|
||||||
|
zlibSingleInput := z.Bytes()
|
||||||
|
|
||||||
|
records, err = SplitMessageIfNecessary(zlibSingleInput)
|
||||||
|
assert.NoError(err)
|
||||||
|
assert.Equal(
|
||||||
|
records,
|
||||||
|
[][]byte{[]byte("hello, world!")},
|
||||||
|
)
|
||||||
|
|
||||||
|
// the details of this part aren't super important since the actual functionality is
|
||||||
|
// tested in other tests; for this test we just want to make sure that split function
|
||||||
|
// correctly realizes it's gzip and call the appropriate CW-log-splitting logic
|
||||||
|
var g bytes.Buffer
|
||||||
|
gbuf := gzip.NewWriter(&g)
|
||||||
|
cwLogBatch := LogEventBatch{
|
||||||
|
MessageType: "test",
|
||||||
|
Owner: "test",
|
||||||
|
LogGroup: "test",
|
||||||
|
LogStream: "test",
|
||||||
|
SubscriptionFilters: []string{""},
|
||||||
|
LogEvents: []LogEvent{{
|
||||||
|
ID: "test",
|
||||||
|
Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)),
|
||||||
|
Message: "test",
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
cwLogBatchJSON, _ := json.Marshal(cwLogBatch)
|
||||||
|
gbuf.Write(cwLogBatchJSON)
|
||||||
|
gbuf.Close()
|
||||||
|
gzipBatchInput := g.Bytes()
|
||||||
|
|
||||||
|
expectedRecord := []byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test")
|
||||||
|
records, err = SplitMessageIfNecessary(gzipBatchInput)
|
||||||
|
assert.NoError(err)
|
||||||
|
assert.Equal(
|
||||||
|
records,
|
||||||
|
[][]byte{expectedRecord},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func createKPLAggregate(input [][]byte, compress bool) []byte {
|
||||||
|
var partitionKeyIndex uint64 = 0
|
||||||
|
|
||||||
|
records := []*kpl.Record{}
|
||||||
|
for _, log := range input {
|
||||||
|
if compress {
|
||||||
|
var z bytes.Buffer
|
||||||
|
zbuf := zlib.NewWriter(&z)
|
||||||
|
zbuf.Write(log)
|
||||||
|
zbuf.Close()
|
||||||
|
log = z.Bytes()
|
||||||
|
}
|
||||||
|
records = append(records, &kpl.Record{
|
||||||
|
PartitionKeyIndex: &partitionKeyIndex,
|
||||||
|
Data: log,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
logProto, err := proto.Marshal(&kpl.AggregatedRecord{
|
||||||
|
PartitionKeyTable: []string{"ecs_task_arn"},
|
||||||
|
Records: records,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
log := append(kplMagicNumber, logProto...)
|
||||||
|
logHash := md5.Sum(logProto)
|
||||||
|
return append(log, logHash[0:16]...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKPLDeaggregate(t *testing.T) {
|
||||||
|
type test struct {
|
||||||
|
description string
|
||||||
|
input []byte
|
||||||
|
output [][]byte
|
||||||
|
shouldError bool
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []test{
|
||||||
|
{
|
||||||
|
description: "non-aggregated record",
|
||||||
|
input: []byte("hello"),
|
||||||
|
output: [][]byte{[]byte("hello")},
|
||||||
|
shouldError: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "one kpl-aggregated record",
|
||||||
|
input: createKPLAggregate(
|
||||||
|
[][]byte{[]byte("hello")},
|
||||||
|
false,
|
||||||
|
),
|
||||||
|
output: [][]byte{[]byte("hello")},
|
||||||
|
shouldError: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "three kpl-aggregated record",
|
||||||
|
input: createKPLAggregate([][]byte{
|
||||||
|
[]byte("hello, "),
|
||||||
|
[]byte("world"),
|
||||||
|
[]byte("!"),
|
||||||
|
},
|
||||||
|
false,
|
||||||
|
),
|
||||||
|
output: [][]byte{
|
||||||
|
[]byte("hello, "),
|
||||||
|
[]byte("world"),
|
||||||
|
[]byte("!"),
|
||||||
|
},
|
||||||
|
shouldError: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.description, func(t *testing.T) {
|
||||||
|
out, err := KPLDeaggregate(tt.input)
|
||||||
|
if tt.shouldError {
|
||||||
|
assert.Error(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, out, tt.output)
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeaggregate(t *testing.T) {
|
||||||
|
type test struct {
|
||||||
|
description string
|
||||||
|
input []byte
|
||||||
|
output [][]byte
|
||||||
|
shouldError bool
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []test{
|
||||||
|
{
|
||||||
|
description: "non-aggregated record",
|
||||||
|
input: []byte("hello"),
|
||||||
|
output: [][]byte{[]byte("hello")},
|
||||||
|
shouldError: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "one kpl-aggregated record",
|
||||||
|
input: createKPLAggregate(
|
||||||
|
[][]byte{[]byte("hello")},
|
||||||
|
false,
|
||||||
|
),
|
||||||
|
output: [][]byte{[]byte("hello")},
|
||||||
|
shouldError: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "three kpl-aggregated record",
|
||||||
|
input: createKPLAggregate([][]byte{
|
||||||
|
[]byte("hello, "),
|
||||||
|
[]byte("world"),
|
||||||
|
[]byte("!"),
|
||||||
|
},
|
||||||
|
false,
|
||||||
|
),
|
||||||
|
output: [][]byte{
|
||||||
|
[]byte("hello, "),
|
||||||
|
[]byte("world"),
|
||||||
|
[]byte("!"),
|
||||||
|
},
|
||||||
|
shouldError: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "one kpl-aggregated zlib record",
|
||||||
|
input: createKPLAggregate(
|
||||||
|
[][]byte{[]byte("hello")},
|
||||||
|
true,
|
||||||
|
),
|
||||||
|
output: [][]byte{[]byte("hello")},
|
||||||
|
shouldError: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "three kpl-aggregated zlib record",
|
||||||
|
input: createKPLAggregate([][]byte{
|
||||||
|
[]byte("hello, "),
|
||||||
|
[]byte("world"),
|
||||||
|
[]byte("!"),
|
||||||
|
},
|
||||||
|
true,
|
||||||
|
),
|
||||||
|
output: [][]byte{
|
||||||
|
[]byte("hello, "),
|
||||||
|
[]byte("world"),
|
||||||
|
[]byte("!"),
|
||||||
|
},
|
||||||
|
shouldError: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var g bytes.Buffer
|
||||||
|
gbuf := gzip.NewWriter(&g)
|
||||||
|
cwLogBatch := LogEventBatch{
|
||||||
|
MessageType: "test",
|
||||||
|
Owner: "test",
|
||||||
|
LogGroup: "test",
|
||||||
|
LogStream: "test",
|
||||||
|
SubscriptionFilters: []string{""},
|
||||||
|
LogEvents: []LogEvent{{
|
||||||
|
ID: "test",
|
||||||
|
Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)),
|
||||||
|
Message: "test",
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
cwLogBatchJSON, _ := json.Marshal(cwLogBatch)
|
||||||
|
gbuf.Write(cwLogBatchJSON)
|
||||||
|
gbuf.Close()
|
||||||
|
gzipBatchInput := g.Bytes()
|
||||||
|
|
||||||
|
tests = append(tests, test{
|
||||||
|
description: "cloudwatch log batch",
|
||||||
|
input: gzipBatchInput,
|
||||||
|
output: [][]byte{[]byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test")},
|
||||||
|
shouldError: false,
|
||||||
|
})
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.description, func(t *testing.T) {
|
||||||
|
out, err := Deaggregate(tt.input)
|
||||||
|
|
||||||
|
if tt.shouldError {
|
||||||
|
assert.Error(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, out, tt.output)
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue