Compare commits
44 commits
update-jar
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f78d1c6b9b | ||
|
|
a8a4e207fe | ||
|
|
e66f1922b7 | ||
|
|
dc3e5e097c | ||
|
|
9a1371d965 | ||
|
|
cf19a05ced | ||
|
|
3a42d32e53 | ||
|
|
d461ebebb6 | ||
|
|
c11acd24ec | ||
|
|
ad3e33c7d6 | ||
|
|
fa54b6ee39 | ||
|
|
c164f1d895 | ||
|
|
6ac018712d | ||
|
|
7351a3d3f7 | ||
|
|
358de6589c | ||
|
|
fe41fdaac3 | ||
|
|
102de58470 | ||
|
|
54f91f4073 | ||
|
|
29f86e9986 | ||
|
|
42c3480288 | ||
|
|
cc4f7716b3 | ||
|
|
6e8a99c50d | ||
|
|
4dd769ffca | ||
|
|
f8e9c34641 | ||
|
|
2a7e96157e | ||
|
|
4fe27d0d39 | ||
|
|
6b2d1f8a56 | ||
|
|
611f0c0a60 | ||
|
|
1112894639 | ||
|
|
484b54bfe0 | ||
|
|
f5ce6fe4e7 | ||
|
|
9b32d93d1c | ||
|
|
3bedc65483 | ||
|
|
b16af062fb | ||
|
|
30d3925119 | ||
|
|
ebd1e0e39c | ||
|
|
bfe1a6ffb9 | ||
|
|
35b07d7aed | ||
|
|
8a8b44112d | ||
|
|
8be774bc09 | ||
|
|
504ebfad60 | ||
|
|
420ad243a4 | ||
|
|
337d2063f5 | ||
|
|
eb11747434 |
18 changed files with 1023 additions and 310 deletions
|
|
@ -6,6 +6,7 @@ jobs:
|
|||
- image: circleci/golang:1.13-stretch
|
||||
- image: circleci/mongo:3.2.20-jessie-ram
|
||||
environment:
|
||||
GOPRIVATE: github.com/Clever/*
|
||||
CIRCLE_ARTIFACTS: /tmp/circleci-artifacts
|
||||
CIRCLE_TEST_REPORTS: /tmp/circleci-test-results
|
||||
steps:
|
||||
|
|
@ -17,7 +18,13 @@ jobs:
|
|||
- run:
|
||||
command: mkdir -p $CIRCLE_ARTIFACTS $CIRCLE_TEST_REPORTS
|
||||
name: Set up CircleCI artifacts directories
|
||||
- run:
|
||||
command: git config --global "url.ssh://git@github.com/Clever".insteadOf "https://github.com/Clever"
|
||||
- run:
|
||||
name: Add github.com to known hosts
|
||||
command: mkdir -p ~/.ssh && touch ~/.ssh/known_hosts && echo 'github.com ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCj7ndNxQowgcQnjshcLrqPEiiphnt+VTTvDP6mHBL9j1aNUkY4Ue1gvwnGLVlOhGeYrnZaMgRK6+PKCUXaDbC7qtbW8gIkhL7aGCsOr/C56SJMy/BCZfxd1nWzAOxSDPgVsmerOBYfNqltV9/hWCqBywINIR+5dIg6JTJ72pcEpEjcYgXkE2YEFXV1JHnsKgbLWNlhScqb2UmyRkQyytRLtL+38TGxkxCflmO+5Z8CSSNY7GidjMIZ7Q4zMjA2n1nGrlTDkzwDCsw+wqFPGQA179cnfGWOWRVruj16z6XyvxvjJwbz0wQZ75XK5tKSb7FNyeIEs4TT4jk+S4dhPeAUC5y+bDYirYgM4GC7uEnztnZyaVWQ7B381AK4Qdrwt51ZqExKbQpTUNn+EjqoTwvqNj4kqx5QUCI0ThS/YkOxJCXmPUWZbhjpCg56i+2aB6CmK2JGhn57K5mj0MNdBXA4/WnwH6XoPWJzK5Nyu2zB3nAZp+S5hpQs+p1vN1/wsjk=' >> ~/.ssh/known_hosts
|
||||
- run: make install_deps
|
||||
- run: make build
|
||||
- run: make bench
|
||||
- run: make test
|
||||
- run: if [ "${CIRCLE_BRANCH}" == "master" ]; then $HOME/ci-scripts/circleci/github-release $GH_RELEASE_TOKEN; fi;
|
||||
|
|
|
|||
1
.github/CODEOWNERS
vendored
Normal file
1
.github/CODEOWNERS
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
* @Clever/eng-infra
|
||||
20
.github/workflows/notify-ci-status.yml
vendored
Normal file
20
.github/workflows/notify-ci-status.yml
vendored
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
name: Notify CI status
|
||||
|
||||
on:
|
||||
check_suite:
|
||||
types: [completed]
|
||||
status:
|
||||
|
||||
jobs:
|
||||
call-workflow:
|
||||
if: >-
|
||||
(github.event.branches[0].name == github.event.repository.default_branch &&
|
||||
(github.event.state == 'error' || github.event.state == 'failure')) ||
|
||||
(github.event.check_suite.head_branch == github.event.repository.default_branch &&
|
||||
github.event.check_suite.conclusion != 'success')
|
||||
uses: Clever/ci-scripts/.github/workflows/reusable-notify-ci-status.yml@master
|
||||
secrets:
|
||||
CIRCLE_CI_INTEGRATIONS_URL: ${{ secrets.CIRCLE_CI_INTEGRATIONS_URL }}
|
||||
CIRCLE_CI_INTEGRATIONS_USERNAME: ${{ secrets.CIRCLE_CI_INTEGRATIONS_USERNAME }}
|
||||
CIRCLE_CI_INTEGRATIONS_PASSWORD: ${{ secrets.CIRCLE_CI_INTEGRATIONS_PASSWORD }}
|
||||
SLACK_BOT_TOKEN: ${{ secrets.DAPPLE_BOT_TOKEN }}
|
||||
84
Gopkg.lock
generated
84
Gopkg.lock
generated
|
|
@ -1,84 +0,0 @@
|
|||
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
|
||||
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/Clever/syslogparser"
|
||||
packages = [
|
||||
".",
|
||||
"rfc3164"
|
||||
]
|
||||
revision = "fb28ad3e4340c046323b7beba685a72fd12ecbe8"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/davecgh/go-spew"
|
||||
packages = ["spew"]
|
||||
revision = "6d212800a42e8ab5c146b8ace3490ee17e5225f9"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/jeromer/syslogparser"
|
||||
packages = ["."]
|
||||
revision = "0e4ae46ea3f08de351074b643d649d5d00661a3c"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/pmezard/go-difflib"
|
||||
packages = ["difflib"]
|
||||
revision = "d8ed2627bdf02c080bf22230dbb337003b7aba2d"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/stretchr/testify"
|
||||
packages = [
|
||||
"assert",
|
||||
"require"
|
||||
]
|
||||
revision = "69483b4bd14f5845b5a1e55bca19e954e827f1d0"
|
||||
version = "v1.1.4"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/xeipuuv/gojsonpointer"
|
||||
packages = ["."]
|
||||
revision = "e0fe6f68307607d540ed8eac07a342c33fa1b54a"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/xeipuuv/gojsonreference"
|
||||
packages = ["."]
|
||||
revision = "e02fc20de94c78484cd5ffb007f8af96be030a45"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/xeipuuv/gojsonschema"
|
||||
packages = ["."]
|
||||
revision = "e18f0065e8c148fcf567ac43a3f8f5b66ac0720b"
|
||||
|
||||
[[projects]]
|
||||
name = "golang.org/x/net"
|
||||
packages = ["context"]
|
||||
revision = "a6577fac2d73be281a500b310739095313165611"
|
||||
|
||||
[[projects]]
|
||||
name = "golang.org/x/time"
|
||||
packages = ["rate"]
|
||||
revision = "f51c12702a4d776e4c1fa9b0fabab841babae631"
|
||||
|
||||
[[projects]]
|
||||
name = "gopkg.in/Clever/kayvee-go.v6"
|
||||
packages = [
|
||||
".",
|
||||
"logger",
|
||||
"router"
|
||||
]
|
||||
revision = "096364e316a52652d3493be702d8105d8d01db84"
|
||||
version = "v6.6.0"
|
||||
|
||||
[[projects]]
|
||||
name = "gopkg.in/yaml.v2"
|
||||
packages = ["."]
|
||||
revision = "a5b47d31c556af34a302ce5d659e6fea44d90de0"
|
||||
|
||||
[solve-meta]
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
inputs-digest = "4699293f3632dd38561ff60477aa7cc1ecaadc5808b974d017099e2189679286"
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
||||
36
Gopkg.toml
36
Gopkg.toml
|
|
@ -1,36 +0,0 @@
|
|||
|
||||
# Gopkg.toml example
|
||||
#
|
||||
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
|
||||
# for detailed Gopkg.toml documentation.
|
||||
#
|
||||
# required = ["github.com/user/thing/cmd/thing"]
|
||||
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
|
||||
#
|
||||
# [[constraint]]
|
||||
# name = "github.com/user/project"
|
||||
# version = "1.0.0"
|
||||
#
|
||||
# [[constraint]]
|
||||
# name = "github.com/user/project2"
|
||||
# branch = "dev"
|
||||
# source = "github.com/myfork/project2"
|
||||
#
|
||||
# [[override]]
|
||||
# name = "github.com/x/y"
|
||||
# version = "2.4.0"
|
||||
|
||||
|
||||
[[constraint]]
|
||||
branch = "master"
|
||||
name = "github.com/Clever/syslogparser"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/stretchr/testify"
|
||||
|
||||
[[constraint]]
|
||||
name = "golang.org/x/time"
|
||||
|
||||
[[constraint]]
|
||||
name = "gopkg.in/Clever/kayvee-go.v6"
|
||||
version = "6.0.0"
|
||||
4
Makefile
4
Makefile
|
|
@ -61,5 +61,5 @@ $(PKGS): golang-test-all-deps
|
|||
|
||||
|
||||
|
||||
install_deps: golang-dep-vendor-deps
|
||||
$(call golang-dep-vendor)
|
||||
install_deps:
|
||||
go mod vendor
|
||||
|
|
|
|||
1
VERSION
Normal file
1
VERSION
Normal file
|
|
@ -0,0 +1 @@
|
|||
1.0.0
|
||||
|
|
@ -30,16 +30,17 @@ type datum struct {
|
|||
var queue = make(chan datum, 1000)
|
||||
|
||||
func init() {
|
||||
data := map[string]int{}
|
||||
countData := map[string]int{}
|
||||
gaugeData := map[string]int{}
|
||||
tick := time.Tick(time.Minute)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case d := <-queue:
|
||||
if d.category == "counter" {
|
||||
data[d.name] = data[d.name] + d.value
|
||||
countData[d.name] = countData[d.name] + d.value
|
||||
} else if d.category == "gauge" {
|
||||
data[d.name] = d.value
|
||||
gaugeData[d.name] = d.value
|
||||
} else {
|
||||
log.ErrorD("unknown-stat-category", logger.M{"category": d.category})
|
||||
}
|
||||
|
|
@ -48,10 +49,14 @@ func init() {
|
|||
for _, k := range DefaultCounters {
|
||||
tmp[k] = 0
|
||||
}
|
||||
for k, v := range data {
|
||||
for k, v := range countData {
|
||||
tmp[k] = v
|
||||
}
|
||||
for k, v := range gaugeData {
|
||||
tmp[k] = v
|
||||
}
|
||||
log.InfoD("stats", tmp)
|
||||
countData = map[string]int{}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -57,19 +57,6 @@ func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
|
||||
// We handle two types of records:
|
||||
// - records emitted from CWLogs Subscription
|
||||
// - records emitted from KPL
|
||||
if !splitter.IsGzipped(record) {
|
||||
// Process a single message, from KPL
|
||||
return [][]byte{record}, nil
|
||||
}
|
||||
|
||||
// Process a batch of messages from a CWLogs Subscription
|
||||
return splitter.GetMessagesFromGzippedInput(record)
|
||||
}
|
||||
|
||||
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
||||
var pair kcl.SequencePair
|
||||
prevPair := b.lastProcessedSeq
|
||||
|
|
@ -93,7 +80,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
|||
return err
|
||||
}
|
||||
|
||||
messages, err := b.splitMessageIfNecessary(data)
|
||||
messages, err := splitter.SplitMessageIfNecessary(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -352,7 +352,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) {
|
|||
assert.Equal("tag3", string(mocksender.batches["tag3"][0][2]))
|
||||
}
|
||||
|
||||
func TestStaggeredCheckpionting(t *testing.T) {
|
||||
func TestStaggeredCheckpointing(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
mockFailedLogsFile := logger.New("testing")
|
||||
|
|
|
|||
BIN
bin/dep
Executable file
BIN
bin/dep
Executable file
Binary file not shown.
205
decode/decode.go
205
decode/decode.go
|
|
@ -2,9 +2,11 @@ package decode
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Clever/syslogparser/rfc3164"
|
||||
)
|
||||
|
|
@ -15,9 +17,18 @@ var reservedFields = []string{
|
|||
"prefix",
|
||||
"postfix",
|
||||
"decoder_msg_type",
|
||||
|
||||
// used by all logs
|
||||
"timestamp",
|
||||
"hostname",
|
||||
"rawlog",
|
||||
|
||||
// Used by Kayvee
|
||||
"prefix",
|
||||
"postfix",
|
||||
|
||||
// Set to the deepest step of parsing that succeeded
|
||||
"decoder_msg_type",
|
||||
}
|
||||
|
||||
func stringInSlice(s string, slice []string) bool {
|
||||
|
|
@ -385,17 +396,21 @@ func ExtractKVMeta(kvlog map[string]interface{}) KVMeta {
|
|||
|
||||
// ParseAndEnhance extracts fields from a log line, and does some post-processing to rename/add fields
|
||||
func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
|
||||
out := map[string]interface{}{}
|
||||
syslog, syslogErr := FieldsFromSyslog(line)
|
||||
if syslogErr == nil {
|
||||
return decodeSyslog(syslog, env)
|
||||
}
|
||||
|
||||
syslogFields, err := FieldsFromSyslog(line)
|
||||
if err != nil {
|
||||
return map[string]interface{}{}, err
|
||||
fluentLog, fluentErr := FieldsFromFluentbitLog(line)
|
||||
if fluentErr == nil {
|
||||
return decodeFluent(fluentLog, env)
|
||||
}
|
||||
for k, v := range syslogFields {
|
||||
out[k] = v
|
||||
}
|
||||
rawlog := syslogFields["rawlog"].(string)
|
||||
programname := syslogFields["programname"].(string)
|
||||
|
||||
return nil, fmt.Errorf("unable to parse log line with errors `%v` and `%v`", syslogErr, fluentErr)
|
||||
}
|
||||
|
||||
func decodeSyslog(syslog map[string]interface{}, env string) (map[string]interface{}, error) {
|
||||
rawlog := syslog["rawlog"].(string)
|
||||
|
||||
// Try pulling Kayvee fields out of message
|
||||
kvFields, err := FieldsFromKayvee(rawlog)
|
||||
|
|
@ -405,56 +420,39 @@ func ParseAndEnhance(line string, env string) (map[string]interface{}, error) {
|
|||
}
|
||||
} else {
|
||||
for k, v := range kvFields {
|
||||
out[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Inject additional fields that are useful in log-searching and other business logic
|
||||
out["env"] = env
|
||||
|
||||
// Sometimes its useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
|
||||
// A separate container monitors for start/stop events, but we set the container values in such a way that
|
||||
// the logs for these events will appear in context for the app that the user is looking at instead of the
|
||||
// docker-events app.
|
||||
forceEnv := ""
|
||||
forceApp := ""
|
||||
forceTask := ""
|
||||
if cEnv, ok := out["container_env"]; ok {
|
||||
forceEnv = cEnv.(string)
|
||||
}
|
||||
if cApp, ok := out["container_app"]; ok {
|
||||
forceApp = cApp.(string)
|
||||
}
|
||||
if cTask, ok := out["container_task"]; ok {
|
||||
forceTask = cTask.(string)
|
||||
}
|
||||
meta, err := getContainerMeta(programname, forceEnv, forceApp, forceTask)
|
||||
if err == nil {
|
||||
for k, v := range meta {
|
||||
out[k] = v
|
||||
syslog[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Try pulling RDS slowquery logs fields out of message
|
||||
if out["hostname"] == "aws-rds" {
|
||||
if syslog["hostname"] == "aws-rds" {
|
||||
slowQueryFields := FieldsFromRDSSlowquery(rawlog)
|
||||
for k, v := range slowQueryFields {
|
||||
out[k] = v
|
||||
syslog[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
// Inject additional fields that are useful in log-searching and other business logic
|
||||
syslog["env"] = env
|
||||
|
||||
// this can error, which indicates inability to extract container meta. That's fine, we can ignore that.
|
||||
addContainterMetaToSyslog(syslog)
|
||||
|
||||
return syslog, nil
|
||||
}
|
||||
|
||||
const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
|
||||
`arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft
|
||||
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32})` // task-id (ECS, both EC2 and Fargate)
|
||||
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|[a-z0-9]{32}|jr_[a-z0-9-]+)` // task-id (ECS, both EC2 and Fargate), Glue Job ID
|
||||
|
||||
var containerMetaRegex = regexp.MustCompile(containerMeta)
|
||||
|
||||
func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[string]string, error) {
|
||||
if programname == "" {
|
||||
return map[string]string{}, fmt.Errorf("no programname")
|
||||
var errBadProgramname = errors.New("invalid or missing programname")
|
||||
|
||||
func addContainterMetaToSyslog(syslog map[string]interface{}) (map[string]interface{}, error) {
|
||||
programname, ok := syslog["programname"].(string)
|
||||
if !ok || programname == "" {
|
||||
return syslog, errBadProgramname
|
||||
}
|
||||
|
||||
env := ""
|
||||
|
|
@ -465,25 +463,116 @@ func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[st
|
|||
env = matches[0][1]
|
||||
app = matches[0][2]
|
||||
task = matches[0][4]
|
||||
} else {
|
||||
return syslog, errBadProgramname
|
||||
}
|
||||
|
||||
if forceEnv != "" {
|
||||
env = forceEnv
|
||||
// Sometimes its useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
|
||||
// A separate container monitors for start/stop events, but we set the container values in such a way that
|
||||
// the logs for these events will appear in context for the app that the user is looking at instead of the
|
||||
// docker-events app.
|
||||
if existingEnv, ok := syslog["container_env"].(string); !ok || existingEnv == "" {
|
||||
syslog["container_env"] = env
|
||||
}
|
||||
if forceApp != "" {
|
||||
app = forceApp
|
||||
if existingApp, ok := syslog["container_app"].(string); !ok || existingApp == "" {
|
||||
syslog["container_app"] = app
|
||||
}
|
||||
if forceTask != "" {
|
||||
task = forceTask
|
||||
if existingTask, ok := syslog["container_task"].(string); !ok || existingTask == "" {
|
||||
syslog["container_task"] = task
|
||||
}
|
||||
|
||||
if env == "" || app == "" || task == "" {
|
||||
return map[string]string{}, fmt.Errorf("unable to get one or more of env/app/task")
|
||||
}
|
||||
|
||||
return map[string]string{
|
||||
"container_env": env,
|
||||
"container_app": app,
|
||||
"container_task": task,
|
||||
}, nil
|
||||
return syslog, nil
|
||||
}
|
||||
|
||||
const (
|
||||
// These are the fields in the incoming fluentbit JSON that we expect the timestamp and log
|
||||
// They are referenced below by the FluentLog type; those are the ones that matter.
|
||||
fluentbitTimestampField = "fluent_ts"
|
||||
fluentbitLogField = "log"
|
||||
|
||||
// This is what we get from the fluentbig config: `time_key_format: "%Y-%m-%dT%H:%M:%S.%L%z"`
|
||||
fluentTimeFormat = "2006-01-02T15:04:05.999-0700"
|
||||
)
|
||||
|
||||
// FluentLog represents is the set of fields extracted from an incoming fluentbit log
|
||||
// The struct tags must line up with the JSON schema in the fluentbit configuration, see the comment for FieldsFromFluentBitlog
|
||||
type FluentLog struct {
|
||||
Log *string `json:"log"`
|
||||
Timestamp *string `json:"fluent_ts"`
|
||||
TaskArn string `json:"ecs_task_arn"`
|
||||
TaskDefinition string `json:"ecs_task_definition"`
|
||||
}
|
||||
|
||||
// FieldsFromFluentbitLog parses JSON object with fields indicating that it's coming from FluentBit
|
||||
// Its return value shares a common interface with the Syslog output - with the same four key fields
|
||||
// Unlike FieldsFromSyslog, it accepts its argument as bytes, since it will be used as bytes immediately and bytes is what comes off the firehose
|
||||
// In theory, the format we recieve is highly customizable, so we'll be making the following assumptions:
|
||||
// - All logs are coming from aws-fargate with the ecs-metadata fields (ecs_cluster, ecs_task_arn, ecs_task_definition) enabled
|
||||
// - The timestamp is in the field given by the FluentBitTimestampField constant in this package
|
||||
// - The timestamp is of the format of the constant fluentTimestampFormat
|
||||
// - The log is in the field given by the FluentBitlogField constant in this package
|
||||
// - The ecs_task_definition is of the from {environment}--{application}--.*
|
||||
func FieldsFromFluentbitLog(line string) (*FluentLog, error) {
|
||||
fluentFields := FluentLog{}
|
||||
if err := json.Unmarshal([]byte(line), &fluentFields); err != nil {
|
||||
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: err}
|
||||
}
|
||||
|
||||
return &fluentFields, nil
|
||||
}
|
||||
|
||||
func decodeFluent(fluentLog *FluentLog, env string) (map[string]interface{}, error) {
|
||||
out := map[string]interface{}{}
|
||||
if fluentLog.Timestamp == nil || *fluentLog.Timestamp == "" {
|
||||
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("no timestamp found in input field %s", fluentbitTimestampField)}
|
||||
}
|
||||
|
||||
if timeValue, err := time.Parse(fluentTimeFormat, *fluentLog.Timestamp); err == nil {
|
||||
out["timestamp"] = timeValue
|
||||
} else {
|
||||
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("timestamp has bad format: %s", *fluentLog.Timestamp)}
|
||||
}
|
||||
|
||||
if fluentLog.Log == nil {
|
||||
return nil, BadLogFormatError{Format: "fluentbit", DecodingError: fmt.Errorf("no log found in input field %s", fluentbitLogField)}
|
||||
}
|
||||
log := *fluentLog.Log
|
||||
out["rawlog"] = log
|
||||
|
||||
out["decoder_msg_type"] = "fluentbit"
|
||||
out["hostname"] = "aws-fargate"
|
||||
|
||||
// best effort to add container_env|app|task
|
||||
if parts := strings.SplitN(fluentLog.TaskDefinition, "--", 3); len(parts) == 3 {
|
||||
out["container_env"] = parts[0]
|
||||
out["container_app"] = parts[1]
|
||||
}
|
||||
if idx := strings.LastIndex(fluentLog.TaskArn, "/"); idx != -1 {
|
||||
out["container_task"] = fluentLog.TaskArn[idx+1:]
|
||||
}
|
||||
|
||||
kvFields, err := FieldsFromKayvee(log)
|
||||
if err == nil {
|
||||
for k, v := range kvFields {
|
||||
out[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Inject additional fields that are useful in log-searching and other business logic; mimicking syslog behavior
|
||||
out["env"] = env
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
type BadLogFormatError struct {
|
||||
Format string
|
||||
DecodingError error
|
||||
}
|
||||
|
||||
func (b BadLogFormatError) Error() string {
|
||||
return fmt.Sprintf("trying to decode log as format %s: %v", b.Format, b.DecodingError)
|
||||
}
|
||||
|
||||
func (b BadLogFormatError) Unwrap() error {
|
||||
return b.DecodingError
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package decode
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"testing"
|
||||
|
|
@ -221,10 +222,10 @@ func TestSyslogDecoding(t *testing.T) {
|
|||
fields, err := FieldsFromSyslog(spec.Input)
|
||||
if spec.ExpectedError != nil {
|
||||
assert.Error(err)
|
||||
assert.IsType(spec.ExpectedError, err)
|
||||
} else {
|
||||
assert.NoError(err)
|
||||
assert.True(errors.As(err, &spec.ExpectedError))
|
||||
return
|
||||
}
|
||||
assert.NoError(err)
|
||||
assert.Equal(spec.ExpectedOutput, fields)
|
||||
})
|
||||
}
|
||||
|
|
@ -251,6 +252,11 @@ func TestParseAndEnhance(t *testing.T) {
|
|||
}
|
||||
logTime3 = logTime3.UTC()
|
||||
|
||||
logTime4, err := time.Parse(fluentTimeFormat, "2020-08-13T21:10:57.000+0000")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
specs := []ParseAndEnhanceSpec{
|
||||
ParseAndEnhanceSpec{
|
||||
Title: "Parses a Kayvee log line from an ECS app",
|
||||
|
|
@ -307,10 +313,10 @@ func TestParseAndEnhance(t *testing.T) {
|
|||
ExpectedError: nil,
|
||||
},
|
||||
ParseAndEnhanceSpec{
|
||||
Title: "Fails to parse non-RSyslog log line",
|
||||
Title: "Fails to parse non-RSyslog, non-Fluent log line",
|
||||
Line: `not rsyslog`,
|
||||
ExpectedOutput: map[string]interface{}{},
|
||||
ExpectedError: &syslogparser.ParserError{},
|
||||
ExpectedError: fmt.Errorf(""),
|
||||
},
|
||||
ParseAndEnhanceSpec{
|
||||
Title: "Parses JSON values",
|
||||
|
|
@ -379,6 +385,161 @@ select sleep(2);`,
|
|||
"user_id": "868",
|
||||
},
|
||||
},
|
||||
{
|
||||
Title: "Valid fluent log",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"ecs_task_definition": "env--app--d--2:2",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime4,
|
||||
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"hostname": "aws-fargate",
|
||||
"decoder_msg_type": "Kayvee",
|
||||
"title": "request-finished",
|
||||
"container_app": "app",
|
||||
"container_env": "env",
|
||||
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"prefix": "2020/08/13 21:10:57 ",
|
||||
"postfix": "",
|
||||
"env": "deploy-env",
|
||||
},
|
||||
ExpectedError: nil,
|
||||
},
|
||||
{
|
||||
Title: "fluent log missing ecs_task_definition still parses and has container_task",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime4,
|
||||
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"hostname": "aws-fargate",
|
||||
"decoder_msg_type": "Kayvee",
|
||||
"title": "request-finished",
|
||||
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"prefix": "2020/08/13 21:10:57 ",
|
||||
"postfix": "",
|
||||
"env": "deploy-env",
|
||||
},
|
||||
},
|
||||
{
|
||||
Title: "fluent log with bad ecs_task_definition format still parses and has container_task",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"ecs_task_definition": "env--app",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime4,
|
||||
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"hostname": "aws-fargate",
|
||||
"decoder_msg_type": "Kayvee",
|
||||
"title": "request-finished",
|
||||
"container_task": "7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"prefix": "2020/08/13 21:10:57 ",
|
||||
"postfix": "",
|
||||
"env": "deploy-env",
|
||||
},
|
||||
},
|
||||
{
|
||||
Title: "fluent log missing ecs_task_arn still parses and has containter_env and app",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_definition": "env--app--d--2:2",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime4, // time.Date(2020, 8, 13, 21, 10, 57, 0, time.UTC),
|
||||
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\"}",
|
||||
"hostname": "aws-fargate",
|
||||
"decoder_msg_type": "Kayvee",
|
||||
"title": "request-finished",
|
||||
"container_env": "env",
|
||||
"container_app": "app",
|
||||
"prefix": "2020/08/13 21:10:57 ",
|
||||
"postfix": "",
|
||||
"env": "deploy-env",
|
||||
},
|
||||
},
|
||||
{
|
||||
Title: "fluent log missing timestamp is an error",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"ecs_task_definition": "env--app--d--2:2",
|
||||
"ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"deploy_env\":\"env\",\"level\":\"info\",\"pod-account\":\"589690932525\",\"pod-id\":\"2870425d-e4b1-4869-b2b7-a6d9fade9be1\",\"pod-region\":\"us-east-1\",\"pod-shortname\":\"us-east-1-dev-canary-2870425d\",\"source\":\"app-service\",\"title\":\"NumFDs\",\"type\":\"gauge\",\"value\":33,\"via\":\"process-metrics\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: nil,
|
||||
ExpectedError: BadLogFormatError{},
|
||||
},
|
||||
{
|
||||
Title: "fluent log with bad timestamp format is an error",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"ecs_task_definition": "env--app--d--2:2",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000Z",
|
||||
"log": "2020/08/13 21:10:57 {\"deploy_env\":\"env\",\"level\":\"info\",\"pod-account\":\"589690932525\",\"pod-id\":\"2870425d-e4b1-4869-b2b7-a6d9fade9be1\",\"pod-region\":\"us-east-1\",\"pod-shortname\":\"us-east-1-dev-canary-2870425d\",\"source\":\"app-service\",\"title\":\"NumFDs\",\"type\":\"gauge\",\"value\":33,\"via\":\"process-metrics\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: nil,
|
||||
ExpectedError: BadLogFormatError{},
|
||||
},
|
||||
{
|
||||
Title: "Valid fluent log with overrides for container_env, app, and task",
|
||||
Line: `{
|
||||
"container_id": "b17752cdd47a480712eca6c9774d782f68b50a876446faec5e26c9e8846bd29e",
|
||||
"container_name": "/ecs-env--app--d--2-2-env--app-e295a2dcf6f9ac849101",
|
||||
"ecs_cluster": "arn:aws:ecs:us-east-1:589690932525:cluster/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1",
|
||||
"ecs_task_arn": "arn:aws:ecs:us-east-1:589690932525:task/pod-2870425d-e4b1-4869-b2b7-a6d9fade9be1/7c1ccb5cb5c44582808b9e516b479eb6",
|
||||
"ecs_task_definition": "env--app--d--2:2",
|
||||
"fluent_ts": "2020-08-13T21:10:57.000+0000",
|
||||
"log": "2020/08/13 21:10:57 {\"title\": \"request-finished\", \"container_env\": \"env2\", \"container_task\": \"task\", \"container_app\": \"app2\"}",
|
||||
"source": "stderr"
|
||||
}`,
|
||||
ExpectedOutput: map[string]interface{}{
|
||||
"timestamp": logTime4,
|
||||
"rawlog": "2020/08/13 21:10:57 {\"title\": \"request-finished\", \"container_env\": \"env2\", \"container_task\": \"task\", \"container_app\": \"app2\"}",
|
||||
"hostname": "aws-fargate",
|
||||
"decoder_msg_type": "Kayvee",
|
||||
"title": "request-finished",
|
||||
"container_app": "app2",
|
||||
"container_env": "env2",
|
||||
"container_task": "task",
|
||||
"prefix": "2020/08/13 21:10:57 ",
|
||||
"postfix": "",
|
||||
"env": "deploy-env",
|
||||
},
|
||||
ExpectedError: nil,
|
||||
},
|
||||
}
|
||||
for _, spec := range specs {
|
||||
t.Run(fmt.Sprintf(spec.Title), func(t *testing.T) {
|
||||
|
|
@ -386,72 +547,114 @@ select sleep(2);`,
|
|||
fields, err := ParseAndEnhance(spec.Line, "deploy-env")
|
||||
if spec.ExpectedError != nil {
|
||||
assert.Error(err)
|
||||
assert.IsType(spec.ExpectedError, err)
|
||||
} else {
|
||||
assert.NoError(err)
|
||||
assert.True(errors.As(err, &spec.ExpectedError))
|
||||
return
|
||||
}
|
||||
assert.NoError(err)
|
||||
assert.Equal(spec.ExpectedOutput, fields)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetContainerMeta(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
t.Log("Must have a programname to get container meta")
|
||||
programname := ""
|
||||
_, err := getContainerMeta(programname, "", "", "")
|
||||
assert.Error(err)
|
||||
type containerMetaSpec struct {
|
||||
description string
|
||||
input map[string]interface{}
|
||||
wantErr bool
|
||||
wantContainerEnv string
|
||||
wantContainerApp string
|
||||
wantContainerTask string
|
||||
}
|
||||
|
||||
t.Log("Can parse a programname")
|
||||
programname = `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
|
||||
meta, err := getContainerMeta(programname, "", "", "")
|
||||
assert.NoError(err)
|
||||
assert.Equal(map[string]string{
|
||||
"container_env": "env1",
|
||||
"container_app": "app2",
|
||||
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
}, meta)
|
||||
tests := []containerMetaSpec{
|
||||
{
|
||||
description: "Must have a programname to get container meta",
|
||||
input: map[string]interface{}{
|
||||
"programname": "",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
description: "Can parse a programname",
|
||||
input: map[string]interface{}{
|
||||
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||
},
|
||||
wantContainerEnv: "env1",
|
||||
wantContainerApp: "app2",
|
||||
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
},
|
||||
{
|
||||
description: "Can override just 'env'",
|
||||
input: map[string]interface{}{
|
||||
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||
"container_env": "force-env",
|
||||
},
|
||||
wantContainerEnv: "force-env",
|
||||
wantContainerApp: "app2",
|
||||
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
},
|
||||
{
|
||||
description: "Can override just 'app'",
|
||||
input: map[string]interface{}{
|
||||
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||
"container_app": "force-app",
|
||||
},
|
||||
wantContainerEnv: "env1",
|
||||
wantContainerApp: "force-app",
|
||||
wantContainerTask: "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
},
|
||||
{
|
||||
description: "Can override just 'task'",
|
||||
input: map[string]interface{}{
|
||||
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||
"container_task": "force-task",
|
||||
},
|
||||
wantContainerEnv: "env1",
|
||||
wantContainerApp: "app2",
|
||||
wantContainerTask: "force-task",
|
||||
},
|
||||
{
|
||||
description: "Can override all fields",
|
||||
input: map[string]interface{}{
|
||||
"programname": `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
|
||||
"container_task": "force-task",
|
||||
"container_app": "force-app",
|
||||
"container_env": "force-env",
|
||||
},
|
||||
wantContainerEnv: "force-env",
|
||||
wantContainerApp: "force-app",
|
||||
wantContainerTask: "force-task",
|
||||
},
|
||||
}
|
||||
|
||||
t.Log("Can override just 'env'")
|
||||
overrideEnv := "force-env"
|
||||
meta, err = getContainerMeta(programname, overrideEnv, "", "")
|
||||
assert.NoError(err)
|
||||
assert.Equal(map[string]string{
|
||||
"container_env": overrideEnv,
|
||||
"container_app": "app2",
|
||||
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
}, meta)
|
||||
for _, tcase := range tests {
|
||||
assert := assert.New(t)
|
||||
|
||||
t.Log("Can override just 'app'")
|
||||
overrideApp := "force-app"
|
||||
meta, err = getContainerMeta(programname, "", overrideApp, "")
|
||||
assert.NoError(err)
|
||||
assert.Equal(map[string]string{
|
||||
"container_env": "env1",
|
||||
"container_app": overrideApp,
|
||||
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
|
||||
}, meta)
|
||||
syslog := map[string]interface{}{}
|
||||
for k, v := range tcase.input {
|
||||
syslog[k] = v
|
||||
}
|
||||
|
||||
t.Log("Can override just 'task'")
|
||||
overrideTask := "force-task"
|
||||
meta, err = getContainerMeta(programname, "", "", overrideTask)
|
||||
assert.NoError(err)
|
||||
assert.Equal(map[string]string{
|
||||
"container_env": "env1",
|
||||
"container_app": "app2",
|
||||
"container_task": overrideTask,
|
||||
}, meta)
|
||||
newSyslog, err := addContainterMetaToSyslog(syslog)
|
||||
if tcase.wantErr {
|
||||
assert.Error(err)
|
||||
continue
|
||||
}
|
||||
for k, v := range newSyslog {
|
||||
switch k {
|
||||
case "container_env":
|
||||
assert.Equal(v, tcase.wantContainerEnv)
|
||||
case "container_app":
|
||||
assert.Equal(v, tcase.wantContainerApp)
|
||||
case "container_task":
|
||||
assert.Equal(v, tcase.wantContainerTask)
|
||||
default:
|
||||
assert.Equal(v, tcase.input[k])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
t.Log("Can override all fields")
|
||||
programname = `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
|
||||
meta, err = getContainerMeta(programname, overrideEnv, overrideApp, overrideTask)
|
||||
assert.NoError(err)
|
||||
assert.Equal(map[string]string{
|
||||
"container_env": overrideEnv,
|
||||
"container_app": overrideApp,
|
||||
"container_task": overrideTask,
|
||||
}, meta)
|
||||
}
|
||||
|
||||
func TestExtractKVMeta(t *testing.T) {
|
||||
|
|
|
|||
22
go.mod
Normal file
22
go.mod
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
module github.com/amazon-kinesis-client-go
|
||||
|
||||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/Clever/amazon-kinesis-client-go v1.0.0
|
||||
github.com/Clever/syslogparser v0.0.0-20170816194131-fb28ad3e4340
|
||||
github.com/a8m/kinesis-producer v0.2.0
|
||||
github.com/aws/aws-sdk-go v1.35.28 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/jeromer/syslogparser v0.0.0-20190429161531-5fbaaf06d9e7 // indirect
|
||||
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d // indirect
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
|
||||
github.com/xeipuuv/gojsonschema v1.2.1-0.20200424115421-065759f9c3d7 // indirect
|
||||
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
|
||||
gopkg.in/Clever/kayvee-go.v6 v6.24.1
|
||||
gopkg.in/yaml.v2 v2.3.1-0.20200602174213-b893565b90ca // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
|
||||
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect
|
||||
)
|
||||
63
go.sum
Normal file
63
go.sum
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
github.com/Clever/amazon-kinesis-client-go v1.0.0 h1:rG+hkHCSe+xMGk3asESg18tiEj8X6To8vj0TRa1pKvQ=
|
||||
github.com/Clever/amazon-kinesis-client-go v1.0.0/go.mod h1:jLfi8QusUjdsYazmEH0DlSO4gE1WRzVE7cs4c/fOFdI=
|
||||
github.com/Clever/syslogparser v0.0.0-20170816194131-fb28ad3e4340 h1:wr8lTqPJZQpZkZgllQtrD96SecXkAu5MxzY3yJgsuCg=
|
||||
github.com/Clever/syslogparser v0.0.0-20170816194131-fb28ad3e4340/go.mod h1:e7Yy7RTiIMU9pZ+dcSviX3cpod8e0CEeTUPTgBbKlRE=
|
||||
github.com/a8m/kinesis-producer v0.2.0 h1:Bd5Oi4dczbTLPIZwVbm02464LIFgBqmViFj//b098xc=
|
||||
github.com/a8m/kinesis-producer v0.2.0/go.mod h1:CxoFe0Y49udKMnQPkC5S5VmZZy6a+Bef9otuoH96Pv0=
|
||||
github.com/aws/aws-sdk-go v1.35.28 h1:S2LuRnfC8X05zgZLC8gy/Sb82TGv2Cpytzbzz7tkeHc=
|
||||
github.com/aws/aws-sdk-go v1.35.28/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/jeromer/syslogparser v0.0.0-20190429161531-5fbaaf06d9e7 h1:FRDp/rRU2sThfssdHIUATCKJOms8R6A9rrhdC1sLqFI=
|
||||
github.com/jeromer/syslogparser v0.0.0-20190429161531-5fbaaf06d9e7/go.mod h1:mQyv/QAgjs9+PTi/iXveno+U86nKGsltjqf3ilYx4Bg=
|
||||
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
|
||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
|
||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d h1:ix3WmphUvN0GDd0DO9MH0v6/5xTv+Xm1bPN+1UJn58k=
|
||||
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
|
||||
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
|
||||
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
|
||||
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
|
||||
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
|
||||
github.com/xeipuuv/gojsonschema v1.2.1-0.20200424115421-065759f9c3d7 h1:tdnG+ZILOafvA29+pYKP3gauEbigHll3PHsf1GQa2ms=
|
||||
github.com/xeipuuv/gojsonschema v1.2.1-0.20200424115421-065759f9c3d7/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e h1:EHBhcS0mlXEAVwNyO2dLfjToGsyY4j24pTs2ScHnX7s=
|
||||
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/Clever/kayvee-go.v6 v6.24.1 h1:lHxjlTFj57mfrq06PiCKIAJ0QuHHEqsyvSYvzb+gSFg=
|
||||
gopkg.in/Clever/kayvee-go.v6 v6.24.1/go.mod h1:G0m6nBZj7Kdz+w2hiIaawmhXl5zp7E/K0ashol3Kb2A=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.3.1-0.20200602174213-b893565b90ca h1:oivFrl3Vo+KfpUmTDJvz91I+BWzDPOQ+0CNR5jwTHcg=
|
||||
gopkg.in/yaml.v2 v2.3.1-0.20200602174213-b893565b90ca/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54=
|
||||
launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM=
|
||||
73
golang.mk
73
golang.mk
|
|
@ -1,15 +1,18 @@
|
|||
# This is the default Clever Golang Makefile.
|
||||
# It is stored in the dev-handbook repo, github.com/Clever/dev-handbook
|
||||
# Please do not alter this file directly.
|
||||
GOLANG_MK_VERSION := 0.4.0
|
||||
GOLANG_MK_VERSION := 1.0.0
|
||||
|
||||
SHELL := /bin/bash
|
||||
SYSTEM := $(shell uname -a | cut -d" " -f1 | tr '[:upper:]' '[:lower:]')
|
||||
.PHONY: golang-test-deps bin/dep golang-ensure-curl-installed
|
||||
.PHONY: golang-test-deps golang-ensure-curl-installed
|
||||
|
||||
# set timezone to UTC for golang to match circle and deploys
|
||||
export TZ=UTC
|
||||
|
||||
# go build flags for use across all commands which accept them
|
||||
GO_BUILD_FLAGS := "-mod=vendor"
|
||||
|
||||
# if the gopath includes several directories, use only the first
|
||||
GOPATH=$(shell echo $$GOPATH | cut -d: -f1)
|
||||
|
||||
|
|
@ -32,57 +35,21 @@ _ := $(if \
|
|||
endef
|
||||
|
||||
# FGT is a utility that exits with 1 whenever any stderr/stdout output is recieved.
|
||||
# We pin its version since its a simple tool that does its job as-is;
|
||||
# so we're defended against it breaking or changing in the future.
|
||||
FGT := $(GOPATH)/bin/fgt
|
||||
$(FGT):
|
||||
go get github.com/GeertJohan/fgt
|
||||
go get github.com/GeertJohan/fgt@262f7b11eec07dc7b147c44641236f3212fee89d
|
||||
|
||||
golang-ensure-curl-installed:
|
||||
@command -v curl >/dev/null 2>&1 || { echo >&2 "curl not installed. Please install curl."; exit 1; }
|
||||
|
||||
DEP_VERSION = v0.4.1
|
||||
DEP_INSTALLED := $(shell [[ -e "bin/dep" ]] && bin/dep version | grep version | grep -v go | cut -d: -f2 | tr -d '[:space:]')
|
||||
# Dep is a tool used to manage Golang dependencies. It is the offical vendoring experiment, but
|
||||
# not yet the official tool for Golang.
|
||||
ifeq ($(DEP_VERSION),$(DEP_INSTALLED))
|
||||
bin/dep: # nothing to do, dep is already up-to-date
|
||||
else
|
||||
CACHED_DEP = /tmp/dep-$(DEP_VERSION)
|
||||
bin/dep: golang-ensure-curl-installed
|
||||
@echo "Updating dep..."
|
||||
@mkdir -p bin
|
||||
@if [ ! -f $(CACHED_DEP) ]; then curl -o $(CACHED_DEP) -sL https://github.com/golang/dep/releases/download/$(DEP_VERSION)/dep-$(SYSTEM)-amd64; fi;
|
||||
@cp $(CACHED_DEP) bin/dep
|
||||
@chmod +x bin/dep || true
|
||||
endif
|
||||
|
||||
# figure out "github.com/<org>/<repo>"
|
||||
# `go list` will fail if there are no .go files in the directory
|
||||
# if this is the case, fall back to assuming github.com/Clever
|
||||
REF = $(shell go list || echo github.com/Clever/$(notdir $(shell pwd)))
|
||||
golang-verify-no-self-references:
|
||||
@if grep -q -i "$(REF)" Gopkg.lock; then echo "Error: Gopkg.lock includes a self-reference ($(REF)), which is not allowed. See: https://github.com/golang/dep/issues/1690" && exit 1; fi;
|
||||
@if grep -q -i "$(REF)" Gopkg.toml; then echo "Error: Gopkg.toml includes a self-reference ($(REF)), which is not allowed. See: https://github.com/golang/dep/issues/1690" && exit 1; fi;
|
||||
|
||||
golang-dep-vendor-deps: bin/dep golang-verify-no-self-references
|
||||
|
||||
# golang-godep-vendor is a target for saving dependencies with the dep tool
|
||||
# to the vendor/ directory. All nested vendor/ directories are deleted via
|
||||
# the prune command.
|
||||
# In CI, -vendor-only is used to avoid updating the lock file.
|
||||
ifndef CI
|
||||
define golang-dep-vendor
|
||||
bin/dep ensure -v
|
||||
endef
|
||||
else
|
||||
define golang-dep-vendor
|
||||
bin/dep ensure -v -vendor-only
|
||||
endef
|
||||
endif
|
||||
|
||||
# Golint is a tool for linting Golang code for common errors.
|
||||
# We pin its version because an update could add a new lint check which would make
|
||||
# previously passing tests start failing without changing our code.
|
||||
GOLINT := $(GOPATH)/bin/golint
|
||||
$(GOLINT):
|
||||
go get golang.org/x/lint/golint
|
||||
go get golang.org/x/lint/golint@738671d3881b9731cc63024d5d88cf28db875626
|
||||
|
||||
# golang-fmt-deps requires the FGT tool for checking output
|
||||
golang-fmt-deps: $(FGT)
|
||||
|
|
@ -91,7 +58,7 @@ golang-fmt-deps: $(FGT)
|
|||
# arg1: pkg path
|
||||
define golang-fmt
|
||||
@echo "FORMATTING $(1)..."
|
||||
@$(FGT) gofmt -l=true $(GOPATH)/src/$(1)/*.go
|
||||
@PKG_PATH=$$(go list -f '{{.Dir}}' $(1)); $(FGT) gofmt -l=true $${PKG_PATH}/*.go
|
||||
endef
|
||||
|
||||
# golang-lint-deps requires the golint tool for golang linting.
|
||||
|
|
@ -101,7 +68,7 @@ golang-lint-deps: $(GOLINT)
|
|||
# arg1: pkg path
|
||||
define golang-lint
|
||||
@echo "LINTING $(1)..."
|
||||
@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(GOLINT)
|
||||
@PKG_PATH=$$(go list -f '{{.Dir}}' $(1)); find $${PKG_PATH}/*.go -type f | grep -v gen_ | xargs $(GOLINT)
|
||||
endef
|
||||
|
||||
# golang-lint-deps-strict requires the golint tool for golang linting.
|
||||
|
|
@ -112,7 +79,7 @@ golang-lint-deps-strict: $(GOLINT) $(FGT)
|
|||
# arg1: pkg path
|
||||
define golang-lint-strict
|
||||
@echo "LINTING $(1)..."
|
||||
@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(FGT) $(GOLINT)
|
||||
@PKG_PATH=$$(go list -f '{{.Dir}}' $(1)); find $${PKG_PATH}/*.go -type f | grep -v gen_ | xargs $(FGT) $(GOLINT)
|
||||
endef
|
||||
|
||||
# golang-test-deps is here for consistency
|
||||
|
|
@ -122,7 +89,7 @@ golang-test-deps:
|
|||
# arg1: pkg path
|
||||
define golang-test
|
||||
@echo "TESTING $(1)..."
|
||||
@go test -v $(1)
|
||||
@go test $(GO_BUILD_FLAGS) -v $(1)
|
||||
endef
|
||||
|
||||
# golang-test-strict-deps is here for consistency
|
||||
|
|
@ -132,7 +99,7 @@ golang-test-strict-deps:
|
|||
# arg1: pkg path
|
||||
define golang-test-strict
|
||||
@echo "TESTING $(1)..."
|
||||
@go test -v -race $(1)
|
||||
@go test -v $(GO_BUILD_FLAGS) -race $(1)
|
||||
endef
|
||||
|
||||
# golang-vet-deps is here for consistency
|
||||
|
|
@ -142,7 +109,7 @@ golang-vet-deps:
|
|||
# arg1: pkg path
|
||||
define golang-vet
|
||||
@echo "VETTING $(1)..."
|
||||
@go vet $(GOPATH)/src/$(1)/*.go
|
||||
@go vet $(GO_BUILD_FLAGS) $(1)
|
||||
endef
|
||||
|
||||
# golang-test-all-deps installs all dependencies needed for different test cases.
|
||||
|
|
@ -176,14 +143,14 @@ endef
|
|||
define golang-build
|
||||
@echo "BUILDING..."
|
||||
@if [ -z "$$CI" ]; then \
|
||||
go build -o bin/$(2) $(1); \
|
||||
go build $(GO_BUILD_FLAGS) -o bin/$(2) $(1); \
|
||||
else \
|
||||
echo "-> Building CGO binary"; \
|
||||
CGO_ENABLED=0 go build -installsuffix cgo -o bin/$(2) $(1); \
|
||||
CGO_ENABLED=0 go build $(GO_BUILD_FLAGS) -installsuffix cgo -o bin/$(2) $(1); \
|
||||
fi;
|
||||
endef
|
||||
|
||||
# golang-update-makefile downloads latest version of golang.mk
|
||||
golang-update-makefile:
|
||||
@wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang.mk -O /tmp/golang.mk 2>/dev/null
|
||||
@wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang-v1.mk -O /tmp/golang.mk 2>/dev/null
|
||||
@if ! grep -q $(GOLANG_MK_VERSION) /tmp/golang.mk; then cp /tmp/golang.mk golang.mk && echo "golang.mk updated"; else echo "golang.mk is up-to-date"; fi
|
||||
|
|
|
|||
|
|
@ -1,16 +1,150 @@
|
|||
// Package splitter provides functions for decoding various kinds of records that might come off of a kinesis stream.
|
||||
// It is equipped to with the functions to unbundle KPL aggregates and CloudWatch log bundles,
|
||||
// as well as apply appropriate decompression.
|
||||
// KCL applications would be most interested in `SplitMessageIfNecessary` which can handle zlibbed records as well as
|
||||
// CloudWatch bundles. KCL automatically unbundles KPL aggregates before passing the records to the consumer.
|
||||
// Non-KCL applications (such as Lambdas consuming KPL-produced aggregates) should either use
|
||||
// - KPLDeaggregate if the consumer purely wants to unbundle KPL aggregates, but will handle the raw records themselves.
|
||||
// - Deaggregate if the consumer wants to apply the same decompress and split logic as SplitMessageIfNecessary
|
||||
// in addition to the KPL splitting.
|
||||
package splitter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"compress/zlib"
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
kpl "github.com/a8m/kinesis-producer"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
// The Amazon Kinesis Producer Library (KPL) aggregates multiple logical user records into a single
|
||||
// Amazon Kinesis record for efficient puts.
|
||||
// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
|
||||
var kplMagicNumber = []byte{0xF3, 0x89, 0x9A, 0xC2}
|
||||
|
||||
// IsKPLAggregate checks a record for a KPL aggregate prefix.
|
||||
// It is not necessary to call this before calling KPLDeaggregate.
|
||||
func IsKPLAggregate(data []byte) bool {
|
||||
return bytes.HasPrefix(data, kplMagicNumber)
|
||||
}
|
||||
|
||||
// KPLDeaggregate takes a Kinesis record and converts it to one or more user records by applying KPL deaggregation.
|
||||
// If the record begins with the 4-byte magic prefix that KPL uses, the single Kinesis record is split into its component user records.
|
||||
// Otherwise, the return value is a singleton containing the original record.
|
||||
func KPLDeaggregate(kinesisRecord []byte) ([][]byte, error) {
|
||||
if !IsKPLAggregate(kinesisRecord) {
|
||||
return [][]byte{kinesisRecord}, nil
|
||||
}
|
||||
src := kinesisRecord[len(kplMagicNumber) : len(kinesisRecord)-md5.Size]
|
||||
checksum := kinesisRecord[len(kinesisRecord)-md5.Size:]
|
||||
recordSum := md5.Sum(src)
|
||||
for i, b := range checksum {
|
||||
if b != recordSum[i] {
|
||||
// either the data is corrupted or this is not a KPL aggregate
|
||||
// either way, return the data as-is
|
||||
return [][]byte{kinesisRecord}, nil
|
||||
}
|
||||
}
|
||||
dest := new(kpl.AggregatedRecord)
|
||||
err := proto.Unmarshal(src, dest)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unmarshalling proto: %v", err)
|
||||
}
|
||||
var records [][]byte
|
||||
for _, userRecord := range dest.GetRecords() {
|
||||
records = append(records, userRecord.Data)
|
||||
}
|
||||
return records, nil
|
||||
}
|
||||
|
||||
// Deaggregate is a combination of KPLDeaggregate and SplitMessageIfNecessary
|
||||
// First it tries to KPL-deaggregate. If unsuccessful, it calls SplitIfNecessary on the original record.
|
||||
// If successful, it iterates over the individual user records and attempts to unzlib them.
|
||||
// If a record inside an aggregate is in zlib format, the output will contain the unzlibbed version.
|
||||
// If it is not zlibbed, the output will contain the record verbatim
|
||||
// A similar result can be optained by calling KPLDeaggregate, then iterating over the results and callin SplitMessageIfNecessary.
|
||||
// This function makes the assumption that after KPL-deaggregating, the results are not CloudWatch aggregates, so it doesn't need to check them for a gzip header.
|
||||
// Also it lets us iterate over the user records one less time, since KPLDeaggregate loops over the records and we would need to loop again to unzlib.
|
||||
//
|
||||
// See the SplitMessageIfNecessary documentation for the format of output for CloudWatch log bundles.
|
||||
func Deaggregate(kinesisRecord []byte) ([][]byte, error) {
|
||||
if !IsKPLAggregate(kinesisRecord) {
|
||||
return SplitMessageIfNecessary(kinesisRecord)
|
||||
}
|
||||
src := kinesisRecord[len(kplMagicNumber) : len(kinesisRecord)-md5.Size]
|
||||
checksum := kinesisRecord[len(kinesisRecord)-md5.Size:]
|
||||
recordSum := md5.Sum(src)
|
||||
for i, b := range checksum {
|
||||
if b != recordSum[i] {
|
||||
// either the data is corrupted or this is not a KPL aggregate
|
||||
// either way, return the data as-is
|
||||
return [][]byte{kinesisRecord}, nil
|
||||
}
|
||||
}
|
||||
dest := new(kpl.AggregatedRecord)
|
||||
err := proto.Unmarshal(src, dest)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unmarshalling proto: %v", err)
|
||||
}
|
||||
var records [][]byte
|
||||
for _, userRecord := range dest.GetRecords() {
|
||||
record, err := unzlib(userRecord.Data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unzlibbing record: %w", err)
|
||||
}
|
||||
records = append(records, record)
|
||||
}
|
||||
return records, nil
|
||||
}
|
||||
|
||||
// SplitMessageIfNecessary recieves a user-record and returns a slice of one or more records.
|
||||
// if the record is coming off of a kinesis stream and might be KPL aggregated, it needs to be deaggregated before calling this.
|
||||
// This function handles three types of records:
|
||||
// - records emitted from CWLogs Subscription (which are gzip compressed)
|
||||
// - zlib compressed records (e.g. as compressed and emitted by Kinesis plugin for Fluent Bit
|
||||
// - any other record (left unchanged)
|
||||
//
|
||||
// CloudWatch logs come as structured JSON. In the process of splitting, they are converted
|
||||
// into an rsyslog format that allows fairly uniform parsing of the result across the
|
||||
// AWS services that might emit logs to CloudWatch.
|
||||
// Note that these timezone used in these syslog records is guessed based on the local env.
|
||||
// If you need consistent timezones, set TZ=UTC in your environment.
|
||||
func SplitMessageIfNecessary(userRecord []byte) ([][]byte, error) {
|
||||
// First try the record as a CWLogs record
|
||||
if IsGzipped(userRecord) {
|
||||
return GetMessagesFromGzippedInput(userRecord)
|
||||
}
|
||||
|
||||
unzlibRecord, err := unzlib(userRecord)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Process a single message, from KPL
|
||||
return [][]byte{unzlibRecord}, nil
|
||||
}
|
||||
|
||||
func unzlib(input []byte) ([]byte, error) {
|
||||
zlibReader, err := zlib.NewReader(bytes.NewReader(input))
|
||||
if err == nil {
|
||||
unzlibRecord, err := ioutil.ReadAll(zlibReader)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading zlib-compressed record: %v", err)
|
||||
}
|
||||
return unzlibRecord, nil
|
||||
}
|
||||
return input, nil
|
||||
|
||||
}
|
||||
|
||||
// LogEvent is a single log line within a LogEventBatch
|
||||
type LogEvent struct {
|
||||
ID string `json:"id"`
|
||||
|
|
@ -116,6 +250,11 @@ var awsFargateLogStreamRegex = regexp.MustCompile(`^fargate/([a-z0-9-]+)--([a-z0
|
|||
// RDS slowquery log groups are in the form of /aws/rds/cluster/<database name>/slowquery
|
||||
var awsRDSLogGroupRegex = regexp.MustCompile(`^/aws/rds/cluster/([a-z0-9-]+)/slowquery$`)
|
||||
|
||||
// glue log groups are of the form /aws-glue/jobs/<env>/<app>/<pod-id>
|
||||
// glue log streams are of the form <job id>-<"driver" | "1" | "progress-bar">
|
||||
var awsGlueLogGroupRegex = regexp.MustCompile(`^/aws-glue/jobs/([a-z0-9-]+)/([a-z0-9-]+)/([a-z0-9-]+)$`)
|
||||
var awsGlueLogStreamRegex = regexp.MustCompile(`^(jr_[a-z0-9-]+)-.*$`)
|
||||
|
||||
// arn and task cruft to satisfy parsing later on: https://github.com/Clever/amazon-kinesis-client-go/blob/94aacdf8339bd2cc8400d3bcb323dc1bce2c8422/decode/decode.go#L421-L425
|
||||
const arnCruft = `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F`
|
||||
const taskCruft = `12345678-1234-1234-1234-555566667777`
|
||||
|
|
@ -228,6 +367,32 @@ func splitAWSRDS(b LogEventBatch) ([]RSysLogMessage, bool) {
|
|||
return out, true
|
||||
}
|
||||
|
||||
func splitAWSGlue(b LogEventBatch) ([]RSysLogMessage, bool) {
|
||||
matches := awsGlueLogGroupRegex.FindAllStringSubmatch(b.LogGroup, 1)
|
||||
if len(matches) != 1 {
|
||||
return nil, false
|
||||
}
|
||||
env := matches[0][1]
|
||||
app := matches[0][2]
|
||||
|
||||
streamMatches := awsGlueLogStreamRegex.FindAllStringSubmatch(b.LogStream, 1)
|
||||
if len(streamMatches) != 1 {
|
||||
return nil, false
|
||||
}
|
||||
jobID := streamMatches[0][1]
|
||||
|
||||
out := []RSysLogMessage{}
|
||||
for _, event := range b.LogEvents {
|
||||
out = append(out, RSysLogMessage{
|
||||
Timestamp: event.Timestamp.Time(),
|
||||
ProgramName: env + "--" + app + arnCruft + jobID,
|
||||
Hostname: "aws-glue",
|
||||
Message: event.Message,
|
||||
})
|
||||
}
|
||||
return out, true
|
||||
}
|
||||
|
||||
func splitDefault(b LogEventBatch) []RSysLogMessage {
|
||||
out := []RSysLogMessage{}
|
||||
for _, event := range b.LogEvents {
|
||||
|
|
@ -261,6 +426,8 @@ func Split(b LogEventBatch) [][]byte {
|
|||
return stringify(rsyslogMsgs)
|
||||
} else if rsyslogMsgs, ok := splitAWSRDS(b); ok {
|
||||
return stringify(rsyslogMsgs)
|
||||
} else if rsyslogMsgs, ok := splitAWSGlue(b); ok {
|
||||
return stringify(rsyslogMsgs)
|
||||
}
|
||||
return stringify(splitDefault(b))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,15 +3,29 @@ package splitter
|
|||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"compress/zlib"
|
||||
"crypto/md5"
|
||||
b64 "encoding/base64"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Clever/amazon-kinesis-client-go/decode"
|
||||
kpl "github.com/a8m/kinesis-producer"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
// In the conversion of CloudWatch LogEvent struct to an RSyslog struct to a string,
|
||||
// the timezone used in the final string depends on the locally set timezone.
|
||||
// in order for tests to pass, we set TZ to UTC
|
||||
os.Setenv("TZ", "UTC")
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
func TestUnpacking(t *testing.T) {
|
||||
input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA"
|
||||
|
||||
|
|
@ -253,3 +267,290 @@ func TestSplitRDS(t *testing.T) {
|
|||
assert.Equal(t, "Slow query: select * from table.", enhanced["rawlog"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitGlue(t *testing.T) {
|
||||
input := LogEventBatch{
|
||||
MessageType: "DATA_MESSAGE",
|
||||
Owner: "123456789012",
|
||||
LogGroup: "/aws-glue/jobs/clever-dev/analytics-district-participation/aae75f00",
|
||||
LogStream: "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28-1",
|
||||
SubscriptionFilters: []string{"ForwardLogsToKinesis"},
|
||||
LogEvents: []LogEvent{
|
||||
{
|
||||
ID: "99999992379011144044923130086453437181614530551221780480",
|
||||
Timestamp: NewUnixTimestampMillis(1498519943285),
|
||||
Message: "foo bar.",
|
||||
},
|
||||
},
|
||||
}
|
||||
lines := Split(input)
|
||||
expected := [][]byte{
|
||||
[]byte(`2017-06-26T23:32:23.285001+00:00 aws-glue clever-dev--analytics-district-participation/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2Fjr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28: foo bar.`),
|
||||
}
|
||||
assert.Equal(t, expected, lines)
|
||||
for _, line := range expected {
|
||||
enhanced, err := decode.ParseAndEnhance(string(line), "")
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, "aws-glue", enhanced["hostname"])
|
||||
assert.Equal(t, "clever-dev", enhanced["container_env"])
|
||||
assert.Equal(t, "analytics-district-participation", enhanced["container_app"])
|
||||
assert.Equal(t, "jr_8927660fecacbe026ccab656cb80befea8102ac2023df531b92889b112aada28", enhanced["container_task"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitIfNecesary(t *testing.T) {
|
||||
|
||||
// We provide three different inputs to batchedWriter.splitMessageIfNecessary
|
||||
// plain text
|
||||
// zlib compressed text
|
||||
// gzip compressed CloudWatch logs batch
|
||||
// we verify that the split function matches the input against the correct splitter
|
||||
// and decodes it.
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
plainTextInput := []byte("hello, world!")
|
||||
|
||||
records, err := SplitMessageIfNecessary(plainTextInput)
|
||||
assert.NoError(err)
|
||||
assert.Equal(
|
||||
records,
|
||||
[][]byte{[]byte("hello, world!")},
|
||||
)
|
||||
|
||||
var z bytes.Buffer
|
||||
zbuf := zlib.NewWriter(&z)
|
||||
zbuf.Write([]byte("hello, world!"))
|
||||
zbuf.Close()
|
||||
zlibSingleInput := z.Bytes()
|
||||
|
||||
records, err = SplitMessageIfNecessary(zlibSingleInput)
|
||||
assert.NoError(err)
|
||||
assert.Equal(
|
||||
records,
|
||||
[][]byte{[]byte("hello, world!")},
|
||||
)
|
||||
|
||||
// the details of this part aren't super important since the actual functionality is
|
||||
// tested in other tests; for this test we just want to make sure that split function
|
||||
// correctly realizes it's gzip and call the appropriate CW-log-splitting logic
|
||||
var g bytes.Buffer
|
||||
gbuf := gzip.NewWriter(&g)
|
||||
cwLogBatch := LogEventBatch{
|
||||
MessageType: "test",
|
||||
Owner: "test",
|
||||
LogGroup: "test",
|
||||
LogStream: "test",
|
||||
SubscriptionFilters: []string{""},
|
||||
LogEvents: []LogEvent{{
|
||||
ID: "test",
|
||||
Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)),
|
||||
Message: "test",
|
||||
}},
|
||||
}
|
||||
cwLogBatchJSON, _ := json.Marshal(cwLogBatch)
|
||||
gbuf.Write(cwLogBatchJSON)
|
||||
gbuf.Close()
|
||||
gzipBatchInput := g.Bytes()
|
||||
|
||||
expectedRecord := []byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test")
|
||||
records, err = SplitMessageIfNecessary(gzipBatchInput)
|
||||
assert.NoError(err)
|
||||
assert.Equal(
|
||||
records,
|
||||
[][]byte{expectedRecord},
|
||||
)
|
||||
}
|
||||
|
||||
func createKPLAggregate(input [][]byte, compress bool) []byte {
|
||||
var partitionKeyIndex uint64 = 0
|
||||
|
||||
records := []*kpl.Record{}
|
||||
for _, log := range input {
|
||||
if compress {
|
||||
var z bytes.Buffer
|
||||
zbuf := zlib.NewWriter(&z)
|
||||
zbuf.Write(log)
|
||||
zbuf.Close()
|
||||
log = z.Bytes()
|
||||
}
|
||||
records = append(records, &kpl.Record{
|
||||
PartitionKeyIndex: &partitionKeyIndex,
|
||||
Data: log,
|
||||
})
|
||||
}
|
||||
|
||||
logProto, err := proto.Marshal(&kpl.AggregatedRecord{
|
||||
PartitionKeyTable: []string{"ecs_task_arn"},
|
||||
Records: records,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
log := append(kplMagicNumber, logProto...)
|
||||
logHash := md5.Sum(logProto)
|
||||
return append(log, logHash[0:16]...)
|
||||
}
|
||||
|
||||
func TestKPLDeaggregate(t *testing.T) {
|
||||
type test struct {
|
||||
description string
|
||||
input []byte
|
||||
output [][]byte
|
||||
shouldError bool
|
||||
}
|
||||
|
||||
tests := []test{
|
||||
{
|
||||
description: "non-aggregated record",
|
||||
input: []byte("hello"),
|
||||
output: [][]byte{[]byte("hello")},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
description: "one kpl-aggregated record",
|
||||
input: createKPLAggregate(
|
||||
[][]byte{[]byte("hello")},
|
||||
false,
|
||||
),
|
||||
output: [][]byte{[]byte("hello")},
|
||||
shouldError: false,
|
||||
},
|
||||
{
|
||||
description: "three kpl-aggregated record",
|
||||
input: createKPLAggregate([][]byte{
|
||||
[]byte("hello, "),
|
||||
[]byte("world"),
|
||||
[]byte("!"),
|
||||
},
|
||||
false,
|
||||
),
|
||||
output: [][]byte{
|
||||
[]byte("hello, "),
|
||||
[]byte("world"),
|
||||
[]byte("!"),
|
||||
},
|
||||
shouldError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description, func(t *testing.T) {
|
||||
out, err := KPLDeaggregate(tt.input)
|
||||
if tt.shouldError {
|
||||
assert.Error(t, err)
|
||||
return
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, out, tt.output)
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// TestDeaggregate covers the top-level Deaggregate entry point across every
// supported input format: plain (non-aggregated) records, KPL-aggregated
// records with and without per-record zlib compression, and a gzipped
// CloudWatch Logs batch.
func TestDeaggregate(t *testing.T) {
	// One table entry per input format: the raw payload handed to
	// Deaggregate and the records expected back.
	type test struct {
		description string
		input       []byte
		output      [][]byte
		shouldError bool
	}

	tests := []test{
		{
			description: "non-aggregated record",
			input:       []byte("hello"),
			output:      [][]byte{[]byte("hello")},
			shouldError: false,
		},
		{
			description: "one kpl-aggregated record",
			input: createKPLAggregate(
				[][]byte{[]byte("hello")},
				false,
			),
			output:      [][]byte{[]byte("hello")},
			shouldError: false,
		},
		{
			description: "three kpl-aggregated record",
			input: createKPLAggregate([][]byte{
				[]byte("hello, "),
				[]byte("world"),
				[]byte("!"),
			},
				false,
			),
			output: [][]byte{
				[]byte("hello, "),
				[]byte("world"),
				[]byte("!"),
			},
			shouldError: false,
		},
		// The zlib variants exercise per-record decompression inside the
		// KPL deaggregation path.
		{
			description: "one kpl-aggregated zlib record",
			input: createKPLAggregate(
				[][]byte{[]byte("hello")},
				true,
			),
			output:      [][]byte{[]byte("hello")},
			shouldError: false,
		},
		{
			description: "three kpl-aggregated zlib record",
			input: createKPLAggregate([][]byte{
				[]byte("hello, "),
				[]byte("world"),
				[]byte("!"),
			},
				true,
			),
			output: [][]byte{
				[]byte("hello, "),
				[]byte("world"),
				[]byte("!"),
			},
			shouldError: false,
		},
	}

	// Build the gzipped CloudWatch Logs batch fixture (same construction as
	// in TestSplitIfNecesary) and append it as a final table case.
	var g bytes.Buffer
	gbuf := gzip.NewWriter(&g)
	cwLogBatch := LogEventBatch{
		MessageType:         "test",
		Owner:               "test",
		LogGroup:            "test",
		LogStream:           "test",
		SubscriptionFilters: []string{""},
		LogEvents: []LogEvent{{
			ID:        "test",
			Timestamp: UnixTimestampMillis(time.Date(2020, time.September, 9, 9, 10, 10, 0, time.UTC)),
			Message:   "test",
		}},
	}
	cwLogBatchJSON, _ := json.Marshal(cwLogBatch)
	gbuf.Write(cwLogBatchJSON)
	gbuf.Close()
	gzipBatchInput := g.Bytes()

	tests = append(tests, test{
		description: "cloudwatch log batch",
		input:       gzipBatchInput,
		output:      [][]byte{[]byte("2020-09-09T09:10:10.000001+00:00 test test--test/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777: test")},
		shouldError: false,
	})
	for _, tt := range tests {
		t.Run(tt.description, func(t *testing.T) {
			out, err := Deaggregate(tt.input)

			if tt.shouldError {
				assert.Error(t, err)
				return
			}
			assert.NoError(t, err)
			assert.Equal(t, out, tt.output)
		})
	}
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue