Compare commits

...

51 commits

Author SHA1 Message Date
Taylor Sutton
f78d1c6b9b
Merge pull request #71 from Clever/INFRANG-6435
notify CI failure
2024-09-19 07:23:19 -07:00
Tanmay Sardesai
a8a4e207fe notify CI failure 2024-09-18 13:04:48 -04:00
Taylor Sutton
e66f1922b7
Merge pull request #68 from Clever/INFRANG-5293
Update github known hosts key
2023-05-04 09:43:18 -07:00
Chris Martin
dc3e5e097c Update github known hosts key 2023-04-25 14:56:16 -04:00
Nikhil Bhatia
9a1371d965
Merge pull request #67 from Clever/ci-status-notify
Microplane: add Github Action workflow for ci-notify
2022-03-29 20:30:42 -07:00
nbhatia823
cf19a05ced Microplane: add Github Action workflow for ci-notify 2022-03-29 15:12:09 -07:00
Rafael
3a42d32e53
Merge pull request #66 from Clever/go-modules
dep -> go modules
2021-06-03 19:12:19 -04:00
Gavi Hirsch
d461ebebb6 dep -> go modules 2021-06-03 15:41:20 -07:00
Taylor Sutton
c11acd24ec
Merge pull request #64 from Clever/dependabot/dep/github.com/golang/protobuf-1.5.2
Bump github.com/golang/protobuf from 1.5.1 to 1.5.2
2021-03-30 12:57:35 -07:00
dependabot-preview[bot]
ad3e33c7d6
Bump github.com/golang/protobuf from 1.5.1 to 1.5.2
Bumps [github.com/golang/protobuf](https://github.com/golang/protobuf) from 1.5.1 to 1.5.2.
- [Release notes](https://github.com/golang/protobuf/releases)
- [Commits](https://github.com/golang/protobuf/compare/v1.5.1...v1.5.2)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2021-03-30 17:52:57 +00:00
Taylor Sutton
fa54b6ee39
Merge pull request #63 from Clever/dependabot/dep/github.com/golang/protobuf-1.5.1
Bump github.com/golang/protobuf from 1.4.3 to 1.5.1
2021-03-18 11:22:53 -07:00
dependabot-preview[bot]
c164f1d895
Bump github.com/golang/protobuf from 1.4.3 to 1.5.1
Bumps [github.com/golang/protobuf](https://github.com/golang/protobuf) from 1.4.3 to 1.5.1.
- [Release notes](https://github.com/golang/protobuf/releases)
- [Commits](https://github.com/golang/protobuf/compare/v1.4.3...v1.5.1)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2021-03-18 17:21:17 +00:00
Gavi Hirsch
6ac018712d
Merge pull request #62 from Clever/INFRANG-4003
add versioning
2021-03-05 09:27:29 -08:00
Gavi Hirsch
7351a3d3f7 add versioning 2021-03-04 10:11:51 -08:00
Rafael
358de6589c
Merge pull request #61 from Clever/dependabot/dep/gopkg.in/Clever/kayvee-go.v6-6.24.1
Bump gopkg.in/Clever/kayvee-go.v6 from 6.24.0 to 6.24.1
2021-02-02 13:26:14 -05:00
dependabot-preview[bot]
fe41fdaac3
Bump gopkg.in/Clever/kayvee-go.v6 from 6.24.0 to 6.24.1
Bumps [gopkg.in/Clever/kayvee-go.v6](https://github.com/Clever/kayvee-go) from 6.24.0 to 6.24.1.
- [Release notes](https://github.com/Clever/kayvee-go/releases)
- [Commits](https://github.com/Clever/kayvee-go/compare/v6.24.0...v6.24.1)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2021-02-02 18:20:42 +00:00
Tanmay Sardesai
102de58470
Merge pull request #60 from Clever/dependabot/dep/github.com/stretchr/testify-1.7.0
Bump github.com/stretchr/testify from 1.6.1 to 1.7.0
2021-01-19 10:01:36 -08:00
dependabot-preview[bot]
54f91f4073
Bump github.com/stretchr/testify from 1.6.1 to 1.7.0
Bumps [github.com/stretchr/testify](https://github.com/stretchr/testify) from 1.6.1 to 1.7.0.
- [Release notes](https://github.com/stretchr/testify/releases)
- [Commits](https://github.com/stretchr/testify/compare/v1.6.1...v1.7.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2021-01-13 18:18:37 +00:00
Taylor Sutton
29f86e9986
Merge pull request #59 from Clever/refactor-decompression
Refactor decompression
2020-11-16 20:18:07 -05:00
Taylor Sutton
42c3480288 Simplify main function's name. 2020-11-16 17:37:53 -05:00
Taylor Sutton
cc4f7716b3 More documentation around handling of CW log bundles; deflake test 2020-11-16 12:44:50 -05:00
Taylor Sutton
6e8a99c50d Add functionality for handling KPL aggregates and zlib
Adds a lowest-common-denominator function, KPLDeaggregate, for
handling records that might be KPL aggregated. Also adds a function,
DeaggregateAndSplitIfNecessary, to wrap the existing functionality of
SplitMessageIfNecessary with KPL deaggregation.

These functions are handy for non-KCL consumers, like Lambda
functions. KCL automatically applies deaggregation for you.

This change is backwards compatible - the previously exposed function
SplitMessageIfNecessary still does the same things.
2020-11-16 11:01:28 -05:00
Taylor Sutton
4dd769ffca
Merge pull request #58 from Clever/refactor-decompression
Refactor zlib decompression into the splitter package
2020-11-12 11:57:53 -05:00
Taylor Sutton
f8e9c34641 Refactor zlib decompression into the splitter package
Being in the batchconsumer package means it will work for anything
using KCL, but lambdas that subscribe to these log streams do not use
batchconsumer at all; instead they invoke the splitter package
directly. As such, if we want this functionality to be available to
lambda log consumers, it can't be in batchconsumer.

There are no functionality changes here, just moving code from an
unexported method in one place to an exported function in another
place. The tests also get moved along with it.
2020-11-12 11:37:13 -05:00
Taylor Sutton
2a7e96157e
Merge pull request #57 from Clever/add-zlib-decompression
Add zlib decompression to batch consumer.
2020-11-11 12:42:11 -05:00
Taylor Sutton
4fe27d0d39 Clarifying comment - Logs from CW Logs are gzipped. 2020-11-11 12:39:51 -05:00
Taylor Sutton
6b2d1f8a56 Add zlib decompression to batch consumer.
This has become relevant as zlib is the compression method supported
by the Kinesis plugin for Fluent Bit.
2020-11-11 09:12:05 -05:00
Gavi Hirsch
611f0c0a60
Merge pull request #56 from Clever/INFRANG-3918
clear count stats after logging them
2020-09-11 15:02:06 -07:00
Gavi Hirsch
1112894639
clear count stats after logging them 2020-09-11 14:04:42 -07:00
Taylor Sutton
484b54bfe0
Merge pull request #55 from Clever/INFRANG-35-fluent-decode
Add decoding of JSON logs in Fluent format.
2020-08-18 08:44:17 -07:00
Taylor Sutton
f5ce6fe4e7 Add decoding of JSON logs in Fluent format.
ParseAndEnhance used to be:
- Try to parse line as a syslog, extracting the log itself and other
  fields from syslog format
  - If that succeeds, try to parse the log as either a Kayvee log or
    an RDS slow query log.
  - Combine all these fields, and add on some "derived"
    fields (container_task|env|app).
- Not a syslog => error

Now it will be:
- Try to parse line as a syslog, same as before, including the
  Kayvee/RDS part
- If syslog parsing failed, try to parse as a Fluent log and extract
  some fields from the Fluent format (the log, timestamp, etc)
  - If that succeeds, try to parse the log itself as a Kayvee log.
  - Combine Kayvee fields (if found) and derived fields
- If BOTH formats fail, it is an error.

The decoding makes a lot of assumptions:
- The names of the log field and timestamp field (even though,
theoretically, they are customizable in the fluentbit config).
- The timestamp format (again)
- The format of the Task Definition name (or at least part of it)
- All fluentbit logs should have hostname set to `aws-fargate`.

Perhaps these can be relaxed if necessary. They could probably be
replaced by some kind of config. As there is currently no config I
wanted to keep things as simple as possible. If we need to re-evaluate
(for example if we start getting JSON logs that don't want to use the
same handling for container_task|env|app) we can reevaluate.
2020-08-17 15:02:59 -07:00
Gavi Hirsch
9b32d93d1c
Merge pull request #54 from Clever/upgrade-kayvee-go-6.24.0
upgrade kayvee-go to 6.24.0
2020-08-14 19:19:26 -07:00
Gavi Hirsch
3bedc65483 upgrade kayvee-go to 6.24.0 2020-08-14 16:30:08 -07:00
Gavi Hirsch
b16af062fb
Merge pull request #53 from Clever/glue-2.0
glue log group format
2020-08-11 16:52:03 -07:00
Gavi Hirsch
30d3925119 glue log group format 2020-08-11 16:42:59 -07:00
Gavi Hirsch
ebd1e0e39c
Merge pull request #52 from Clever/dependabot/dep/github.com/stretchr/testify-1.6.1
Bump github.com/stretchr/testify from 1.1.4 to 1.6.1
2020-07-24 12:21:48 -07:00
Gavi Hirsch
bfe1a6ffb9 fix Gopkg.lock 2020-07-24 12:19:30 -07:00
dependabot-preview[bot]
35b07d7aed
Bump github.com/stretchr/testify from 1.1.4 to 1.6.1
Bumps [github.com/stretchr/testify](https://github.com/stretchr/testify) from 1.1.4 to 1.6.1.
- [Release notes](https://github.com/stretchr/testify/releases)
- [Commits](https://github.com/stretchr/testify/compare/v1.1.4...v1.6.1)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-07-24 17:38:31 +00:00
Gavi Hirsch
8a8b44112d
Merge pull request #51 from Clever/dependabot/dep/gopkg.in/Clever/kayvee-go.v6-6.23.0
Bump gopkg.in/Clever/kayvee-go.v6 from 6.6.0 to 6.23.0
2020-07-23 10:27:58 -07:00
dependabot-preview[bot]
8be774bc09
Bump gopkg.in/Clever/kayvee-go.v6 from 6.6.0 to 6.23.0
Bumps [gopkg.in/Clever/kayvee-go.v6](https://github.com/Clever/kayvee-go) from 6.6.0 to 6.23.0.
- [Release notes](https://github.com/Clever/kayvee-go/releases)
- [Commits](https://github.com/Clever/kayvee-go/compare/v6.6.0...v6.23.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2020-07-23 17:24:47 +00:00
Rafael
504ebfad60
Merge pull request #49 from Clever/INFRANG-3968
support parsing AWS Glue job logs
2020-07-20 14:15:14 -04:00
Rafael Garcia
420ad243a4 add codeowners 2020-07-20 18:11:27 +00:00
Rafael Garcia
337d2063f5 support parsing AWS Glue job logs 2020-07-20 17:20:39 +00:00
Sayan
eb11747434
Merge pull request #48 from Clever/update-jar-link
Fix maven repo link
2020-04-06 13:50:16 -07:00
Sayan Samanta
d489b5039b update to repo1 link for jars 2020-04-06 13:31:10 -07:00
Taylor Sutton
e7db80a35a
Merge pull request #46 from Clever/go1.13
Upgrade go to v1.13
2019-10-18 11:36:05 -07:00
Taylor Sutton
5132214945 Upgrade go to v1.13 2019-10-18 09:45:22 -07:00
Daniel Xu
e265494afb
Merge pull request #45 from Clever/INFRA-3664
Fix regex that parses the RDS slowquery user
2019-10-14 17:21:53 -07:00
Daniel Xu
69d31088f3 Add tests for parsing slow query user 2019-10-14 17:19:12 -07:00
Daniel Xu
7446627fae Fix regex that parses the RDS slowquery user 2019-10-14 17:02:01 -07:00
Aaron Stein
9cbcf1096a
Merge pull request #44 from Clever/parse-rds
Parse rds slowquery user
2019-10-02 10:21:31 -07:00
18 changed files with 1046 additions and 315 deletions
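
One change in this range, commit 7446627fae, loosens the regex that extracts the RDS slow query user. As a rough illustration, the sketch below compares the old and new patterns (both copied from the diff further down) against the two sample lines used in the tests. The helper `extractUser` and the variable names are assumptions made for this example and are not part of the repository.

```go
package main

import (
	"fmt"
	"regexp"
)

var (
	// Old pattern: requires at least one letter right after "@ ".
	oldUserPattern = regexp.MustCompile(`#\sUser@Host:\s(?P<user>[a-zA-Z]+\[[a-zA-Z]+\])\s@\s[a-zA-Z]+.*Id:\s+(?P<id>[0-9]+)`)
	// New pattern: accepts anything between "@ " and "Id:".
	newUserPattern = regexp.MustCompile(`#\sUser@Host:\s(?P<user>[a-zA-Z]+\[[a-zA-Z]+\])\s@\s.*Id:\s+(?P<id>[0-9]+)`)
)

// extractUser is a hypothetical helper: it returns the "user" and "id"
// capture groups (groups 1 and 2 in both patterns), or ok=false on no match.
func extractUser(re *regexp.Regexp, line string) (user, id string, ok bool) {
	m := re.FindStringSubmatch(line)
	if m == nil {
		return "", "", false
	}
	return m[1], m[2], true
}

func main() {
	lines := []string{
		"# User@Host: rdsadmin[rdsadmin] @ localhost [] Id: 1",
		"# User@Host: clever[clever] @ [10.1.11.112] Id: 868",
	}
	for _, line := range lines {
		_, _, oldOK := extractUser(oldUserPattern, line)
		user, id, newOK := extractUser(newUserPattern, line)
		fmt.Printf("old matched=%v new matched=%v user=%q id=%q\n", oldOK, newOK, user, id)
	}
}
```

The second line has no hostname before the bracketed IP address, so the old pattern's `[a-zA-Z]+` has nothing to match and the whole expression fails. The new pattern matches both lines.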

@ -3,9 +3,10 @@ jobs:
build:
working_directory: /go/src/github.com/Clever/amazon-kinesis-client-go
docker:
- image: circleci/golang:1.12-stretch
- image: circleci/golang:1.13-stretch
- image: circleci/mongo:3.2.20-jessie-ram
environment:
GOPRIVATE: github.com/Clever/*
CIRCLE_ARTIFACTS: /tmp/circleci-artifacts
CIRCLE_TEST_REPORTS: /tmp/circleci-test-results
steps:
@ -17,7 +18,13 @@ jobs:
- run:
command: mkdir -p $CIRCLE_ARTIFACTS $CIRCLE_TEST_REPORTS
name: Set up CircleCI artifacts directories
- run:
command: git config --global "url.ssh://git@github.com/Clever".insteadOf "https://github.com/Clever"
- run:
name: Add github.com to known hosts
command: mkdir -p ~/.ssh && touch ~/.ssh/known_hosts && echo 'github.com ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCj7ndNxQowgcQnjshcLrqPEiiphnt+VTTvDP6mHBL9j1aNUkY4Ue1gvwnGLVlOhGeYrnZaMgRK6+PKCUXaDbC7qtbW8gIkhL7aGCsOr/C56SJMy/BCZfxd1nWzAOxSDPgVsmerOBYfNqltV9/hWCqBywINIR+5dIg6JTJ72pcEpEjcYgXkE2YEFXV1JHnsKgbLWNlhScqb2UmyRkQyytRLtL+38TGxkxCflmO+5Z8CSSNY7GidjMIZ7Q4zMjA2n1nGrlTDkzwDCsw+wqFPGQA179cnfGWOWRVruj16z6XyvxvjJwbz0wQZ75XK5tKSb7FNyeIEs4TT4jk+S4dhPeAUC5y+bDYirYgM4GC7uEnztnZyaVWQ7B381AK4Qdrwt51ZqExKbQpTUNn+EjqoTwvqNj4kqx5QUCI0ThS/YkOxJCXmPUWZbhjpCg56i+2aB6CmK2JGhn57K5mj0MNdBXA4/WnwH6XoPWJzK5Nyu2zB3nAZp+S5hpQs+p1vN1/wsjk=' >> ~/.ssh/known_hosts
- run: make install_deps
- run: make build
- run: make bench
- run: make test
- run: if [ "${CIRCLE_BRANCH}" == "master" ]; then $HOME/ci-scripts/circleci/github-release $GH_RELEASE_TOKEN; fi;

1
.github/CODEOWNERS vendored Normal file
@ -0,0 +1 @@
* @Clever/eng-infra

20
.github/workflows/notify-ci-status.yml vendored Normal file
@ -0,0 +1,20 @@
name: Notify CI status
on:
check_suite:
types: [completed]
status:
jobs:
call-workflow:
if: >-
(github.event.branches[0].name == github.event.repository.default_branch &&
(github.event.state == 'error' || github.event.state == 'failure')) ||
(github.event.check_suite.head_branch == github.event.repository.default_branch &&
github.event.check_suite.conclusion != 'success')
uses: Clever/ci-scripts/.github/workflows/reusable-notify-ci-status.yml@master
secrets:
CIRCLE_CI_INTEGRATIONS_URL: ${{ secrets.CIRCLE_CI_INTEGRATIONS_URL }}
CIRCLE_CI_INTEGRATIONS_USERNAME: ${{ secrets.CIRCLE_CI_INTEGRATIONS_USERNAME }}
CIRCLE_CI_INTEGRATIONS_PASSWORD: ${{ secrets.CIRCLE_CI_INTEGRATIONS_PASSWORD }}
SLACK_BOT_TOKEN: ${{ secrets.DAPPLE_BOT_TOKEN }}

84
Gopkg.lock generated
@ -1,84 +0,0 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
branch = "master"
name = "github.com/Clever/syslogparser"
packages = [
".",
"rfc3164"
]
revision = "fb28ad3e4340c046323b7beba685a72fd12ecbe8"
[[projects]]
name = "github.com/davecgh/go-spew"
packages = ["spew"]
revision = "6d212800a42e8ab5c146b8ace3490ee17e5225f9"
[[projects]]
branch = "master"
name = "github.com/jeromer/syslogparser"
packages = ["."]
revision = "0e4ae46ea3f08de351074b643d649d5d00661a3c"
[[projects]]
name = "github.com/pmezard/go-difflib"
packages = ["difflib"]
revision = "d8ed2627bdf02c080bf22230dbb337003b7aba2d"
[[projects]]
name = "github.com/stretchr/testify"
packages = [
"assert",
"require"
]
revision = "69483b4bd14f5845b5a1e55bca19e954e827f1d0"
version = "v1.1.4"
[[projects]]
name = "github.com/xeipuuv/gojsonpointer"
packages = ["."]
revision = "e0fe6f68307607d540ed8eac07a342c33fa1b54a"
[[projects]]
branch = "master"
name = "github.com/xeipuuv/gojsonreference"
packages = ["."]
revision = "e02fc20de94c78484cd5ffb007f8af96be030a45"
[[projects]]
name = "github.com/xeipuuv/gojsonschema"
packages = ["."]
revision = "e18f0065e8c148fcf567ac43a3f8f5b66ac0720b"
[[projects]]
name = "golang.org/x/net"
packages = ["context"]
revision = "a6577fac2d73be281a500b310739095313165611"
[[projects]]
name = "golang.org/x/time"
packages = ["rate"]
revision = "f51c12702a4d776e4c1fa9b0fabab841babae631"
[[projects]]
name = "gopkg.in/Clever/kayvee-go.v6"
packages = [
".",
"logger",
"router"
]
revision = "096364e316a52652d3493be702d8105d8d01db84"
version = "v6.6.0"
[[projects]]
name = "gopkg.in/yaml.v2"
packages = ["."]
revision = "a5b47d31c556af34a302ce5d659e6fea44d90de0"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "4699293f3632dd38561ff60477aa7cc1ecaadc5808b974d017099e2189679286"
solver-name = "gps-cdcl"
solver-version = 1

@ -1,36 +0,0 @@
# Gopkg.toml example
#
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
[[constraint]]
branch = "master"
name = "github.com/Clever/syslogparser"
[[constraint]]
name = "github.com/stretchr/testify"
[[constraint]]
name = "golang.org/x/time"
[[constraint]]
name = "gopkg.in/Clever/kayvee-go.v6"
version = "6.0.0"

@ -5,7 +5,7 @@ SHELL := /bin/bash
PKG := github.com/Clever/amazon-kinesis-client-go
PKGS := $(shell go list ./... | grep -v /vendor )
.PHONY: download_jars run build
$(eval $(call golang-version-check,1.12))
$(eval $(call golang-version-check,1.13))
CONSUMER ?= consumer
TMP_DIR := ./tmp-jars
@ -37,7 +37,7 @@ download_jars:
mv $(TMP_DIR)/target/dependency/* $(JAR_DIR)/
# Download the STS jar file for supporting IAM Roles
ls $(JAR_DIR)/aws-java-sdk-core-*.jar | sed -e "s/.*-sdk-core-//g" | sed -e "s/\.jar//g" > /tmp/version.txt
curl -o $(JAR_DIR)/aws-java-sdk-sts-`cat /tmp/version.txt`.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-sts/`cat /tmp/version.txt`/aws-java-sdk-sts-`cat /tmp/version.txt`.jar
curl -o $(JAR_DIR)/aws-java-sdk-sts-`cat /tmp/version.txt`.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-sts/`cat /tmp/version.txt`/aws-java-sdk-sts-`cat /tmp/version.txt`.jar
rm -r $(TMP_DIR)
all: test build
@ -61,5 +61,5 @@ $(PKGS): golang-test-all-deps
install_deps: golang-dep-vendor-deps
$(call golang-dep-vendor)
install_deps:
go mod vendor

1
VERSION Normal file
@ -0,0 +1 @@
1.0.0

@ -30,16 +30,17 @@ type datum struct {
var queue = make(chan datum, 1000)
func init() {
data := map[string]int{}
countData := map[string]int{}
gaugeData := map[string]int{}
tick := time.Tick(time.Minute)
go func() {
for {
select {
case d := <-queue:
if d.category == "counter" {
data[d.name] = data[d.name] + d.value
countData[d.name] = countData[d.name] + d.value
} else if d.category == "gauge" {
data[d.name] = d.value
gaugeData[d.name] = d.value
} else {
log.ErrorD("unknown-stat-category", logger.M{"category": d.category})
}
@ -48,10 +49,14 @@ func init() {
for _, k := range DefaultCounters {
tmp[k] = 0
}
for k, v := range data {
for k, v := range countData {
tmp[k] = v
}
for k, v := range gaugeData {
tmp[k] = v
}
log.InfoD("stats", tmp)
countData = map[string]int{}
}
}
}()
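
The hunk above splits the single `data` map into counters and gauges so that counters can be reset after each log line while gauges keep their last value. A minimal sketch of that behaviour follows. The function `flush` and the stat names are assumptions made for this example, and the sketch clears the counter map in place instead of reassigning it as the diff does.

```go
package main

import "fmt"

// flush is a hypothetical helper modelling one tick of the stats loop:
// it merges defaults, counters and gauges into the map that gets logged,
// then clears the counters so the next interval starts from zero.
func flush(defaults []string, counters, gauges map[string]int) map[string]int {
	out := map[string]int{}
	for _, k := range defaults {
		out[k] = 0 // counters with no activity still report 0
	}
	for k, v := range counters {
		out[k] = v
	}
	for k, v := range gauges {
		out[k] = v
	}
	// The diff reassigns countData to a fresh map; deleting keys in place
	// has the same effect here because the caller shares this map.
	for k := range counters {
		delete(counters, k)
	}
	return out
}

func main() {
	defaults := []string{"processed"}
	counters := map[string]int{"processed": 3}
	gauges := map[string]int{"queue-size": 7}

	fmt.Println(flush(defaults, counters, gauges)) // first interval
	fmt.Println(flush(defaults, counters, gauges)) // second interval, no new data
}
```

On the second call `processed` falls back to 0 while `queue-size` still reports 7, which matches the intent of "clear count stats after logging them".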

@ -57,19 +57,6 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer
return nil
}
func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
// We handle two types of records:
// - records emitted from CWLogs Subscription
// - records emitted from KPL
if !splitter.IsGzipped(record) {
// Process a single message, from KPL
return [][]byte{record}, nil
}
// Process a batch of messages from a CWLogs Subscription
return splitter.GetMessagesFromGzippedInput(record)
}
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
var pair kcl.SequencePair
prevPair := b.lastProcessedSeq
@ -93,7 +80,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
return err
}
messages, err := b.splitMessageIfNecessary(data)
messages, err := splitter.SplitMessageIfNecessary(data)
if err != nil {
return err
}

@ -352,7 +352,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) {
assert.Equal("tag3", string(mocksender.batches["tag3"][0][2]))
}
func TestStaggeredCheckpionting(t *testing.T) {
func TestStaggeredCheckpointing(t *testing.T) {
assert := assert.New(t)
mockFailedLogsFile := logger.New("testing")

BIN
bin/dep Executable file

Binary file not shown.

@ -2,9 +2,11 @@ package decode
import (
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"
"time"
"github.com/Clever/syslogparser/rfc3164"
)
@ -15,9 +17,18 @@ var reservedFields = []string{
"prefix",
"postfix",
"decoder_msg_type",
// used by all logs
"timestamp",
"hostname",
"rawlog",
// Used by Kayvee
"prefix",
"postfix",
// Set to the deepest step of parsing that succeeded
"decoder_msg_type",
}
func stringInSlice(s string, slice []string) bool {
@ -107,7 +118,7 @@ func FieldsFromKayvee(line string) (map[string]interface{}, error) {
return m, nil
}
var userPattern = `#\sUser@Host:\s(?P<user>[a-zA-Z]+\[[a-zA-Z]+\])\s@\s[a-zA-Z]+.*Id:\s+(?P<id>[0-9]+)`
var userPattern = `#\sUser@Host:\s(?P<user>[a-zA-Z]+\[[a-zA-Z]+\])\s@\s.*Id:\s+(?P<id>[0-9]+)`
var userPatternRegex = regexp.MustCompile(userPattern)
func FieldsFromRDSSlowquery(rawlog string) map[string]interface{} {
@ -385,17 +396,21 @@ func ExtractKVMeta(kvlog map[string]interface{}) KVMeta {
// ParseAndEnhance extracts fields from a log line, and does some post-processing to rename/add fields
func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
out := map[string]interface{}{}
syslog, syslogErr := FieldsFromSyslog(line)
if syslogErr == nil {
return decodeSyslog(syslog, env)
}
syslogFields, err := FieldsFromSyslog(line)
if err != nil {
return map[string]interface{}{}, err
fluentLog, fluentErr := FieldsFromFluentbitLog(line)
if fluentErr == nil {
return decodeFluent(fluentLog, env)
}
for k, v := range syslogFields {
out[k] = v
}
rawlog := syslogFields["rawlog"].(string)
programname := syslogFields["programname"].(string)
return nil, fmt.Errorf("unable to parse log line with errors `%v` and `%v`", syslogErr, fluentErr)
}
func decodeSyslog(syslog map[string]interface{}, env string) (map[string]interface{}, error) {
rawlog := syslog["rawlog"].(string)
// Try pulling Kayvee fields out of message
kvFields, err := FieldsFromKayvee(rawlog)
@ -405,56 +420,39 @@ func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
}
} else {
for k, v := range kvFields {
out[k] = v
}
}
// Inject additional fields that are useful in log-searching and other business logic
out["env"] = env
// Sometimes its useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
// A separate container monitors for start/stop events, but we set the container values in such a way that
// the logs for these events will appear in context for the app that the user is looking at instead of the
// docker-events app.
forceEnv := ""
forceApp := ""
forceTask := ""
if cEnv, ok := out["container_env"]; ok {
forceEnv = cEnv.(string)
}
if cApp, ok := out["container_app"]; ok {
forceApp = cApp.(string)
}
if cTask, ok := out["container_task"]; ok {
forceTask = cTask.(string)
}
meta, err := getContainerMeta(programname, forceEnv, forceApp, forceTask)
if err == nil {
for k, v := range meta {
out[k] = v
syslog[k] = v
}
}
// Try pulling RDS slowquery logs fields out of message
if out["hostname"] == "aws-rds" {
if syslog["hostname"] == "aws-rds" {
slowQueryFields := FieldsFromRDSSlowquery(rawlog)
for k, v := range slowQueryFields {
out[k] = v
syslog[k] = v
}
}
return out, nil
// Inject additional fields that are useful in log-searching and other business logic
syslog["env"] = env
// this can error, which indicates inability to extract container meta. That's fine, we can ignore that.
addContainterMetaToSyslog(syslog)
return syslog, nil
}
const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
`arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32})` // task-id (ECS, both EC2 and Fargate)
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32}|jr_[a-z0-9-]+)` // task-id (ECS, both EC2 and Fargate), Glue Job ID
var containerMetaRegex = regexp.MustCompile(containerMeta)
func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[string]string, error) {
if programname == "" {
return map[string]string{}, fmt.Errorf("no programname")
var errBadProgramname = errors.New("invalid or missing programname")
func addContainterMetaToSyslog(syslog map[string]interface{}) (map[string]interface{}, error) {
programname, ok := syslog["programname"].(string)
if !ok || programname == "" {
return syslog, errBadProgramname
}
env := ""
@ -465,25 +463,116 @@ func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[st
env = matches[0][1]
app = matches[0][2]
task = matches[0][4]
} else {
return syslog, errBadProgramname
}
if forceEnv != "" {
env = forceEnv
// Sometimes it's useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
// A separate container monitors for start/stop events, but we set the container values in such a way that
// the logs for these events will appear in context for the app that the user is looking at instead of the
// docker-events app.
if existingEnv, ok := syslog["container_env"].(string); !ok || existingEnv == "" {
syslog["container_env"] = env
}
if forceApp != "" {
app = forceApp
if existingApp, ok := syslog["container_app"].(string); !ok || existingApp == "" {
syslog["container_app"] = app
}
if forceTask != "" {
task = forceTask
if existingTask, ok := syslog["container_task"].(string); !ok || existingTask == "" {
syslog["container_task"] = task
}
if env == "" || app == "" || task == "" {
return map[string]string{}, fmt.Errorf("unable to get one or more of env/app/task")
}
return map[string]string{
"container_env": env,
"container_app": app,
"container_task": task,
}, nil
return syslog, nil
}
const (
// These are the fields in the incoming fluentbit JSON in which we expect the timestamp and log.
// They are referenced below by the FluentLog type; those are the ones that matter.
fluentbitTimestampField = "fluent_ts"
fluentbitLogField = "log"
// This is what we get from the fluentbit config: `time_key_format: "%Y-%m-%dT%H:%M:%S.%L%z"`
fluentTimeFormat = "2006-01-02T15:04:05.999-0700"
)
// FluentLog represents the set of fields extracted from an incoming fluentbit log
// The struct tags must line up with the JSON schema in the fluentbit configuration, see the comment for FieldsFromFluentbitLog
type FluentLog struct {
Log *string `json:"log"`
Timestamp *string `json:"fluent_ts"`
TaskArn string `json:"ecs_task_arn"`
TaskDefinition string `json:"ecs_task_definition"`
}
// FieldsFromFluentbitLog parses a JSON object with fields indicating that it's coming from FluentBit
// Its return value shares a common interface with the Syslog output - with the same four key fields
// Like FieldsFromSyslog, it accepts its argument as a string; the string is converted to bytes for JSON decoding
// In theory, the format we receive is highly customizable, so we'll be making the following assumptions:
// - All logs are coming from aws-fargate with the ecs-metadata fields (ecs_cluster, ecs_task_arn, ecs_task_definition) enabled
// - The timestamp is in the field given by the fluentbitTimestampField constant in this package
// - The timestamp is of the format of the constant fluentTimeFormat
// - The log is in the field given by the fluentbitLogField constant in this package
// - The ecs_task_definition is of the form {environment}--{application}--.*
func FieldsFromFluentbitLog(line string) (*FluentLog, error) {
fluentFields := FluentLog{}
if err := json.Unmarshal([]byte(line), &fluentFields); err != nil {
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: err}
}
return &fluentFields, nil
}
func decodeFluent(fluentLog *FluentLog, env string) (map[string]interface{}, error) {
out := map[string]interface{}{}
if fluentLog.Timestamp == nil || *fluentLog.Timestamp == "" {
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("no timestamp found in input field %s", fluentbitTimestampField)}
}
if timeValue, err := time.Parse(fluentTimeFormat, *fluentLog.Timestamp); err == nil {
out["timestamp"] = timeValue
} else {
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("timestamp has bad format: %s", *fluentLog.Timestamp)}
}
if fluentLog.Log == nil {
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("no log found in input field %s", fluentbitLogField)}
}
log := *fluentLog.Log
out["rawlog"] = log
out["decoder_msg_type"] = "fluentbit"
out["hostname"] = "aws-fargate"
// best effort to add container_env|app|task
if parts := strings.SplitN(fluentLog.TaskDefinition, "--", 3); len(parts) == 3 {
out["container_env"] = parts[0]
out["container_app"] = parts[1]
}
if idx := strings.LastIndex(fluentLog.TaskArn, "/"); idx != -1 {
out["container_task"] = fluentLog.TaskArn[idx+1:]
}
kvFields, err := FieldsFromKayvee(log)
if err == nil {
for k, v := range kvFields {
out[k] = v
}
}
// Inject additional fields that are useful in log-searching and other business logic; mimicking syslog behavior
out["env"] = env
return out, nil
}
type BadLogFormatError struct {
Format string
DecodingError error
}
func (b BadLogFormatError) Error() string {
return fmt.Sprintf("trying to decode log as format %s: %v", b.Format, b.DecodingError)
}
func (b BadLogFormatError) Unwrap() error {
return b.DecodingError
}

@ -1,6 +1,7 @@
package decode
import (
"errors"
"fmt"
"sort"
"testing"
@ -221,10 +222,10 @@ func TestSyslogDecoding(t *testing.T) {
fields, err := FieldsFromSyslog(spec.Input)
if spec.ExpectedError != nil {
assert.Error(err)
assert.IsType(spec.ExpectedError, err)
} else {
assert.NoError(err)
assert.True(errors.As(err, &spec.ExpectedError))
return
}
assert.NoError(err)
assert.Equal(spec.ExpectedOutput, fields)
})
}
@ -251,6 +252,11 @@ func TestParseAndEnhance(t *testing.T) {
}
logTime3 = logTime3.UTC()
logTime4, err := time.Parse(fluentTimeFormat, "2020-08-13T21:10:57.000+0000")
if err != nil {
t.Fatal(err)
}
specs := []ParseAndEnhanceSpec{
ParseAndEnhanceSpec{
Title: "Parses a Kayvee log line from an ECS app",
@ -307,10 +313,10 @@ func TestParseAndEnhance(t *testing.T) {
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Fails to parse non-RSyslog log line",
Title: "Fails to parse non-RSyslog, non-Fluent log line",
Line: `not rsyslog`,
ExpectedOutput: map[string]interface{}{},
ExpectedError: &syslogparser.ParserError{},
ExpectedError: fmt.Errorf(""),
},
ParseAndEnhanceSpec{
Title: "Parses JSON values",
@ -345,7 +351,7 @@ func TestParseAndEnhance(t *testing.T) {
},
},
ParseAndEnhanceSpec{
Title: "RDS Slowquery Log",
Title: "RDS Slowquery Log rdsadmin",
Line: `2017-04-05T21:57:46+00:00 aws-rds production-aurora-test-db: Slow query: # Time: 190921 16:02:59
# User@Host: rdsadmin[rdsadmin] @ localhost [] Id: 1
# Query_time: 22.741550 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0SET timestamp=1569081779;call action start_seamless_scaling('AQEAAB1P/PAIqvTHEQFJAEkojZUoH176FGJttZ62JF5QmRehaf0S0VFTa+5MPJdYQ9k0/sekBlnMi8U=', 300000, 2);
@ -361,6 +367,179 @@ SET timestamp=1569862702;`,
"user_id": "1",
},
},
ParseAndEnhanceSpec{
Title: "RDS Slowquery Log clever user",
Line: `2017-04-05T21:57:46+00:00 aws-rds production-aurora-test-db: Slow query: # Time: 190921 16:02:59
# User@Host: clever[clever] @ [10.1.11.112] Id: 868
# Query_time: 2.000270 Lock_time: 0.000000 Rows_sent: 1 Rows_examined: 0
SET timestamp=1571090308;
select sleep(2);`,
ExpectedOutput: map[string]interface{}{
"env": "deploy-env",
"hostname": "aws-rds",
"programname": "production-aurora-test-db",
"decoder_msg_type": "syslog",
"rawlog": "Slow query: # Time: 190921 16:02:59\n# User@Host: clever[clever] @ [10.1.11.112] Id: 868\n# Query_time: 2.000270 Lock_time: 0.000000 Rows_sent: 1 Rows_examined: 0\nSET timestamp=1571090308;\nselect sleep(2);",
"timestamp": logTime2,
"user": "clever[clever]",
"user_id": "868",
},
},
{
Title: "Valid fluent log",
Line: `{
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
"ecs_task_definition": "env--app--d--2:2",
"fluent_ts": "2020-08-13T21:10:57.000+0000",
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
"source": "stderr"
}`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime4,
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
"hostname": "aws-fargate",
"decoder_msg_type": "Kayvee",
"title": "request-finished",
"container_app": "app",
"container_env": "env",
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
"prefix": "2020/08/13 21:10:57 ",
"postfix": "",
"env": "deploy-env",
},
ExpectedError: nil,
},
{
Title: "fluent log missing ecs_task_definition still parses and has container_task",
Line: `{
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
"fluent_ts": "2020-08-13T21:10:57.000+0000",
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
"source": "stderr"
}`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime4,
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
"hostname": "aws-fargate",
"decoder_msg_type": "Kayvee",
"title": "request-finished",
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
"prefix": "2020/08/13 21:10:57 ",
"postfix": "",
"env": "deploy-env",
},
},
{
Title: "fluent log with bad ecs_task_definition format still parses and has container_task",
Line: `{
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
"ecs_task_definition": "env--app",
"fluent_ts": "2020-08-13T21:10:57.000+0000",
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
"source": "stderr"
}`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime4,
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
"hostname": "aws-fargate",
"decoder_msg_type": "Kayvee",
"title": "request-finished",
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
"prefix": "2020/08/13 21:10:57 ",
"postfix": "",
"env": "deploy-env",
},
},
{
Title: "fluent log missing ecs_task_arn still parses and has container_env and app",
Line: `{
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
"ecs_task_definition": "env--app--d--2:2",
"fluent_ts": "2020-08-13T21:10:57.000+0000",
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
"source": "stderr"
}`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime4, // time.Date(2020, 8, 13, 21, 10, 57, 0, time.UTC),
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
"hostname": "aws-fargate",
"decoder_msg_type": "Kayvee",
"title": "request-finished",
"container_env": "env",
"container_app": "app",
"prefix": "2020/08/13 21:10:57 ",
"postfix": "",
"env": "deploy-env",
},
},
{
Title: "fluent log missing timestamp is an error",
Line: `{
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
"ecs_task_definition": "env--app--d--2:2",
"ts": "2020-08-13T21:10:57.000+0000",
"log": "2020/08/13 21:10:57 {\"deploy_env\":\"env\",\"level\":\"info\",\"pod-account\":\"589690932525\",\"pod-id\":\"2870425d-e4b1-4869-b2b7-a6d9fade9be1\",\"pod-region\":\"us-east-1\",\"pod-shortname\":\"us-east-1-dev-canary-2870425d\",\"source\":\"app-service\",\"title\":\"NumFDs\",\"type\":\"gauge\",\"value\":33,\"via\":\"process-metrics\"}",
"source": "stderr"
}`,
ExpectedOutput: nil,
ExpectedError: BadLogFormatError{},
},
{
Title: "fluent log with bad timestamp format is an error",
Line: `{
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
"ecs_task_definition": "env--app--d--2:2",
"fluent_ts": "2020-08-13T21:10:57.000Z",
"log": "2020/08/13 21:10:57 {\"deploy_env\":\"env\",\"level\":\"info\",\"pod-account\":\"589690932525\",\"pod-id\":\"2870425d-e4b1-4869-b2b7-a6d9fade9be1\",\"pod-region\":\"us-east-1\",\"pod-shortname\":\"us-east-1-dev-canary-2870425d\",\"source\":\"app-service\",\"title\":\"NumFDs\",\"type\":\"gauge\",\"value\":33,\"via\":\"process-metrics\"}",
"source": "stderr"
}`,
ExpectedOutput: nil,
ExpectedError: BadLogFormatError{},
},
{
Title: "Valid fluent log with overrides for container_env, app, and task",
Line: `{
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
"ecs_task_definition": "env--app--d--2:2",
"fluent_ts": "2020-08-13T21:10:57.000+0000",
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\", \"container_env\": \"env2\", \"container_task\": \"task\", \"container_app\": \"app2\"}",
"source": "stderr"
}`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime4,
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\", \"container_env\": \"env2\", \"container_task\": \"task\", \"container_app\": \"app2\"}",
"hostname": "aws-fargate",
"decoder_msg_type": "Kayvee",
"title": "request-finished",
"container_app": "app2",
"container_env": "env2",
"container_task": "task",
"prefix": "2020/08/13 21:10:57 ",
"postfix": "",
"env": "deploy-env",
},
ExpectedError: nil,
},
}
for _, spec := range specs {
t.Run(fmt.Sprintf(spec.Title), func(t *testing.T) {
@ -368,72 +547,114 @@ SET timestamp=1569862702;`,
fields, err := ParseAndEnhance(spec.Line, "deploy-env")
if spec.ExpectedError != nil {
assert.Error(err)
assert.IsType(spec.ExpectedError, err)
} else {
assert.NoError(err)
assert.True(errors.As(err, &spec.ExpectedError))
return
}
assert.NoError(err)
assert.Equal(spec.ExpectedOutput, fields)
})
}
}
func TestGetContainerMeta(t *testing.T) {
assert := assert.New(t)
t.Log("Must have a programname to get container meta")
programname := ""
_, err := getContainerMeta(programname, "", "", "")
assert.Error(err)
type containerMetaSpec struct {
description string
input map[string]interface{}
wantErr bool
wantContainerEnv string
wantContainerApp string
wantContainerTask string
}
t.Log("Can parse a programname")
programname = `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
meta, err := getContainerMeta(programname, "", "", "")
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": "env1",
"container_app": "app2",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
}, meta)
tests := []containerMetaSpec{
{
description: "Must have a programname to get container meta",
input: map[string]interface{}{
"programname": "",
},
wantErr: true,
},
{
description: "Can parse a programname",
input: map[string]interface{}{
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
},
wantContainerEnv: "env1",
wantContainerApp: "app2",
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
},
{
description: "Can override just 'env'",
input: map[string]interface{}{
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"container_env": "force-env",
},
wantContainerEnv: "force-env",
wantContainerApp: "app2",
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
},
{
description: "Can override just 'app'",
input: map[string]interface{}{
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"container_app": "force-app",
},
wantContainerEnv: "env1",
wantContainerApp: "force-app",
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
},
{
description: "Can override just 'task'",
input: map[string]interface{}{
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"container_task": "force-task",
},
wantContainerEnv: "env1",
wantContainerApp: "app2",
wantContainerTask: "force-task",
},
{
description: "Can override all fields",
input: map[string]interface{}{
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"container_task": "force-task",
"container_app": "force-app",
"container_env": "force-env",
},
wantContainerEnv: "force-env",
wantContainerApp: "force-app",
wantContainerTask: "force-task",
},
}
t.Log("Can override just 'env'")
overrideEnv := "force-env"
meta, err = getContainerMeta(programname, overrideEnv, "", "")
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": overrideEnv,
"container_app": "app2",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
}, meta)
for _, tcase := range tests {
assert := assert.New(t)
t.Log("Can override just 'app'")
overrideApp := "force-app"
meta, err = getContainerMeta(programname, "", overrideApp, "")
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": "env1",
"container_app": overrideApp,
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
}, meta)
syslog := map[string]interface{}{}
for k, v := range tcase.input {
syslog[k] = v
}
t.Log("Can override just 'task'")
overrideTask := "force-task"
meta, err = getContainerMeta(programname, "", "", overrideTask)
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": "env1",
"container_app": "app2",
"container_task": overrideTask,
}, meta)
newSyslog, err := addContainterMetaToSyslog(syslog)
if tcase.wantErr {
assert.Error(err)
continue
}
for k, v := range newSyslog {
switch k {
case "container_env":
assert.Equal(tcase.wantContainerEnv, v)
case "container_app":
assert.Equal(tcase.wantContainerApp, v)
case "container_task":
assert.Equal(tcase.wantContainerTask, v)
default:
assert.Equal(tcase.input[k], v)
}
}
}
t.Log("Can override all fields")
programname = `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
meta, err = getContainerMeta(programname, overrideEnv, overrideApp, overrideTask)
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": overrideEnv,
"container_app": overrideApp,
"container_task": overrideTask,
}, meta)
}
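The test cases above expect a programname of the form `<env>--<app>/<URL-escaped task ARN>` to yield three fields. The following is a rough sketch of one way such a string could be taken apart. `parseProgramname` and `containerMeta` are hypothetical names, and this is not the decode package's actual implementation, which is not shown in this diff.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// containerMeta is a hypothetical result type for this sketch.
type containerMeta struct {
	Env, App, Task string
}

// parseProgramname splits "<env>--<app>/<escaped ARN>" into its parts.
// The task ID is taken to be the last path segment of the unescaped ARN.
func parseProgramname(programname string) (containerMeta, error) {
	parts := strings.SplitN(programname, "/", 2)
	if len(parts) != 2 {
		return containerMeta{}, errors.New("programname is not <env>--<app>/<escaped ARN>")
	}
	envApp := strings.SplitN(parts[0], "--", 2)
	if len(envApp) != 2 {
		return containerMeta{}, errors.New("missing <env>--<app> prefix")
	}
	arn, err := url.PathUnescape(parts[1])
	if err != nil {
		return containerMeta{}, fmt.Errorf("unescaping ARN: %w", err)
	}
	idx := strings.LastIndex(arn, "/")
	if idx < 0 {
		return containerMeta{}, errors.New("ARN has no task segment")
	}
	return containerMeta{Env: envApp[0], App: envApp[1], Task: arn[idx+1:]}, nil
}

func main() {
	meta, err := parseProgramname(`env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`)
	fmt.Println(meta, err)
}
```

Overrides such as `container_env` could then be applied on top of the parsed values, replacing a field only when the override is non-empty.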
func TestExtractKVMeta(t *testing.T) {

22
go.mod Normal file

@ -0,0 +1,22 @@
module github.com/amazon-kinesis-client-go
go 1.13
require (
github.com/Clever/amazon-kinesis-client-go v1.0.0
github.com/Clever/syslogparser v0.0.0-20170816194131-fb28ad3e4340
github.com/a8m/kinesis-producer v0.2.0
github.com/aws/aws-sdk-go v1.35.28 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2
github.com/jeromer/syslogparser v0.0.0-20190429161531-5fbaaf06d9e7 // indirect
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d // indirect
github.com/stretchr/testify v1.7.0
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonschema v1.2.1-0.20200424115421-065759f9c3d7 // indirect
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
gopkg.in/Clever/kayvee-go.v6 v6.24.1
gopkg.in/yaml.v2 v2.3.1-0.20200602174213-b893565b90ca // indirect
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect
)

63
go.sum Normal file

@ -0,0 +1,63 @@
github.com/Clever/amazon-kinesis-client-go v1.0.0 h1:rG+hkHCSe+xMGk3asESg18tiEj8X6To8vj0TRa1pKvQ=
github.com/Clever/amazon-kinesis-client-go v1.0.0/go.mod h1:jLfi8QusUjdsYazmEH0DlSO4gE1WRzVE7cs4c/fOFdI=
github.com/Clever/syslogparser v0.0.0-20170816194131-fb28ad3e4340 h1:wr8lTqPJZQpZkZgllQtrD96SecXkAu5MxzY3yJgsuCg=
github.com/Clever/syslogparser v0.0.0-20170816194131-fb28ad3e4340/go.mod h1:e7Yy7RTiIMU9pZ+dcSviX3cpod8e0CEeTUPTgBbKlRE=
github.com/a8m/kinesis-producer v0.2.0 h1:Bd5Oi4dczbTLPIZwVbm02464LIFgBqmViFj//b098xc=
github.com/a8m/kinesis-producer v0.2.0/go.mod h1:CxoFe0Y49udKMnQPkC5S5VmZZy6a+Bef9otuoH96Pv0=
github.com/aws/aws-sdk-go v1.35.28 h1:S2LuRnfC8X05zgZLC8gy/Sb82TGv2Cpytzbzz7tkeHc=
github.com/aws/aws-sdk-go v1.35.28/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/jeromer/syslogparser v0.0.0-20190429161531-5fbaaf06d9e7 h1:FRDp/rRU2sThfssdHIUATCKJOms8R6A9rrhdC1sLqFI=
github.com/jeromer/syslogparser v0.0.0-20190429161531-5fbaaf06d9e7/go.mod h1:mQyv/QAgjs9+PTi/iXveno+U86nKGsltjqf3ilYx4Bg=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d h1:ix3WmphUvN0GDd0DO9MH0v6/5xTv+Xm1bPN+1UJn58k=
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.1-0.20200424115421-065759f9c3d7 h1:tdnG+ZILOafvA29+pYKP3gauEbigHll3PHsf1GQa2ms=
github.com/xeipuuv/gojsonschema v1.2.1-0.20200424115421-065759f9c3d7/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e h1:EHBhcS0mlXEAVwNyO2dLfjToGsyY4j24pTs2ScHnX7s=
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/Clever/kayvee-go.v6 v6.24.1 h1:lHxjlTFj57mfrq06PiCKIAJ0QuHHEqsyvSYvzb+gSFg=
gopkg.in/Clever/kayvee-go.v6 v6.24.1/go.mod h1:G0m6nBZj7Kdz+w2hiIaawmhXl5zp7E/K0ashol3Kb2A=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.1-0.20200602174213-b893565b90ca h1:oivFrl3Vo+KfpUmTDJvz91I+BWzDPOQ+0CNR5jwTHcg=
gopkg.in/yaml.v2 v2.3.1-0.20200602174213-b893565b90ca/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54=
launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM=


@ -1,15 +1,18 @@
# This is the default Clever Golang Makefile.
# It is stored in the dev-handbook repo, github.com/Clever/dev-handbook
# Please do not alter this file directly.
GOLANG_MK_VERSION := 0.4.0
GOLANG_MK_VERSION := 1.0.0
SHELL := /bin/bash
SYSTEM := $(shell uname -a | cut -d" " -f1 | tr '[:upper:]' '[:lower:]')
.PHONY: golang-test-deps bin/dep golang-ensure-curl-installed
.PHONY: golang-test-deps golang-ensure-curl-installed
# set timezone to UTC for golang to match circle and deploys
export TZ=UTC
# go build flags for use across all commands which accept them
GO_BUILD_FLAGS := "-mod=vendor"
# if the gopath includes several directories, use only the first
GOPATH=$(shell echo $$GOPATH | cut -d: -f1)
@ -32,57 +35,21 @@ _ := $(if \
endef
# FGT is a utility that exits with 1 whenever any stderr/stdout output is received.
# We pin its version since it's a simple tool that does its job as-is;
# so we're defended against it breaking or changing in the future.
FGT := $(GOPATH)/bin/fgt
$(FGT):
go get github.com/GeertJohan/fgt
go get github.com/GeertJohan/fgt@262f7b11eec07dc7b147c44641236f3212fee89d
golang-ensure-curl-installed:
@command -v curl >/dev/null 2>&1 || { echo >&2 "curl not installed. Please install curl."; exit 1; }
DEP_VERSION = v0.4.1
DEP_INSTALLED := $(shell [[ -e "bin/dep" ]] && bin/dep version | grep version | grep -v go | cut -d: -f2 | tr -d '[:space:]')
# Dep is a tool used to manage Golang dependencies. It is the official vendoring experiment, but
# not yet the official tool for Golang.
ifeq ($(DEP_VERSION),$(DEP_INSTALLED))
bin/dep: # nothing to do, dep is already up-to-date
else
CACHED_DEP = /tmp/dep-$(DEP_VERSION)
bin/dep: golang-ensure-curl-installed
@echo "Updating dep..."
@mkdir -p bin
@if [ ! -f $(CACHED_DEP) ]; then curl -o $(CACHED_DEP) -sL https://github.com/golang/dep/releases/download/$(DEP_VERSION)/dep-$(SYSTEM)-amd64; fi;
@cp $(CACHED_DEP) bin/dep
@chmod +x bin/dep || true
endif
# figure out "github.com/<org>/<repo>"
# `go list` will fail if there are no .go files in the directory
# if this is the case, fall back to assuming github.com/Clever
REF = $(shell go list || echo github.com/Clever/$(notdir $(shell pwd)))
golang-verify-no-self-references:
@if grep -q -i "$(REF)" Gopkg.lock; then echo "Error: Gopkg.lock includes a self-reference ($(REF)), which is not allowed. See: https://github.com/golang/dep/issues/1690" && exit 1; fi;
@if grep -q -i "$(REF)" Gopkg.toml; then echo "Error: Gopkg.toml includes a self-reference ($(REF)), which is not allowed. See: https://github.com/golang/dep/issues/1690" && exit 1; fi;
golang-dep-vendor-deps: bin/dep golang-verify-no-self-references
# golang-godep-vendor is a target for saving dependencies with the dep tool
# to the vendor/ directory. All nested vendor/ directories are deleted via
# the prune command.
# In CI, -vendor-only is used to avoid updating the lock file.
ifndef CI
define golang-dep-vendor
bin/dep ensure -v
endef
else
define golang-dep-vendor
bin/dep ensure -v -vendor-only
endef
endif
# Golint is a tool for linting Golang code for common errors.
# We pin its version because an update could add a new lint check which would make
# previously passing tests start failing without changing our code.
GOLINT := $(GOPATH)/bin/golint
$(GOLINT):
go get golang.org/x/lint/golint
go get golang.org/x/lint/golint@738671d3881b9731cc63024d5d88cf28db875626
# golang-fmt-deps requires the FGT tool for checking output
golang-fmt-deps: $(FGT)
@ -91,7 +58,7 @@ golang-fmt-deps: $(FGT)
# arg1: pkg path
define golang-fmt
@echo "FORMATTING $(1)..."
@$(FGT) gofmt -l=true $(GOPATH)/src/$(1)/*.go
@PKG_PATH=$$(go list -f '{{.Dir}}' $(1)); $(FGT) gofmt -l=true $${PKG_PATH}/*.go
endef
# golang-lint-deps requires the golint tool for golang linting.
@ -101,7 +68,7 @@ golang-lint-deps: $(GOLINT)
# arg1: pkg path
define golang-lint
@echo "LINTING $(1)..."
@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(GOLINT)
@PKG_PATH=$$(go list -f '{{.Dir}}' $(1)); find $${PKG_PATH}/*.go -type f | grep -v gen_ | xargs $(GOLINT)
endef
# golang-lint-deps-strict requires the golint tool for golang linting.
@ -112,7 +79,7 @@ golang-lint-deps-strict: $(GOLINT) $(FGT)
# arg1: pkg path
define golang-lint-strict
@echo "LINTING $(1)..."
@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(FGT) $(GOLINT)
@PKG_PATH=$$(go list -f '{{.Dir}}' $(1)); find $${PKG_PATH}/*.go -type f | grep -v gen_ | xargs $(FGT) $(GOLINT)
endef
# golang-test-deps is here for consistency
@ -122,7 +89,7 @@ golang-test-deps:
# arg1: pkg path
define golang-test
@echo "TESTING $(1)..."
@go test -v $(1)
@go test $(GO_BUILD_FLAGS) -v $(1)
endef
# golang-test-strict-deps is here for consistency
@ -132,7 +99,7 @@ golang-test-strict-deps:
# arg1: pkg path
define golang-test-strict
@echo "TESTING $(1)..."
@go test -v -race $(1)
@go test -v $(GO_BUILD_FLAGS) -race $(1)
endef
# golang-vet-deps is here for consistency
@ -142,7 +109,7 @@ golang-vet-deps:
# arg1: pkg path
define golang-vet
@echo "VETTING $(1)..."
@go vet $(GOPATH)/src/$(1)/*.go
@go vet $(GO_BUILD_FLAGS) $(1)
endef
# golang-test-all-deps installs all dependencies needed for different test cases.
@ -176,14 +143,14 @@ endef
define golang-build
@echo "BUILDING..."
@if [ -z "$$CI" ]; then \
go build -o bin/$(2) $(1); \
go build $(GO_BUILD_FLAGS) -o bin/$(2) $(1); \
else \
echo "-> Building CGO binary"; \
CGO_ENABLED=0 go build -installsuffix cgo -o bin/$(2) $(1); \
CGO_ENABLED=0 go build $(GO_BUILD_FLAGS) -installsuffix cgo -o bin/$(2) $(1); \
fi;
endef
# golang-update-makefile downloads latest version of golang.mk
golang-update-makefile:
@wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang.mk -O /tmp/golang.mk 2>/dev/null
@wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang-v1.mk -O /tmp/golang.mk 2>/dev/null
@if ! grep -q $(GOLANG_MK_VERSION) /tmp/golang.mk; then cp /tmp/golang.mk golang.mk && echo "golang.mk updated"; else echo "golang.mk is up-to-date"; fi


@ -1,16 +1,150 @@
// Package splitter provides functions for decoding various kinds of records that might come off of a kinesis stream.
// It is equipped with functions to unbundle KPL aggregates and CloudWatch log bundles,
// as well as apply appropriate decompression.
// KCL applications would be most interested in `SplitMessageIfNecessary` which can handle zlibbed records as well as
// CloudWatch bundles. KCL automatically unbundles KPL aggregates before passing the records to the consumer.
// Non-KCL applications (such as Lambdas consuming KPL-produced aggregates) should either use
// - KPLDeaggregate if the consumer purely wants to unbundle KPL aggregates, but will handle the raw records themselves.
// - Deaggregate if the consumer wants to apply the same decompress and split logic as SplitMessageIfNecessary
// in addition to the KPL splitting.
package splitter
import (
"bytes"
"compress/gzip"
"compress/zlib"
"crypto/md5"
"encoding/json"
"fmt"
"io/ioutil"
"regexp"
"strconv"
"time"
kpl "github.com/a8m/kinesis-producer"
"github.com/golang/protobuf/proto"
)
// The Amazon Kinesis Producer Library (KPL) aggregates multiple logical user records into a single
// Amazon Kinesis record for efficient puts.
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
var kplMagicNumber = []byte{0xF3, 0x89, 0x9A, 0xC2}
// IsKPLAggregate checks a record for a KPL aggregate prefix.
// It is not necessary to call this before calling KPLDeaggregate.
func IsKPLAggregate(data []byte) bool {
return bytes.HasPrefix(data, kplMagicNumber)
}
// KPLDeaggregate takes a Kinesis record and converts it to one or more user records by applying KPL deaggregation.
// If the record begins with the 4-byte magic prefix that KPL uses, the single Kinesis record is split into its component user records.
// Otherwise, the return value is a singleton containing the original record.
func KPLDeaggregate(kinesisRecord []byte) ([][]byte, error) {
if !IsKPLAggregate(kinesisRecord) {
return [][]byte{kinesisRecord}, nil
}
src := kinesisRecord[len(kplMagicNumber) : len(kinesisRecord)-md5.Size]
checksum := kinesisRecord[len(kinesisRecord)-md5.Size:]
recordSum := md5.Sum(src)
for i, b := range checksum {
if b != recordSum[i] {
// either the data is corrupted or this is not a KPL aggregate
// either way, return the data as-is
return [][]byte{kinesisRecord}, nil
}
}
dest := new(kpl.AggregatedRecord)
err := proto.Unmarshal(src, dest)
if err != nil {
return nil, fmt.Errorf("unmarshalling proto: %v", err)
}
var records [][]byte
for _, userRecord := range dest.GetRecords() {
records = append(records, userRecord.Data)
}
return records, nil
}
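The framing that `KPLDeaggregate` relies on (a 4-byte magic prefix, a protobuf payload, and a trailing MD5 checksum of that payload) can be sketched on its own with only the standard library. `splitKPLFrame` and `buildKPLFrame` are hypothetical helpers for illustration. The sketch also adds a length guard before slicing, since a record that starts with the magic number but is shorter than the prefix plus checksum would otherwise produce an out-of-range slice.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
)

// kplMagic is the 4-byte prefix described in the KPL aggregation format.
var kplMagic = []byte{0xF3, 0x89, 0x9A, 0xC2}

// splitKPLFrame is a hypothetical helper (not part of the splitter package).
// It returns the payload between the magic prefix and the trailing MD5 sum,
// or ok=false when the record is too short, lacks the prefix, or fails the
// checksum comparison.
func splitKPLFrame(record []byte) (payload []byte, ok bool) {
	if len(record) < len(kplMagic)+md5.Size || !bytes.HasPrefix(record, kplMagic) {
		return nil, false
	}
	payload = record[len(kplMagic) : len(record)-md5.Size]
	sum := md5.Sum(payload)
	if !bytes.Equal(sum[:], record[len(record)-md5.Size:]) {
		return nil, false
	}
	return payload, true
}

// buildKPLFrame wraps a payload in the same framing, for demonstration only.
func buildKPLFrame(payload []byte) []byte {
	sum := md5.Sum(payload)
	frame := append([]byte{}, kplMagic...)
	frame = append(frame, payload...)
	return append(frame, sum[:]...)
}

func main() {
	payload, ok := splitKPLFrame(buildKPLFrame([]byte("payload")))
	fmt.Println(string(payload), ok)

	// A record consisting of only the magic number is rejected by the guard.
	_, ok = splitKPLFrame(kplMagic)
	fmt.Println(ok)
}
```

In the real format the payload is a serialized protobuf message, so a caller would still need to unmarshal it after this check succeeds.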
// Deaggregate is a combination of KPLDeaggregate and SplitMessageIfNecessary.
// First it tries to KPL-deaggregate. If unsuccessful, it calls SplitMessageIfNecessary on the original record.
// If successful, it iterates over the individual user records and attempts to unzlib them.
// If a record inside an aggregate is in zlib format, the output will contain the unzlibbed version.
// If it is not zlibbed, the output will contain the record verbatim.
// A similar result can be obtained by calling KPLDeaggregate, then iterating over the results and calling SplitMessageIfNecessary.
// This function makes the assumption that after KPL-deaggregating, the results are not CloudWatch aggregates, so it doesn't need to check them for a gzip header.
// Also it lets us iterate over the user records one less time, since KPLDeaggregate loops over the records and we would need to loop again to unzlib.
//
// See the SplitMessageIfNecessary documentation for the format of output for CloudWatch log bundles.
func Deaggregate(kinesisRecord []byte) ([][]byte, error) {
if !IsKPLAggregate(kinesisRecord) {
return SplitMessageIfNecessary(kinesisRecord)
}
src := kinesisRecord[len(kplMagicNumber) : len(kinesisRecord)-md5.Size]
checksum := kinesisRecord[len(kinesisRecord)-md5.Size:]
recordSum := md5.Sum(src)
for i, b := range checksum {
if b != recordSum[i] {
// either the data is corrupted or this is not a KPL aggregate
// either way, return the data as-is
return [][]byte{kinesisRecord}, nil
}
}
dest := new(kpl.AggregatedRecord)
err := proto.Unmarshal(src, dest)
if err != nil {
return nil, fmt.Errorf("unmarshalling proto: %v", err)
}
var records [][]byte
for _, userRecord := range dest.GetRecords() {
record, err := unzlib(userRecord.Data)
if err != nil {
return nil, fmt.Errorf("unzlibbing record: %w", err)
}
records = append(records, record)
}
return records, nil
}
// SplitMessageIfNecessary receives a user-record and returns a slice of one or more records.
// If the record is coming off of a kinesis stream and might be KPL aggregated, it needs to be deaggregated before calling this.
// This function handles three types of records:
// - records emitted from CWLogs Subscription (which are gzip compressed)
// - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bit)
// - any other record (left unchanged)
//
// CloudWatch logs come as structured JSON. In the process of splitting, they are converted
// into an rsyslog format that allows fairly uniform parsing of the result across the
// AWS services that might emit logs to CloudWatch.
// Note that the timezone used in these syslog records is guessed based on the local env.
// If you need consistent timezones, set TZ=UTC in your environment.
func SplitMessageIfNecessary(userRecord []byte) ([][]byte, error) {
// First try the record as a CWLogs record
if IsGzipped(userRecord) {
return GetMessagesFromGzippedInput(userRecord)
}
unzlibRecord, err := unzlib(userRecord)
if err != nil {
return nil, err
}
// Process a single message, from KPL
return [][]byte{unzlibRecord}, nil
}
func unzlib(input []byte) ([]byte, error) {
zlibReader, err := zlib.NewReader(bytes.NewReader(input))
if err == nil {
unzlibRecord, err := ioutil.ReadAll(zlibReader)
if err != nil {
return nil, fmt.Errorf("reading zlib-compressed record: %v", err)
}
return unzlibRecord, nil
}
return input, nil
}
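The "inflate if it is zlib, otherwise pass through" behavior of `unzlib` can be tried in isolation. This is a minimal sketch using only the standard library. `inflateOrPassThrough` and `zlibCompress` are hypothetical names, and `io.ReadAll` (Go 1.16+) is used in place of `ioutil.ReadAll`.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io"
)

// zlibCompress is a demonstration helper that zlib-compresses data in memory.
func zlibCompress(data []byte) []byte {
	var buf bytes.Buffer
	w := zlib.NewWriter(&buf)
	w.Write(data) // writes to a bytes.Buffer do not fail
	w.Close()
	return buf.Bytes()
}

// inflateOrPassThrough returns the decompressed bytes when input starts with
// a valid zlib header, and the input unchanged when it does not.
func inflateOrPassThrough(input []byte) ([]byte, error) {
	r, err := zlib.NewReader(bytes.NewReader(input))
	if err != nil {
		// Not a zlib stream: hand the record back verbatim.
		return input, nil
	}
	defer r.Close()
	out, err := io.ReadAll(r)
	if err != nil {
		return nil, fmt.Errorf("reading zlib-compressed record: %w", err)
	}
	return out, nil
}

func main() {
	out, err := inflateOrPassThrough(zlibCompress([]byte("hello")))
	fmt.Println(string(out), err)

	out, err = inflateOrPassThrough([]byte("plain text"))
	fmt.Println(string(out), err)
}
```

One consequence of detecting zlib purely by its header is that uncompressed data which happens to begin with a valid zlib header would be treated as compressed and would likely fail while reading.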
// LogEvent is a single log line within a LogEventBatch
type LogEvent struct {
ID string `json:"id"`
@ -116,6 +250,11 @@ var awsFargateLogStreamRegex = regexp.MustCompile(`^fargate/([a-z0-9-]+)--([a-z0
// RDS slowquery log groups are in the form of /aws/rds/cluster/<database name>/slowquery
var awsRDSLogGroupRegex = regexp.MustCompile(`^/aws/rds/cluster/([a-z0-9-]+)/slowquery$`)
// glue log groups are of the form /aws-glue/jobs/<env>/<app>/<pod-id>
// glue log streams are of the form <job id>-<"driver" | "1" | "progress-bar">
var awsGlueLogGroupRegex = regexp.MustCompile(`^/aws-glue/jobs/([a-z0-9-]+)/([a-z0-9-]+)/([a-z0-9-]+)$`)
var awsGlueLogStreamRegex = regexp.MustCompile(`^(jr_[a-z0-9-]+)-.*$`)
// arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425
const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`
const taskCruft = `12345678-1234-1234-1234-555566667777`
@ -228,6 +367,32 @@ func splitAWSRDS(b LogEventBatch) ([]RSysLogMessage, bool) {
return out, true
}
func splitAWSGlue(b LogEventBatch) ([]RSysLogMessage, bool) {
matches := awsGlueLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
if len(matches) != 1 {
return nil, false
}
env := matches[0][1]
app := matches[0][2]
streamMatches := awsGlueLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1)
if len(streamMatches) != 1 {
return nil, false
}
jobID := streamMatches[0][1]
out := []RSysLogMessage{}
for _, event := range b.LogEvents {
out = append(out, RSysLogMessage{
Timestamp: event.Timestamp.Time(),
ProgramName: env + "--" + app + arnCruft + jobID,
Hostname: "aws-glue",
Message: event.Message,
})
}
return out, true
}
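To see how the two Glue regexes combine into a program name, the sketch below reuses the patterns and the `arnCruft` constant from this diff in a standalone program. `glueProgramName` is a hypothetical helper, and the sample inputs are the ones used in `TestSplitGlue`.

```go
package main

import (
	"fmt"
	"regexp"
)

// Patterns and constant copied from the diff above.
var glueGroupRe = regexp.MustCompile(`^/aws-glue/jobs/([a-z0-9-]+)/([a-z0-9-]+)/([a-z0-9-]+)$`)
var glueStreamRe = regexp.MustCompile(`^(jr_[a-z0-9-]+)-.*$`)

const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`

// glueProgramName is a hypothetical helper that builds the
// "<env>--<app>/<arn cruft><job id>" string from a log group and stream.
func glueProgramName(logGroup, logStream string) (string, bool) {
	group := glueGroupRe.FindStringSubmatch(logGroup)
	stream := glueStreamRe.FindStringSubmatch(logStream)
	if group == nil || stream == nil {
		return "", false
	}
	return group[1] + "--" + group[2] + arnCruft + stream[1], true
}

func main() {
	name, ok := glueProgramName(
		"/aws-glue/jobs/clever-dev/analytics-district-participation/aae75f00",
		"jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28-1",
	)
	fmt.Println(name, ok)
}
```

Because `[a-z0-9-]+` is greedy and also matches hyphens, a stream suffix that itself contains a hyphen (such as `progress-bar`) would leave part of the suffix in the captured job ID: `jr_abc-progress-bar` captures `jr_abc-progress`.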
func splitDefault(b LogEventBatch) []RSysLogMessage {
out := []RSysLogMessage{}
for _, event := range b.LogEvents {
@ -261,6 +426,8 @@ func Split(b LogEventBatch) [][]byte {
return stringify(rsyslogMsgs)
} else if rsyslogMsgs, ok := splitAWSRDS(b); ok {
return stringify(rsyslogMsgs)
} else if rsyslogMsgs, ok := splitAWSGlue(b); ok {
return stringify(rsyslogMsgs)
}
return stringify(splitDefault(b))
}


@ -3,15 +3,29 @@ package splitter
import (
"bytes"
"compress/gzip"
"compress/zlib"
"crypto/md5"
b64 "encoding/base64"
"encoding/json"
"os"
"testing"
"time"
"github.com/Clever/amazon-kinesis-client-go/decode"
kpl "github.com/a8m/kinesis-producer"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMain(m *testing.M) {
// When a CloudWatch LogEvent is converted to an RSysLogMessage and then to a string,
// the timezone used in the final string depends on the locally set timezone.
// Set TZ to UTC so the expected timestamps in these tests are deterministic.
os.Setenv("TZ", "UTC")
os.Exit(m.Run())
}
func TestUnpacking(t *testing.T) {
input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA"
@ -253,3 +267,290 @@ func TestSplitRDS(t *testing.T) {
assert.Equal(t, "Slow query: select * from table.", enhanced["rawlog"])
}
}
func TestSplitGlue(t *testing.T) {
input := LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "/aws-glue/jobs/clever-dev/analytics-district-participation/aae75f00",
LogStream: "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28-1",
SubscriptionFilters: []string{"ForwardLogsToKinesis"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: NewUnixTimestampMillis(1498519943285),
Message: "foo bar.",
},
},
}
lines := Split(input)
expected := [][]byte{
[]byte(`2017-06-26T23:32:23.285001+00:00 aws-glue clever-dev--analytics-district-participation/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2Fjr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28: foo bar.`),
}
assert.Equal(t, expected, lines)
for _, line := range expected {
enhanced, err := decode.ParseAndEnhance(string(line), "")
require.Nil(t, err)
assert.Equal(t, "aws-glue", enhanced["hostname"])
assert.Equal(t, "clever-dev", enhanced["container_env"])
assert.Equal(t, "analytics-district-participation", enhanced["container_app"])
assert.Equal(t, "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28", enhanced["container_task"])
}
}
func TestSplitIfNecessary(t *testing.T) {
// We provide three different inputs to SplitMessageIfNecessary:
//   - plain text
//   - zlib-compressed text
//   - a gzip-compressed CloudWatch Logs batch
// We verify that the split function matches each input against the correct splitter
// and decodes it.
assert := assert.New(t)
plainTextInput := []byte("hello, world!")
records, err := SplitMessageIfNecessary(plainTextInput)
assert.NoError(err)
assert.Equal(
[][]byte{[]byte("hello, world!")},
records,
)
var z bytes.Buffer
zbuf := zlib.NewWriter(&z)
zbuf.Write([]byte("hello, world!"))
zbuf.Close()
zlibSingleInput := z.Bytes()
records, err = SplitMessageIfNecessary(zlibSingleInput)
assert.NoError(err)
assert.Equal(
[][]byte{[]byte("hello, world!")},
records,
)
// The details of this part aren't important, since the actual functionality is
// tested in other tests; here we just want to make sure that the split function
// correctly recognizes gzip input and calls the appropriate CW-log-splitting logic.
var g bytes.Buffer
gbuf := gzip.NewWriter(&g)
cwLogBatch := LogEventBatch{
MessageType: "test",
Owner: "test",
LogGroup: "test",
LogStream: "test",
SubscriptionFilters: []string{""},
LogEvents: []LogEvent{{
ID: "test",
Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)),
Message: "test",
}},
}
cwLogBatchJSON, _ := json.Marshal(cwLogBatch)
gbuf.Write(cwLogBatchJSON)
gbuf.Close()
gzipBatchInput := g.Bytes()
expectedRecord := []byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test")
records, err = SplitMessageIfNecessary(gzipBatchInput)
assert.NoError(err)
assert.Equal(
[][]byte{expectedRecord},
records,
)
}
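
The test above relies on the splitter telling plain text, zlib, and gzip input apart. The implementation of SplitMessageIfNecessary is not shown in this diff, so the following is only a sketch of one common way to do that, by inspecting the first two bytes (the gzip magic number from RFC 1952 and the zlib header check from RFC 1950). The names sniffCompression, gzipBytes, and zlibBytes are hypothetical.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"compress/zlib"
	"fmt"
)

// sniffCompression is a hypothetical helper that guesses the encoding of b
// from its first two bytes. It is a heuristic: plain text can, in rare
// cases, happen to look like a valid zlib header.
func sniffCompression(b []byte) string {
	if len(b) < 2 {
		return "plain"
	}
	// gzip streams start with the magic bytes 0x1f 0x8b.
	if b[0] == 0x1f && b[1] == 0x8b {
		return "gzip"
	}
	// zlib: the low nibble of the first byte is 8 (deflate), and the first
	// two bytes read as a big-endian integer are a multiple of 31.
	if b[0]&0x0f == 8 && (uint16(b[0])<<8|uint16(b[1]))%31 == 0 {
		return "zlib"
	}
	return "plain"
}

// gzipBytes compresses s with gzip; used only to build sample input.
func gzipBytes(s string) []byte {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	w.Write([]byte(s))
	w.Close()
	return buf.Bytes()
}

// zlibBytes compresses s with zlib; used only to build sample input.
func zlibBytes(s string) []byte {
	var buf bytes.Buffer
	w := zlib.NewWriter(&buf)
	w.Write([]byte(s))
	w.Close()
	return buf.Bytes()
}

func main() {
	fmt.Println(sniffCompression([]byte("hello, world!")))
	fmt.Println(sniffCompression(zlibBytes("hello, world!")))
	fmt.Println(sniffCompression(gzipBytes("hello, world!")))
}
```

A production splitter would more likely attempt decompression and fall back on error, since header sniffing alone cannot rule out false positives.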
func createKPLAggregate(input [][]byte, compress bool) []byte {
var partitionKeyIndex uint64 = 0
records := []*kpl.Record{}
for _, log := range input {
if compress {
var z bytes.Buffer
zbuf := zlib.NewWriter(&z)
zbuf.Write(log)
zbuf.Close()
log = z.Bytes()
}
records = append(records, &kpl.Record{
PartitionKeyIndex: &partitionKeyIndex,
Data: log,
})
}
logProto, err := proto.Marshal(&kpl.AggregatedRecord{
PartitionKeyTable: []string{"ecs_task_arn"},
Records: records,
})
if err != nil {
panic(err)
}
log := append(kplMagicNumber, logProto...)
logHash := md5.Sum(logProto)
return append(log, logHash[0:16]...)
}
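
createKPLAggregate frames its payload as magic number, then protobuf bytes, then the MD5 of those bytes. The sketch below shows the inverse check using only the standard library. It treats the payload as opaque bytes rather than unmarshalling the protobuf. The helper names frame and unframe are hypothetical, and the value of kplMagic is an assumption, because the diff references kplMagicNumber without showing its definition.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
)

// kplMagic is an assumed value for the 4-byte prefix of a KPL aggregated
// record; the package's own kplMagicNumber is not shown in this diff.
var kplMagic = []byte{0xF3, 0x89, 0x9A, 0xC2}

// frame mirrors the layout built by createKPLAggregate:
// magic || payload || md5(payload).
func frame(payload []byte) []byte {
	sum := md5.Sum(payload)
	out := make([]byte, 0, len(kplMagic)+len(payload)+md5.Size)
	out = append(out, kplMagic...)
	out = append(out, payload...)
	return append(out, sum[:]...)
}

// unframe (hypothetical) returns the payload if b carries the magic prefix
// and a trailing MD5 that matches the payload, and false otherwise.
func unframe(b []byte) ([]byte, bool) {
	if len(b) < len(kplMagic)+md5.Size || !bytes.HasPrefix(b, kplMagic) {
		return nil, false
	}
	payload := b[len(kplMagic) : len(b)-md5.Size]
	sum := md5.Sum(payload)
	if !bytes.Equal(sum[:], b[len(b)-md5.Size:]) {
		return nil, false
	}
	return payload, true
}

func main() {
	payload, ok := unframe(frame([]byte("hello")))
	fmt.Println(string(payload), ok)

	// Input without the magic prefix is reported as not aggregated.
	_, ok = unframe([]byte("hello"))
	fmt.Println(ok)
}
```

Allocating a fresh slice in frame, rather than appending directly to the shared magic-number slice, avoids any chance of writing into that slice's spare capacity.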
func TestKPLDeaggregate(t *testing.T) {
type test struct {
description string
input []byte
output [][]byte
shouldError bool
}
tests := []test{
{
description: "non-aggregated record",
input: []byte("hello"),
output: [][]byte{[]byte("hello")},
shouldError: false,
},
{
description: "one kpl-aggregated record",
input: createKPLAggregate(
[][]byte{[]byte("hello")},
false,
),
output: [][]byte{[]byte("hello")},
shouldError: false,
},
{
description: "three kpl-aggregated records",
input: createKPLAggregate([][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
false,
),
output: [][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
shouldError: false,
},
}
for _, tt := range tests {
t.Run(tt.description, func(t *testing.T) {
out, err := KPLDeaggregate(tt.input)
if tt.shouldError {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, tt.output, out)
})
}
}
func TestDeaggregate(t *testing.T) {
type test struct {
description string
input []byte
output [][]byte
shouldError bool
}
tests := []test{
{
description: "non-aggregated record",
input: []byte("hello"),
output: [][]byte{[]byte("hello")},
shouldError: false,
},
{
description: "one kpl-aggregated record",
input: createKPLAggregate(
[][]byte{[]byte("hello")},
false,
),
output: [][]byte{[]byte("hello")},
shouldError: false,
},
{
description: "three kpl-aggregated records",
input: createKPLAggregate([][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
false,
),
output: [][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
shouldError: false,
},
{
description: "one kpl-aggregated zlib record",
input: createKPLAggregate(
[][]byte{[]byte("hello")},
true,
),
output: [][]byte{[]byte("hello")},
shouldError: false,
},
{
description: "three kpl-aggregated zlib records",
input: createKPLAggregate([][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
true,
),
output: [][]byte{
[]byte("hello, "),
[]byte("world"),
[]byte("!"),
},
shouldError: false,
},
}
var g bytes.Buffer
gbuf := gzip.NewWriter(&g)
cwLogBatch := LogEventBatch{
MessageType: "test",
Owner: "test",
LogGroup: "test",
LogStream: "test",
SubscriptionFilters: []string{""},
LogEvents: []LogEvent{{
ID: "test",
Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)),
Message: "test",
}},
}
cwLogBatchJSON, _ := json.Marshal(cwLogBatch)
gbuf.Write(cwLogBatchJSON)
gbuf.Close()
gzipBatchInput := g.Bytes()
tests = append(tests, test{
description: "cloudwatch log batch",
input: gzipBatchInput,
output: [][]byte{[]byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test")},
shouldError: false,
})
for _, tt := range tests {
t.Run(tt.description, func(t *testing.T) {
out, err := Deaggregate(tt.input)
if tt.shouldError {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, tt.output, out)
})
}
}