Implemented new consumer interface as well as example consumer: 'batchconsumer'

Xavi Ramirez 2017-07-18 02:03:15 +00:00
parent 92eaacfbf1
commit 4c67f39c99
17 changed files with 2631 additions and 3 deletions


@@ -30,6 +30,8 @@ EMPTY :=
SPACE := $(EMPTY) $(EMPTY)
JAVA_CLASS_PATH := $(subst $(SPACE),:,$(JARS_TO_DOWNLOAD))
+CONSUMER ?= consumer
$(JARS_TO_DOWNLOAD):
mkdir -p `dirname $@`
curl -s -L -o $@ -O $(URL_PREFIX)`echo $@ | sed 's/$(JAR_DIR)\///g'`
@@ -37,8 +39,13 @@ $(JARS_TO_DOWNLOAD):
download_jars: $(JARS_TO_DOWNLOAD)
build:
-	CGO_ENABLED=0 go build -installsuffix cgo -o build/consumer $(PKG)/cmd/consumer
+	CGO_ENABLED=0 go build -installsuffix cgo -o build/$(CONSUMER) $(PKG)/cmd/$(CONSUMER)
run: build download_jars
command -v java >/dev/null 2>&1 || { echo >&2 "Java not installed. Install java!"; exit 1; }
-	java -cp $(JAVA_CLASS_PATH) com.amazonaws.services.kinesis.multilang.MultiLangDaemon consumer.properties
+	java -cp $(JAVA_CLASS_PATH) \
+		com.amazonaws.services.kinesis.multilang.MultiLangDaemon \
+		$(CONSUMER).properties
bench:
go test -bench=. github.com/Clever/amazon-kinesis-client-go/decode/

batchconsumer.properties Normal file (83 lines)

@@ -0,0 +1,83 @@
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = build/batchconsumer
# The name of an Amazon Kinesis stream to process.
streamName = test-stream
# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = KCLGoExample
# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = golang
# Valid options are TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON
# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.
# The KCL defaults to us-east-1
regionName = us-west-1
# Fail over time in milliseconds. A worker which does not renew its lease within this time interval
# will be regarded as having problems and its shards will be assigned to other workers.
# For applications that have a large number of shards, this may be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000
# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided, a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId =
# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000
# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000
# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000
# Enables applications to flush/checkpoint (if they have some data "in progress", but don't get new data for a while)
#callProcessRecordsEvenForEmptyRecordList = false
# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000
# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true
# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500
# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000
# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000
# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true
# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0


@@ -0,0 +1,165 @@
package batcher
import (
"fmt"
"math/big"
"sync"
"time"
)
type SequencePair struct {
Sequence *big.Int
SubSequence int
}
// Sync is used to allow a writer to synchronize with the batcher.
// The writer declares how to write messages (via its `SendBatch` method), while the batcher
// keeps track of messages written
type Sync interface {
SendBatch(batch [][]byte)
}
// Batcher interface
type Batcher interface {
// AddMessage adds a message to the batch
AddMessage(msg []byte, sequencePair SequencePair) error
// Flush all messages from the batch
Flush()
// SmallestSequencePair returns the smallest SequenceNumber and SubSequence number in
// the current batch
SmallestSequencePair() SequencePair
}
type msgPack struct {
msg []byte
sequencePair SequencePair
}
type batcher struct {
mux sync.Mutex
flushInterval time.Duration
flushCount int
flushSize int
// smallestSeq tracks the smallest sequence number of any record in the batch.
// This is used for checkpointing.
smallestSeq SequencePair
sync Sync
msgChan chan<- msgPack
flushChan chan<- struct{}
}
// New creates a new Batcher
// - sync - synchronizes batcher with writer
// - flushInterval - how often accumulated messages should be flushed (e.g. 1 second).
// - flushCount - number of messages that trigger a flush (e.g. 10).
// - flushSize - size of batch that triggers a flush (e.g. 1024 * 1024 = 1 MB)
func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) Batcher {
msgChan := make(chan msgPack, 100)
flushChan := make(chan struct{})
b := &batcher{
flushCount: flushCount,
flushInterval: flushInterval,
flushSize: flushSize,
sync: sync,
msgChan: msgChan,
flushChan: flushChan,
}
go b.startBatcher(msgChan, flushChan)
return b
}
func (b *batcher) SmallestSequencePair() SequencePair {
b.mux.Lock()
defer b.mux.Unlock()
return b.smallestSeq
}
func (b *batcher) SetFlushInterval(dur time.Duration) {
b.flushInterval = dur
}
func (b *batcher) SetFlushCount(count int) {
b.flushCount = count
}
func (b *batcher) SetFlushSize(size int) {
b.flushSize = size
}
func (b *batcher) AddMessage(msg []byte, pair SequencePair) error {
if len(msg) == 0 {
return fmt.Errorf("empty messages can't be sent")
}
b.msgChan <- msgPack{msg, pair}
return nil
}
// updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch.
// When flush() is called, the batcher sends the sequence number to the writer. When the writer
// checkpoints, it does so up to the latest message that was flushed successfully.
func (b *batcher) updateSequenceNumbers(pair SequencePair) {
b.mux.Lock()
defer b.mux.Unlock()
isSmaller := b.smallestSeq.Sequence == nil ||
pair.Sequence.Cmp(b.smallestSeq.Sequence) == -1 ||
(pair.Sequence.Cmp(b.smallestSeq.Sequence) == 0 &&
pair.SubSequence < b.smallestSeq.SubSequence)
if isSmaller {
b.smallestSeq = SequencePair{pair.Sequence, pair.SubSequence}
}
}
func (b *batcher) Flush() {
b.flushChan <- struct{}{}
}
func (b *batcher) batchSize(batch [][]byte) int {
total := 0
for _, msg := range batch {
total += len(msg)
}
return total
}
func (b *batcher) flush(batch [][]byte) [][]byte {
if len(batch) > 0 {
b.sync.SendBatch(batch)
// Reset under the same mutex that guards SmallestSequencePair reads,
// to avoid a data race with updateSequenceNumbers.
b.mux.Lock()
b.smallestSeq = SequencePair{nil, 0}
b.mux.Unlock()
}
return [][]byte{}
}
func (b *batcher) startBatcher(msgChan <-chan msgPack, flushChan <-chan struct{}) {
batch := [][]byte{}
for {
select {
case <-time.After(b.flushInterval):
batch = b.flush(batch)
case <-flushChan:
batch = b.flush(batch)
case pack := <-msgChan:
size := b.batchSize(batch)
if b.flushSize < size+len(pack.msg) {
batch = b.flush(batch)
}
batch = append(batch, pack.msg)
b.updateSequenceNumbers(pack.sequencePair)
if b.flushCount <= len(batch) || b.flushSize <= b.batchSize(batch) {
batch = b.flush(batch)
}
}
}
}
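
To illustrate the interface above, a minimal usage sketch; the stdoutSync type and the literal flush limits here are illustrative, not part of this commit:

package main

import (
	"fmt"
	"math/big"
	"time"

	"github.com/Clever/amazon-kinesis-client-go/batchconsumer/batcher"
)

// stdoutSync is a hypothetical Sync that just reports each flushed batch.
type stdoutSync struct{}

func (stdoutSync) SendBatch(batch [][]byte) {
	fmt.Printf("flushed %d messages\n", len(batch))
}

func main() {
	// Flush every second, at 10 messages, or at 1 MB, whichever comes first.
	b := batcher.New(stdoutSync{}, time.Second, 10, 1024*1024)
	pair := batcher.SequencePair{Sequence: big.NewInt(1), SubSequence: 0}
	if err := b.AddMessage([]byte("hello"), pair); err != nil {
		fmt.Println("add failed:", err)
	}
	time.Sleep(2 * time.Second) // give the background goroutine time to flush
}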


@@ -0,0 +1,212 @@
package batcher
import (
"fmt"
"math/big"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
type batch [][]byte
type MockSync struct {
flushChan chan struct{}
batches []batch
}
func NewMockSync() *MockSync {
return &MockSync{
flushChan: make(chan struct{}, 1),
batches: []batch{},
}
}
func (m *MockSync) SendBatch(b [][]byte) {
m.batches = append(m.batches, batch(b))
m.flushChan <- struct{}{}
}
func (m *MockSync) waitForFlush(timeout time.Duration) error {
select {
case <-m.flushChan:
return nil
case <-time.After(timeout):
return fmt.Errorf("timed out before flush (waited %s)", timeout.String())
}
}
const mockSequenceNumber = "99999"
const mockSubSequenceNumber = 12345
// seqPair builds a SequencePair from a decimal sequence-number string, matching
// the signature of Batcher.AddMessage.
func seqPair(seq string, sub int) SequencePair {
n, _ := new(big.Int).SetString(seq, 10)
return SequencePair{n, sub}
}
func TestBatchingByCount(t *testing.T) {
var err error
assert := assert.New(t)
sync := NewMockSync()
batcher := New(sync, time.Hour, 2, 1024*1024)
t.Log("Batcher respect count limit")
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber))
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequenceNumber, mockSubSequenceNumber))
assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequenceNumber, mockSubSequenceNumber))
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(1, len(sync.batches))
assert.Equal(2, len(sync.batches[0]))
assert.Equal("hihi", string(sync.batches[0][0]))
assert.Equal("heyhey", string(sync.batches[0][1]))
t.Log("Batcher doesn't send partial batches")
err = sync.waitForFlush(time.Millisecond * 10)
assert.Error(err)
}
func TestBatchingByTime(t *testing.T) {
var err error
assert := assert.New(t)
sync := NewMockSync()
batcher := New(sync, time.Millisecond, 2000000, 1024*1024)
t.Log("Batcher sends partial batches when time expires")
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber))
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(1, len(sync.batches))
assert.Equal(1, len(sync.batches[0]))
assert.Equal("hihi", string(sync.batches[0][0]))
t.Log("Batcher sends all messsages in partial batches when time expires")
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequenceNumber, mockSubSequenceNumber))
assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequenceNumber, mockSubSequenceNumber))
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(2, len(sync.batches))
assert.Equal(2, len(sync.batches[1]))
assert.Equal("heyhey", string(sync.batches[1][0]))
assert.Equal("yoyo", string(sync.batches[1][1]))
t.Log("Batcher doesn't send empty batches")
err = sync.waitForFlush(time.Millisecond * 10)
assert.Error(err)
}
func TestBatchingBySize(t *testing.T) {
var err error
assert := assert.New(t)
sync := NewMockSync()
batcher := New(sync, time.Hour, 2000000, 8)
t.Log("Large messages are sent immediately")
assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequenceNumber, mockSubSequenceNumber))
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(1, len(sync.batches))
assert.Equal(1, len(sync.batches[0]))
assert.Equal("hellohello", string(sync.batches[0][0]))
t.Log("Batcher tries not to exceed size limit")
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequenceNumber, mockSubSequenceNumber))
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber))
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(2, len(sync.batches))
assert.Equal(1, len(sync.batches[1]))
assert.Equal("heyhey", string(sync.batches[1][0]))
t.Log("Batcher sends messages that didn't fit in previous batch")
assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequenceNumber, mockSubSequenceNumber)) // At this point "hihi" is in the batch
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(3, len(sync.batches))
assert.Equal(2, len(sync.batches[2]))
assert.Equal("hihi", string(sync.batches[2][0]))
assert.Equal("yoyo", string(sync.batches[2][1]))
t.Log("Batcher doesn't send partial batches")
assert.NoError(batcher.AddMessage([]byte("okok"), mockSequenceNumber, mockSubSequenceNumber))
err = sync.waitForFlush(time.Millisecond * 10)
assert.Error(err)
}
func TestFlushing(t *testing.T) {
var err error
assert := assert.New(t)
sync := NewMockSync()
batcher := New(sync, time.Hour, 2000000, 1024*1024)
t.Log("Calling flush sends pending messages")
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequenceNumber, mockSubSequenceNumber))
err = sync.waitForFlush(time.Millisecond * 10)
assert.Error(err)
batcher.Flush()
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(1, len(sync.batches))
assert.Equal(1, len(sync.batches[0]))
assert.Equal("hihi", string(sync.batches[0][0]))
}
func TestSendingEmpty(t *testing.T) {
var err error
assert := assert.New(t)
sync := NewMockSync()
batcher := New(sync, time.Second, 10, 1024*1024)
t.Log("An error is returned when an empty message is sent")
err = batcher.AddMessage([]byte{}, seqPair(mockSequenceNumber, mockSubSequenceNumber))
assert.Error(err)
}
func TestUpdatingSequence(t *testing.T) {
assert := assert.New(t)
sync := NewMockSync()
batcher := New(sync, time.Second, 10, 1024*1024).(*batcher)
t.Log("Initally, smallestSeq is undefined")
expected := new(big.Int)
assert.Nil(batcher.smallestSeq.Sequence)
t.Log("After AddMessage (seq=1), smallestSeq = 1")
assert.NoError(batcher.AddMessage([]byte("abab"), "1", mockSubSequenceNumber))
sync.waitForFlush(time.Minute)
expected.SetInt64(1)
seq := batcher.SmallestSequencePair()
assert.True(expected.Cmp(seq.Sequence) == 0)
t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher")
assert.NoError(batcher.AddMessage([]byte("cdcd"), "2", mockSubSequenceNumber))
sync.waitForFlush(time.Minute)
seq = batcher.SmallestSequencePair()
assert.True(expected.Cmp(seq.Sequence) == 0)
t.Log("After AddMessage (seq=1), smallestSeq = 0")
assert.NoError(batcher.AddMessage([]byte("efef"), "0", mockSubSequenceNumber))
sync.waitForFlush(time.Minute)
expected.SetInt64(0)
seq = batcher.SmallestSequencePair()
assert.True(expected.Cmp(seq.Sequence) == 0)
}

batchconsumer/consumer.go Normal file (117 lines)

@@ -0,0 +1,117 @@
package batchconsumer
import (
"io"
"log"
"os"
"time"
"gopkg.in/Clever/kayvee-go.v6/logger"
"github.com/Clever/amazon-kinesis-client-go/kcl"
)
type Config struct {
// LogFile where consumer errors and failed log lines are saved
LogFile string
// FlushInterval is how often accumulated messages should be bulk put to firehose
FlushInterval time.Duration
// FlushCount is the number of messages that triggers a push to firehose. Max batch size is 500, see: http://docs.aws.amazon.com/firehose/latest/dev/limits.html
FlushCount int
// FlushSize is the size of a batch in bytes that triggers a push to firehose. Max batch size is 4Mb (4*1024*1024), see: http://docs.aws.amazon.com/firehose/latest/dev/limits.html
FlushSize int
// DeployEnv the name of the deployment environment
DeployEnv string
// ReadRateLimit the number of records read per second
ReadRateLimit int
// ReadBurstLimit the max number of tokens allowed by rate limiter
ReadBurstLimit int
// CheckpointFreq the frequency at which a checkpoint is saved
CheckpointFreq time.Duration
// CheckpointRetries the number of times the consumer will try to save a checkpoint
CheckpointRetries int
// CheckpointRetrySleep the amount of time between checkpoint save attempts
CheckpointRetrySleep time.Duration
logOutput io.Writer
}
type BatchConsumer struct {
kclProcess *kcl.KCLProcess
}
func withDefaults(config Config) Config {
if config.LogFile == "" {
config.LogFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339)
}
if config.FlushInterval == 0 {
config.FlushInterval = 10 * time.Second
}
if config.FlushCount == 0 {
config.FlushCount = 500
}
if config.FlushSize == 0 {
config.FlushSize = 4 * 1024 * 1024
}
if config.DeployEnv == "" {
config.DeployEnv = "unknown-env"
}
if config.ReadRateLimit == 0 {
config.ReadRateLimit = 300
}
if config.ReadBurstLimit == 0 {
config.ReadBurstLimit = int(300 * 1.2)
}
if config.CheckpointFreq == 0 {
config.CheckpointFreq = 60 * time.Second
}
if config.CheckpointRetries == 0 {
config.CheckpointRetries = 5
}
if config.CheckpointRetrySleep == 0 {
config.CheckpointRetrySleep = 5 * time.Second
}
return config
}
func NewBatchConsumerFromFiles(
config Config, sender Sender, input io.Reader, output, errFile io.Writer,
) *BatchConsumer {
config = withDefaults(config)
file, err := os.OpenFile(config.LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Unable to create log file: %s", err.Error())
}
// Note: the file intentionally stays open; kvlog writes to it for the consumer's lifetime.
kvlog := logger.New("amazon-kinesis-client-go")
kvlog.SetOutput(file)
wrt := &BatchedWriter{
config: config,
log: kvlog,
sender: sender,
}
kclProcess := kcl.New(input, output, errFile, wrt)
return &BatchConsumer{
kclProcess: kclProcess,
}
}
func NewBatchConsumer(config Config, sender Sender) *BatchConsumer {
return NewBatchConsumerFromFiles(config, sender, os.Stdin, os.Stdout, os.Stderr)
}
func (b *BatchConsumer) Start() {
b.kclProcess.Run()
}
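
For orientation, a minimal sketch of wiring a Sender into the consumer; mySender is hypothetical, and any zero-valued Config field is filled in by withDefaults (see cmd/batchconsumer/main.go below for the full example consumer):

package main

import (
	kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer"
)

// mySender is a hypothetical Sender that routes every log to one tag.
type mySender struct{}

func (mySender) EncodeLog(rawlog []byte) ([]byte, []string, error) {
	return rawlog, []string{"default-tag"}, nil
}

func (mySender) SendBatch(batch [][]byte, tag string) error { return nil }

func main() {
	// Only DeployEnv is set; FlushInterval, FlushCount, FlushSize, etc.
	// fall back to the defaults applied by withDefaults.
	consumer := kbc.NewBatchConsumer(kbc.Config{DeployEnv: "test-env"}, mySender{})
	consumer.Start()
}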

batchconsumer/sender.go Normal file (30 lines)

@@ -0,0 +1,30 @@
package batchconsumer
import (
"errors"
"fmt"
)
var ErrLogIgnored = errors.New("Log intentionally skipped by sender")
// Sender is implemented by consumers: EncodeLog transforms a raw log line and
// assigns routing tags; SendBatch writes a flushed batch of logs for one tag.
type Sender interface {
EncodeLog(rawlog []byte) (log []byte, tags []string, err error)
SendBatch(batch [][]byte, tag string) error
}
// PartialOutputError indicates that some, but not all, logs in a batch failed
// to send. The failed lines are returned in Logs.
type PartialOutputError struct {
Message string
Logs [][]byte
}
func (c PartialOutputError) Error() string {
return fmt.Sprintf("%d failed logs. %s", len(c.Logs), c.Message)
}
// CatastrophicOutputError indicates an unrecoverable send failure.
type CatastrophicOutputError struct {
Message string
}
func (c CatastrophicOutputError) Error() string {
return c.Message
}
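
To make the error contract concrete, a hedged sketch of the sender side (firehoseSender and its put method are hypothetical; EncodeLog is omitted). Returning PartialOutputError lets the writer log the failed lines and keep the shard alive, while a CatastrophicOutputError halts the process:

package batchconsumer

// firehoseSender is a hypothetical Sender implementation.
type firehoseSender struct{}

// put is a hypothetical single-record write.
func (s *firehoseSender) put(tag string, line []byte) error { return nil }

func (s *firehoseSender) SendBatch(batch [][]byte, tag string) error {
	failed := [][]byte{}
	for _, line := range batch {
		if err := s.put(tag, line); err != nil {
			failed = append(failed, line)
		}
	}
	if len(failed) > 0 {
		// The writer logs each failed line individually and continues.
		return PartialOutputError{Message: "some records failed to send", Logs: failed}
	}
	return nil
}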

batchconsumer/sync.go Normal file (11 lines)

@@ -0,0 +1,11 @@
package batchconsumer
// BatcherSync connects a per-tag batcher to the BatchedWriter: each flushed
// batch is sent and then checkpointed.
type BatcherSync struct {
tag string
writer *BatchedWriter
}
func (b *BatcherSync) SendBatch(batch [][]byte) {
b.writer.SendBatch(batch, b.tag)
b.writer.CheckPointBatch(b.tag)
}

batchconsumer/writer.go Normal file (250 lines)

@@ -0,0 +1,250 @@
package batchconsumer
import (
"context"
"encoding/base64"
"fmt"
"math/big"
"os"
"time"
"golang.org/x/time/rate"
kv "gopkg.in/Clever/kayvee-go.v6/logger"
"github.com/Clever/amazon-kinesis-client-go/batchconsumer/batcher"
"github.com/Clever/amazon-kinesis-client-go/kcl"
"github.com/Clever/amazon-kinesis-client-go/splitter"
)
type BatchedWriter struct {
config Config
sender Sender
log kv.KayveeLogger
shardID string
checkpointChan chan batcher.SequencePair
// Limits the number of records read from the stream
rateLimiter *rate.Limiter
batchers map[string]batcher.Batcher
lastProcessedSeq batcher.SequencePair
}
func (b *BatchedWriter) Initialize(shardID string, checkpointer *kcl.Checkpointer) error {
b.batchers = map[string]batcher.Batcher{}
b.shardID = shardID
b.checkpointChan = make(chan batcher.SequencePair)
b.rateLimiter = rate.NewLimiter(rate.Limit(b.config.ReadRateLimit), b.config.ReadBurstLimit)
b.startCheckpointListener(checkpointer, b.checkpointChan)
return nil
}
// handleCheckpointError returns true if checkpointing should be tried again. Returns false otherwise.
func (b *BatchedWriter) handleCheckpointError(err error) bool {
if err == nil {
return false
}
cperr, ok := err.(kcl.CheckpointError)
if !ok {
b.log.ErrorD("unknown-checkpoint-error", kv.M{"msg": err.Error(), "shard-id": b.shardID})
return true
}
switch cperr.Error() {
case "ShutdownException": // Skips checkpointing
b.log.ErrorD("shutdown-checkpoint-exception", kv.M{
"msg": err.Error(), "shard-id": b.shardID,
})
return false
case "ThrottlingException":
b.log.ErrorD("checkpoint-throttle", kv.M{"shard-id": b.shardID})
case "InvalidStateException":
b.log.ErrorD("invalid-checkpoint-state", kv.M{"shard-id": b.shardID})
default:
b.log.ErrorD("checkpoint-error", kv.M{"shard-id": b.shardID, "msg": err})
}
return true
}
func (b *BatchedWriter) startCheckpointListener(
checkpointer *kcl.Checkpointer, checkpointChan <-chan batcher.SequencePair,
) {
lastCheckpoint := time.Now()
go func() {
for {
seq := <-checkpointChan
// This is a write throttle to ensure we don't checkpoint faster than
// b.config.CheckpointFreq. The latest seq number is always used.
for time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq {
select {
case seq = <-checkpointChan: // Keep updating checkpoint seq while waiting
case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C:
}
}
retry := true
for n := 0; retry && n < b.config.CheckpointRetries+1; n++ {
str := seq.Sequence.String()
err := checkpointer.Checkpoint(&str, &seq.SubSequence)
if err == nil { // Successfully checkpointed!
lastCheckpoint = time.Now()
break
}
retry = b.handleCheckpointError(err)
if n == b.config.CheckpointRetries {
b.log.ErrorD("checkpoint-retries", kv.M{"attempts": b.config.CheckpointRetries})
retry = false
}
if retry {
time.Sleep(b.config.CheckpointRetrySleep)
}
}
}
}()
}
func (b *BatchedWriter) createBatcher(tag string) batcher.Batcher {
sync := &BatcherSync{
tag: tag,
writer: b,
}
return batcher.New(sync, b.config.FlushInterval, b.config.FlushCount, b.config.FlushSize)
}
func (b *BatchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
// We handle two types of records:
// - records emitted from CWLogs Subscription
// - records emitted from KPL
if !splitter.IsGzipped(record) {
// Process a single message, from KPL
return [][]byte{record}, nil
}
// Process a batch of messages from a CWLogs Subscription
return splitter.GetMessagesFromGzippedInput(record, b.config.DeployEnv == "production")
}
func (b *BatchedWriter) ProcessRecords(records []kcl.Record) error {
curSequence := b.lastProcessedSeq
for _, record := range records {
// Wait until rate limiter permits one more record to be processed
b.rateLimiter.Wait(context.Background())
seq := new(big.Int)
if _, ok := seq.SetString(record.SequenceNumber, 10); !ok { // Validating sequence
return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
}
b.lastProcessedSeq = curSequence // Updated with the value from the previous iteration
curSequence = batcher.SequencePair{seq, record.SubSequenceNumber}
data, err := base64.StdEncoding.DecodeString(record.Data)
if err != nil {
return err
}
rawlogs, err := b.splitMessageIfNecessary(data)
if err != nil {
return err
}
for _, rawlog := range rawlogs {
log, tags, err := b.sender.EncodeLog(rawlog)
if err == ErrLogIgnored {
continue // Skip message
} else if err != nil {
return err
}
if len(tags) == 0 {
return fmt.Errorf("No tags provided by consumer for log: %s", string(rawlog))
}
for _, tag := range tags {
batcher, ok := b.batchers[tag]
if !ok {
batcher = b.createBatcher(tag)
b.batchers[tag] = batcher
}
// Use second to last sequence number to ensure we don't checkpoint a message before
// it's been sent. When batches are sent, conceptually we first find the smallest
// sequence number among all the batches (let's call it A). We then checkpoint at
// the A-1 sequence number.
err = batcher.AddMessage(log, b.lastProcessedSeq)
if err != nil {
return err
}
}
}
}
b.lastProcessedSeq = curSequence
return nil
}
func (b *BatchedWriter) CheckPointBatch(tag string) {
smallest := b.lastProcessedSeq
for name, batch := range b.batchers {
if tag == name {
continue
}
pair := batch.SmallestSequencePair()
if pair.Sequence == nil { // Occurs when batch has no items
continue
}
isSmaller := smallest.Sequence == nil || // a nil smallest.Sequence means the batch just flushed
pair.Sequence.Cmp(smallest.Sequence) == -1 ||
(pair.Sequence.Cmp(smallest.Sequence) == 0 && pair.SubSequence < smallest.SubSequence)
if isSmaller {
smallest = pair
}
}
b.checkpointChan <- smallest
}
func (b *BatchedWriter) SendBatch(batch [][]byte, tag string) {
b.log.Info("sent-batch")
err := b.sender.SendBatch(batch, tag)
switch e := err.(type) {
case nil: // Do nothing
case PartialOutputError:
b.log.ErrorD("send-batch", kv.M{"msg": e.Error()})
for _, line := range e.Logs {
b.log.ErrorD("failed-log", kv.M{"log": line})
}
case CatastrophicOutputError:
b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
os.Exit(1)
default:
b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
os.Exit(1)
}
}
func (b *BatchedWriter) Shutdown(reason string) error {
if reason == "TERMINATE" {
b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
for _, batch := range b.batchers {
batch.Flush()
}
} else {
b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
}
return nil
}
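
The smallest-pair rule that appears in both updateSequenceNumbers and CheckPointBatch can be read in isolation: compare the big.Int sequence numbers first, then use the subsequence as a tie-break. A self-contained check with illustrative values:

package main

import (
	"fmt"
	"math/big"

	"github.com/Clever/amazon-kinesis-client-go/batchconsumer/batcher"
)

// isSmaller mirrors the comparison used in updateSequenceNumbers/CheckPointBatch:
// a is smaller than b when b is unset, a's sequence is lower, or the sequences
// tie and a's subsequence is lower.
func isSmaller(a, b batcher.SequencePair) bool {
	return b.Sequence == nil ||
		a.Sequence.Cmp(b.Sequence) == -1 ||
		(a.Sequence.Cmp(b.Sequence) == 0 && a.SubSequence < b.SubSequence)
}

func main() {
	a := batcher.SequencePair{Sequence: big.NewInt(100), SubSequence: 1}
	b := batcher.SequencePair{Sequence: big.NewInt(100), SubSequence: 2}
	fmt.Println(isSmaller(a, b)) // true: same sequence, lower subsequence
}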


@@ -12,6 +12,6 @@ compile:
- make build
test:
override:
-    - echo TODO
+    - make test
general:
build_dir: ../.go_workspace/src/github.com/$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME

cmd/batchconsumer/main.go Normal file (64 lines)

@@ -0,0 +1,64 @@
package main
import (
"fmt"
"log"
"os"
"time"
"gopkg.in/Clever/kayvee-go.v6/logger"
kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer"
)
func createDummyOutput() (logger.KayveeLogger, *os.File) {
file, err := os.OpenFile("/tmp/example-kcl-output", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Unable to create log file: %s", err.Error())
}
kvlog := logger.New("amazon-kinesis-client-go")
kvlog.SetOutput(file)
return kvlog, file
}
func main() {
config := kbc.Config{
FlushInterval: 10 * time.Second,
FlushCount: 500,
FlushSize: 4 * 1024 * 1024, // 4Mb
LogFile: "/tmp/example-kcl-consumer",
DeployEnv: "test-env",
}
output, file := createDummyOutput()
defer file.Close()
wrt := &ExampleWriter{output: output}
consumer := kbc.NewBatchConsumer(config, wrt)
consumer.Start()
}
type ExampleWriter struct {
output logger.KayveeLogger
}
func (e *ExampleWriter) EncodeLog(rawlog []byte) ([]byte, []string, error) {
if len(rawlog)%5 == 2 {
return nil, nil, kbc.ErrLogIgnored
}
tag1 := fmt.Sprintf("tag-%d", len(rawlog)%5)
line := tag1 + ": " + string(rawlog)
return []byte(line), []string{tag1}, nil
}
func (e *ExampleWriter) SendBatch(batch [][]byte, tag string) error {
for idx, line := range batch {
e.output.InfoD(tag, logger.M{"idx": idx, "line": string(line)})
}
return nil
}

decode/decode.go Normal file (477 lines)

@@ -0,0 +1,477 @@
package decode
import (
"encoding/json"
"fmt"
"regexp"
"strings"
"time"
"github.com/Clever/syslogparser/rfc3164"
)
// reservedFields are automatically set during decoding.
// No field written by a user (e.g. contained in the Kayvee JSON) should overwrite them.
var reservedFields = []string{
"prefix",
"postfix",
"Type",
}
func stringInSlice(s string, slice []string) bool {
for _, item := range slice {
if s == item {
return true
}
}
return false
}
// remapSyslog3164Keys renames fields to match our expectations from heka's syslog parser
// see: https://github.com/mozilla-services/heka/blob/278dd3d5961b9b6e47bb7a912b63ce3faaf8d8bd/sandbox/lua/decoders/rsyslog.lua
var remapSyslog3164Keys = map[string]string{
"hostname": "hostname",
"timestamp": "timestamp",
"tag": "programname",
"content": "rawlog",
}
// FieldsFromSyslog takes an RSyslog formatted log line and extracts fields from it
//
// Supports two log lines formats:
// - RSYSLOG_TraditionalFileFormat - the "old style" default log file format with low-precision timestamps (RFC3164)
// - RSYSLOG_FileFormat - a modern-style logfile format similar to TraditionalFileFormat, but with high-precision timestamps and timezone information
//
// For more details on Rsylog formats: https://rsyslog-5-8-6-doc.neocities.org/rsyslog_conf_templates.html
func FieldsFromSyslog(line string) (map[string]interface{}, error) {
// rfc3164 includes a severity number in front of the Syslog line, but we don't use that
fakeSeverity := "<12>"
p3164 := rfc3164.NewParser([]byte(fakeSeverity + line))
err := p3164.Parse()
if err != nil {
return map[string]interface{}{}, err
}
out := map[string]interface{}{}
for k, v := range p3164.Dump() {
if newKey, ok := remapSyslog3164Keys[k]; ok {
out[newKey] = v
}
}
return out, nil
}
// NonKayveeError occurs when the log line is not Kayvee
type NonKayveeError struct{}
func (e NonKayveeError) Error() string {
return "Log line is not Kayvee (doesn't have JSON payload)"
}
// FieldsFromKayvee takes a log line and extracts fields from the Kayvee (JSON) part
func FieldsFromKayvee(line string) (map[string]interface{}, error) {
m := map[string]interface{}{}
firstIdx := strings.Index(line, "{")
lastIdx := strings.LastIndex(line, "}")
if firstIdx == -1 || lastIdx == -1 || firstIdx > lastIdx {
return map[string]interface{}{}, &NonKayveeError{}
}
m["prefix"] = line[:firstIdx]
m["postfix"] = line[lastIdx+1:]
possibleJSON := line[firstIdx : lastIdx+1]
var fields map[string]interface{}
if err := json.Unmarshal([]byte(possibleJSON), &fields); err != nil {
return map[string]interface{}{}, err
}
for k, v := range fields {
if !stringInSlice(k, reservedFields) {
m[k] = v
}
}
m["type"] = "Kayvee"
return m, nil
}
// MetricsRoute represents a metrics kv log route
type MetricsRoute struct {
Series string
Dimensions []string
ValueField string
RuleName string
}
// AnalyticsRoute represents an analytics kv log route
type AnalyticsRoute struct {
Series string
RuleName string
}
// NotificationRoute represents a notification kv log route
type NotificationRoute struct {
Channel string
Icon string
Message string
User string
RuleName string
}
// AlertRoute represents an alert kv log route
type AlertRoute struct {
Series string
Dimensions []string
StatType string
ValueField string
RuleName string
}
func getStringValue(json map[string]interface{}, key string) string {
val, ok := json[key]
if !ok {
return ""
}
str, ok := val.(string)
if !ok {
return ""
}
return str
}
func getStringArray(json map[string]interface{}, key string) []string {
val, ok := json[key]
if !ok {
return []string{}
}
strArray, ok := val.([]string)
if !ok {
return []string{}
}
return strArray
}
// LogRoutes a type alias to make it easier to add route specific filter functions
type LogRoutes []map[string]interface{}
// MetricsRoutes filters the LogRoutes and returns a list of MetricsRoutes structs
func (l LogRoutes) MetricsRoutes() []MetricsRoute {
routes := []MetricsRoute{}
for _, route := range l {
tipe := getStringValue(route, "type")
if tipe != "metrics" {
continue
}
series := getStringValue(route, "series")
dimensions := getStringArray(route, "dimensions")
valueField := getStringValue(route, "value_field")
ruleName := getStringValue(route, "rule")
if series == "" { // TODO: log error
continue
}
if valueField == "" {
valueField = "value"
}
routes = append(routes, MetricsRoute{
Series: series,
Dimensions: dimensions,
ValueField: valueField,
RuleName: ruleName,
})
}
return routes
}
// AnalyticsRoutes filters the LogRoutes and returns a list of AnalyticsRoutes structs
func (l LogRoutes) AnalyticsRoutes() []AnalyticsRoute {
routes := []AnalyticsRoute{}
for _, route := range l {
tipe := getStringValue(route, "type")
if tipe != "analytics" {
continue
}
series := getStringValue(route, "series")
ruleName := getStringValue(route, "rule")
if series == "" { // TODO: log error
continue
}
routes = append(routes, AnalyticsRoute{
Series: series,
RuleName: ruleName,
})
}
return routes
}
// NotificationRoutes filters the LogRoutes and returns a list of NotificationRoutes structs
func (l LogRoutes) NotificationRoutes() []NotificationRoute {
routes := []NotificationRoute{}
for _, route := range l {
tipe := getStringValue(route, "type")
if tipe != "notifications" {
continue
}
channel := getStringValue(route, "channel")
icon := getStringValue(route, "icon")
message := getStringValue(route, "message")
user := getStringValue(route, "user")
rule := getStringValue(route, "rule")
if channel == "" || message == "" { // TODO: log error
continue
}
if icon == "" {
icon = ":ghost:"
}
if user == "" {
user = "logging-pipeline"
}
routes = append(routes, NotificationRoute{
Channel: channel,
Icon: icon,
Message: message,
User: user,
RuleName: rule,
})
}
return routes
}
// AlertRoutes filters the LogRoutes and returns a list of AlertRoutes structs
func (l LogRoutes) AlertRoutes() []AlertRoute {
routes := []AlertRoute{}
for _, route := range l {
tipe := getStringValue(route, "type")
if tipe != "alerts" {
continue
}
series := getStringValue(route, "series")
dimensions := getStringArray(route, "dimensions")
statType := getStringValue(route, "stat_type")
valueField := getStringValue(route, "value_field")
ruleName := getStringValue(route, "rule")
if series == "" { // TODO: log error
continue
}
if statType == "" {
statType = "counter"
}
if valueField == "" {
valueField = "value"
}
routes = append(routes, AlertRoute{
Series: series,
Dimensions: dimensions,
StatType: statType,
ValueField: valueField,
RuleName: ruleName,
})
}
return routes
}
// KVMeta a struct that represents kv-meta data
type KVMeta struct {
Team string
Version string
Language string
Routes LogRoutes
}
// ExtractKVMeta returns a struct with available kv-meta data
func ExtractKVMeta(kvlog map[string]interface{}) KVMeta {
tmp, ok := kvlog["_kvmeta"]
if !ok {
return KVMeta{}
}
kvmeta, ok := tmp.(map[string]interface{})
if !ok {
return KVMeta{}
}
kvRoutes := []map[string]interface{}{}
tmp, ok = kvmeta["routes"]
if ok {
routes, ok := tmp.([]map[string]interface{})
if ok {
kvRoutes = routes
}
}
return KVMeta{
Team: getStringValue(kvmeta, "team"),
Version: getStringValue(kvmeta, "kv_version"),
Language: getStringValue(kvmeta, "kv_language"),
Routes: kvRoutes,
}
}
// ParseAndEnhance extracts fields from a log line, and does some post-processing to rename/add fields
func ParseAndEnhance(line string, env string, stringifyNested bool, renameESReservedFields bool, minimumTimestamp time.Time) (map[string]interface{}, error) {
out := map[string]interface{}{}
syslogFields, err := FieldsFromSyslog(line)
if err != nil {
return map[string]interface{}{}, err
}
for k, v := range syslogFields {
out[k] = v
}
rawlog := syslogFields["rawlog"].(string)
programname := syslogFields["programname"].(string)
// Try pulling Kayvee fields out of message
kvFields, err := FieldsFromKayvee(rawlog)
if err != nil {
if _, ok := err.(*NonKayveeError); !ok {
return map[string]interface{}{}, err
}
} else {
for k, v := range kvFields {
out[k] = v
}
}
// Inject additional fields that are useful in log-searching and other business logic
out["env"] = env
// Sometimes it's useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
// A separate container monitors for start/stop events, but we set the container values in such a way that
// the logs for these events will appear in context for the app that the user is looking at instead of the
// docker-events app.
forceEnv := ""
forceApp := ""
forceTask := ""
if cEnv, ok := out["container_env"]; ok {
forceEnv = cEnv.(string)
}
if cApp, ok := out["container_app"]; ok {
forceApp = cApp.(string)
}
if cTask, ok := out["container_task"]; ok {
forceTask = cTask.(string)
}
meta, err := getContainerMeta(programname, forceEnv, forceApp, forceTask)
if err == nil {
for k, v := range meta {
out[k] = v
}
}
// ES dynamic mappings get finicky once you start sending nested objects.
// E.g., if one app sends a field for the first time as an object, then any log
// sent by another app containing that field /not/ as an object will fail.
// One solution is to decode nested objects as strings.
if stringifyNested {
for k, v := range out {
_, ismap := v.(map[string]interface{})
_, isarr := v.([]interface{})
if ismap || isarr {
bs, _ := json.Marshal(v)
out[k] = string(bs)
}
}
}
// ES doesn't like fields that start with underscores.
if renameESReservedFields {
for oldKey, renamedKey := range esFieldRenames {
if val, ok := out[oldKey]; ok {
out[renamedKey] = val
delete(out, oldKey)
}
}
}
msgTime, ok := out["timestamp"].(time.Time)
if ok && !msgTime.After(minimumTimestamp) {
return map[string]interface{}{}, fmt.Errorf("message's timestamp < minimumTimestamp")
}
return out, nil
}
var esFieldRenames = map[string]string{
"_index": "kv__index",
"_uid": "kv__uid",
"_type": "kv__type",
"_id": "kv__id",
"_source": "kv__source",
"_size": "kv__size",
"_all": "kv__all",
"_field_names": "kv__field_names",
"_timestamp": "kv__timestamp",
"_ttl": "kv__ttl",
"_parent": "kv__parent",
"_routing": "kv__routing",
"_meta": "kv__meta",
}
const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
`arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // task-id
var containerMetaRegex = regexp.MustCompile(containerMeta)
func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[string]string, error) {
if programname == "" {
return map[string]string{}, fmt.Errorf("no programname")
}
env := ""
app := ""
task := ""
matches := containerMetaRegex.FindAllStringSubmatch(programname, 1)
if len(matches) == 1 {
env = matches[0][1]
app = matches[0][2]
task = matches[0][4]
}
if forceEnv != "" {
env = forceEnv
}
if forceApp != "" {
app = forceApp
}
if forceTask != "" {
task = forceTask
}
if env == "" || app == "" || task == "" {
return map[string]string{}, fmt.Errorf("unable to get one or more of env/app/task")
}
return map[string]string{
"container_env": env,
"container_app": app,
"container_task": task,
}, nil
}
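
To show the decoding pipeline end to end, a small usage sketch built from the fixtures in decode_test.go below; the output comments reflect those test expectations:

package main

import (
	"fmt"
	"time"

	"github.com/Clever/amazon-kinesis-client-go/decode"
)

func main() {
	line := `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 ` +
		`env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2F` +
		`abcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: ` +
		`2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`

	// A zero minimumTimestamp accepts any parsed log time.
	fields, err := decode.ParseAndEnhance(line, "deploy-env", false, false, time.Time{})
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	// Among others: fields["title"] == "request_finished",
	// fields["container_env"] == "env", fields["container_app"] == "app".
	fmt.Println(fields["title"], fields["container_app"])
}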

decode/decode_test.go Normal file (834 lines)

@@ -0,0 +1,834 @@
package decode
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/Clever/syslogparser"
"github.com/stretchr/testify/assert"
)
const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00"
type Spec struct {
Title string
Input string
ExpectedOutput map[string]interface{}
ExpectedError error
}
func TestKayveeDecoding(t *testing.T) {
specs := []Spec{
Spec{
Title: "handles just JSON",
Input: `{"a":"b"}`,
ExpectedOutput: map[string]interface{}{
"prefix": "",
"postfix": "",
"a": "b",
"type": "Kayvee",
},
ExpectedError: nil,
},
Spec{
Title: "handles prefix + JSON",
Input: `prefix {"a":"b"}`,
ExpectedOutput: map[string]interface{}{
"prefix": "prefix ",
"postfix": "",
"a": "b",
"type": "Kayvee",
},
ExpectedError: nil,
},
Spec{
Title: "handles JSON + postfix",
Input: `{"a":"b"} postfix`,
ExpectedOutput: map[string]interface{}{
"prefix": "",
"postfix": " postfix",
"a": "b",
"type": "Kayvee",
},
ExpectedError: nil,
},
Spec{
Title: "handles prefix + JSON + postfix",
Input: `prefix {"a":"b"} postfix`,
ExpectedOutput: map[string]interface{}{
"prefix": "prefix ",
"postfix": " postfix",
"a": "b",
"type": "Kayvee",
},
ExpectedError: nil,
},
Spec{
Title: "Returns NonKayveeError if not JSON in body",
Input: `prefix { postfix`,
ExpectedOutput: map[string]interface{}{},
ExpectedError: &NonKayveeError{},
},
Spec{
Title: "errors on invalid JSON (missing a quote)",
Input: `prefix {"a:"b"} postfix`,
ExpectedOutput: map[string]interface{}{},
ExpectedError: &json.SyntaxError{},
},
}
for _, spec := range specs {
t.Run(spec.Title, func(t *testing.T) {
assert := assert.New(t)
fields, err := FieldsFromKayvee(spec.Input)
if spec.ExpectedError != nil {
assert.Error(err)
assert.IsType(spec.ExpectedError, err)
} else {
assert.NoError(err)
}
assert.Equal(spec.ExpectedOutput, fields)
})
}
}
func TestSyslogDecoding(t *testing.T) {
// timestamps in Rsyslog_TraditionalFileFormat
logTime, err := time.Parse(time.Stamp, "Oct 25 10:20:37")
if err != nil {
t.Fatal(err)
}
// parsing assumes log is from the current year
logTime = logTime.AddDate(time.Now().Year(), 0, 0).UTC()
logTime2, err := time.Parse(time.Stamp, "Apr 5 21:45:54")
if err != nil {
t.Fatal(err)
}
logTime2 = logTime2.AddDate(time.Now().Year(), 0, 0).UTC()
// timestamp in Rsyslog_FileFormat
logTime3, err := time.Parse(RFC3339Micro, "2017-04-05T21:57:46.794862+00:00")
if err != nil {
t.Fatal(err)
}
logTime3 = logTime3.UTC()
specs := []Spec{
Spec{
Title: "Parses Rsyslog_TraditionalFileFormat with simple log body",
Input: `Oct 25 10:20:37 some-host docker/fa3a5e338a47[1294]: log body`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime,
"hostname": "some-host",
"programname": "docker/fa3a5e338a47",
"rawlog": "log body",
},
ExpectedError: nil,
},
Spec{
Title: "Parses Rsyslog_TraditionalFileFormat with haproxy access log body",
Input: `Apr 5 21:45:54 influx-service docker/0000aa112233[1234]: [httpd] 2017/04/05 21:45:54 172.17.42.1 - heka [05/Apr/2017:21:45:54 +0000] POST /write?db=foo&precision=ms HTTP/1.1 204 0 - Go 1.1 package http 123456-1234-1234-b11b-000000000000 13.688672ms`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime2,
"hostname": "influx-service",
"programname": "docker/0000aa112233",
"rawlog": "[httpd] 2017/04/05 21:45:54 172.17.42.1 - heka [05/Apr/2017:21:45:54 +0000] POST /write?db=foo&precision=ms HTTP/1.1 204 0 - Go 1.1 package http 123456-1234-1234-b11b-000000000000 13.688672ms",
},
ExpectedError: nil,
},
Spec{
Title: "Parses Rsyslog_TraditionalFileFormat",
Input: `Apr 5 21:45:54 mongodb-some-machine whackanop: 2017/04/05 21:46:11 found 0 ops`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime2,
"hostname": "mongodb-some-machine",
"programname": "whackanop",
"rawlog": "2017/04/05 21:46:11 found 0 ops",
},
ExpectedError: nil,
},
Spec{
Title: "Parses Rsyslog_ FileFormat with Kayvee payload",
Input: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`,
},
ExpectedError: nil,
},
Spec{
Title: "Fails to parse non-RSyslog log line",
Input: `not rsyslog`,
ExpectedOutput: map[string]interface{}{},
ExpectedError: &syslogparser.ParserError{},
},
}
for _, spec := range specs {
t.Run(spec.Title, func(t *testing.T) {
assert := assert.New(t)
fields, err := FieldsFromSyslog(spec.Input)
if spec.ExpectedError != nil {
assert.Error(err)
assert.IsType(spec.ExpectedError, err)
} else {
assert.NoError(err)
}
assert.Equal(spec.ExpectedOutput, fields)
})
}
}
type ParseAndEnhanceInput struct {
Line string
StringifyNested bool
RenameESReservedFields bool
MinimumTimestamp time.Time
}
type ParseAndEnhanceSpec struct {
Title string
Input ParseAndEnhanceInput
ExpectedOutput map[string]interface{}
ExpectedError error
}
func TestParseAndEnhance(t *testing.T) {
// timestamp in Rsyslog_FileFormat
logTime3, err := time.Parse(RFC3339Micro, "2017-04-05T21:57:46.794862+00:00")
if err != nil {
t.Fatal(err)
}
logTime3 = logTime3.UTC()
specs := []ParseAndEnhanceSpec{
ParseAndEnhanceSpec{
Title: "Parses a Kayvee log line from an ECS app",
Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Parses a Kayvee log line from an ECS app, with override to container_app",
Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished","container_app":"force-app"}`},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished","container_app":"force-app"}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "force-app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Parses a non-Kayvee log line",
Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: some log`},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `some log`,
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Fails to parse non-RSyslog log line",
Input: ParseAndEnhanceInput{Line: `not rsyslog`},
ExpectedOutput: map[string]interface{}{},
ExpectedError: &syslogparser.ParserError{},
},
ParseAndEnhanceSpec{
Title: "Parses JSON values",
Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"nested": map[string]interface{}{"a": "b"},
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Has the option to stringify object values",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`,
StringifyNested: true,
},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"nested": `{"a":"b"}`,
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Has the option to stringify array values",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": [{"a":"b"}]}`,
StringifyNested: true,
},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": [{"a":"b"}]}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"nested": `[{"a":"b"}]`,
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Has the option to rename reserved ES fields",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
RenameESReservedFields: true,
},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"kv__source": "a",
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Errors if logTime < MinimumTimestamp",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
RenameESReservedFields: true,
MinimumTimestamp: time.Now().Add(100 * time.Hour * 24 * 365), // 100 years in the future, so the log is rejected
},
ExpectedOutput: map[string]interface{}{},
ExpectedError: fmt.Errorf(""),
},
ParseAndEnhanceSpec{
Title: "Accepts logs if logTime > MinimumTimestamp",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
RenameESReservedFields: true,
MinimumTimestamp: time.Now().Add(-100 * time.Hour * 24 * 365), // 100 years in the past, so the log is accepted
},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"kv__source": "a",
},
ExpectedError: nil,
},
}
for _, spec := range specs {
t.Run(spec.Title, func(t *testing.T) {
assert := assert.New(t)
fields, err := ParseAndEnhance(spec.Input.Line, "deploy-env", spec.Input.StringifyNested, spec.Input.RenameESReservedFields, spec.Input.MinimumTimestamp)
if spec.ExpectedError != nil {
assert.Error(err)
assert.IsType(spec.ExpectedError, err)
} else {
assert.NoError(err)
}
assert.Equal(spec.ExpectedOutput, fields)
})
}
}
func TestGetContainerMeta(t *testing.T) {
assert := assert.New(t)
t.Log("Must have a programname to get container meta")
programname := ""
_, err := getContainerMeta(programname, "", "", "")
assert.Error(err)
t.Log("Can parse a programname")
programname = `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
meta, err := getContainerMeta(programname, "", "", "")
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": "env1",
"container_app": "app2",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
}, meta)
t.Log("Can override just 'env'")
overrideEnv := "force-env"
meta, err = getContainerMeta(programname, overrideEnv, "", "")
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": overrideEnv,
"container_app": "app2",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
}, meta)
t.Log("Can override just 'app'")
overrideApp := "force-app"
meta, err = getContainerMeta(programname, "", overrideApp, "")
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": "env1",
"container_app": overrideApp,
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
}, meta)
t.Log("Can override just 'task'")
overrideTask := "force-task"
meta, err = getContainerMeta(programname, "", "", overrideTask)
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": "env1",
"container_app": "app2",
"container_task": overrideTask,
}, meta)
t.Log("Can override all fields")
programname = `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
meta, err = getContainerMeta(programname, overrideEnv, overrideApp, overrideTask)
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": overrideEnv,
"container_app": overrideApp,
"container_task": overrideTask,
}, meta)
}
func TestExtractKVMeta(t *testing.T) {
assert := assert.New(t)
tests := []struct {
Description string
Log map[string]interface{}
Team string
Language string
Version string
ExpectedMetricsRoutes []MetricsRoute
ExpectedAnalyticsRoutes []AnalyticsRoute
ExpectedNotificationRoutes []NotificationRoute
ExpectedAlertRoutes []AlertRoute
}{
{
Description: "log line with no routes",
Log: map[string]interface{}{"hi": "hello!"},
},
{
Description: "empty _kvmeta",
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{},
},
},
{
Description: "_kvmeta with no routes",
Team: "green",
Version: "three",
Language: "tree",
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "green",
"kv_version": "three",
"kv_language": "tree",
},
},
},
{
Description: "_kvmeta with metric routes",
Team: "green",
Version: "three",
Language: "tree",
ExpectedMetricsRoutes: []MetricsRoute{
{
Series: "1,1,2,3,5,8,13",
Dimensions: []string{"app", "district"},
ValueField: "value",
RuleName: "cool",
},
{
Series: "1,1,2,6,24,120,720,5040",
Dimensions: []string{"app", "district"},
ValueField: "value",
RuleName: "cool2",
},
},
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "green",
"kv_version": "three",
"kv_language": "tree",
"routes": []map[string]interface{}{
map[string]interface{}{
"type": "metrics",
"rule": "cool",
"series": "1,1,2,3,5,8,13",
"value_field": "value",
"dimensions": []string{"app", "district"},
},
map[string]interface{}{
"type": "metrics",
"rule": "cool2",
"series": "1,1,2,6,24,120,720,5040",
"dimensions": []string{"app", "district"},
},
},
},
},
},
{
Description: "_kvmeta with analytic routes",
Team: "green",
Version: "christmas",
Language: "tree",
ExpectedAnalyticsRoutes: []AnalyticsRoute{
{
Series: "what-is-this",
RuleName: "what's-this?",
},
{
RuleName: "there's-app-invites-everywhere",
Series: "there's-bts-in-the-air",
},
},
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "green",
"kv_version": "christmas",
"kv_language": "tree",
"routes": []map[string]interface{}{
map[string]interface{}{
"type": "analytics",
"rule": "what's-this?",
"series": "what-is-this",
},
map[string]interface{}{
"type": "analytics",
"rule": "there's-app-invites-everywhere",
"series": "there's-bts-in-the-air",
},
},
},
},
},
{
Description: "_kvmeta with notification routes",
Team: "slack",
Version: "evergreen",
Language: "markdown-ish",
ExpectedNotificationRoutes: []NotificationRoute{
{
RuleName: "did-you-know",
Channel: "originally-slack",
Message: "was a gaming company",
Icon: ":video_game:",
User: "og slack bronie",
},
{
RuleName: "what's-the-catch",
Channel: "slack-is-built-with-php",
Message: "just like farmville",
Icon: ":ghost:",
User: "logging-pipeline",
},
},
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "slack",
"kv_version": "evergreen",
"kv_language": "markdown-ish",
"routes": []map[string]interface{}{
map[string]interface{}{
"type": "notifications",
"rule": "did-you-know",
"channel": "originally-slack",
"message": "was a gaming company",
"icon": ":video_game:",
"user": "og slack bronie",
},
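// The next route omits icon and user; the expected NotificationRoute above asserts the defaults are applied.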
map[string]interface{}{
"type": "notifications",
"rule": "what's-the-catch",
"channel": "slack-is-built-with-php",
"message": "just like farmville",
},
},
},
},
},
{
Description: "_kvmeta with alert routes",
Team: "a-team",
Version: "old",
Language: "jive",
ExpectedAlertRoutes: []AlertRoute{
{
RuleName: "last-call",
Series: "doing-it-til-we-fall",
Dimensions: []string{"who", "where"},
StatType: "gauge",
ValueField: "status",
},
{
RuleName: "watch-out-now",
Series: "dem-gators-gonna-bite-ya",
Dimensions: []string{"how-fresh", "how-clean"},
StatType: "counter",
ValueField: "value",
},
},
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "a-team",
"kv_version": "old",
"kv_language": "jive",
"routes": []map[string]interface{}{
map[string]interface{}{
"type": "alerts",
"rule": "last-call",
"series": "doing-it-til-we-fall",
"dimensions": []string{"who", "where"},
"stat_type": "guage",
"value_field": "status",
},
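// The next route omits stat_type and value_field; the expected AlertRoute above asserts the defaults are applied.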
map[string]interface{}{
"type": "alerts",
"rule": "watch-out-now",
"series": "dem-gators-gonna-bite-ya",
"dimensions": []string{"how-fresh", "how-clean"},
},
},
},
},
},
{
Description: "_kvmeta with all types of routes",
Team: "diversity",
Version: "kv-routes",
Language: "understanding",
ExpectedMetricsRoutes: []MetricsRoute{
{
RuleName: "all-combos",
Series: "1,1,2,6,24,120,720,5040",
Dimensions: []string{"fact", "orial"},
ValueField: "value",
},
},
ExpectedAnalyticsRoutes: []AnalyticsRoute{
{
RuleName: "there's-app-invites-everywhere",
Series: "there's-bts-in-the-air",
},
},
ExpectedNotificationRoutes: []NotificationRoute{
{
RuleName: "what's-the-catch",
Channel: "slack-is-built-with-php",
Message: "just like farmville",
Icon: ":ghost:",
User: "logging-pipeline",
},
},
ExpectedAlertRoutes: []AlertRoute{
{
RuleName: "last-call",
Series: "doing-it-til-we-fall",
Dimensions: []string{"who", "where"},
StatType: "gauge",
ValueField: "status",
},
},
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "diversity",
"kv_version": "kv-routes",
"kv_language": "understanding",
"routes": []map[string]interface{}{
map[string]interface{}{
"type": "metrics",
"rule": "all-combos",
"series": "1,1,2,6,24,120,720,5040",
"dimensions": []string{"fact", "orial"},
},
map[string]interface{}{
"type": "analytics",
"rule": "there's-app-invites-everywhere",
"series": "there's-bts-in-the-air",
},
map[string]interface{}{
"type": "notifications",
"rule": "what's-the-catch",
"channel": "slack-is-built-with-php",
"message": "just like farmville",
},
map[string]interface{}{
"type": "alerts",
"rule": "last-call",
"series": "doing-it-til-we-fall",
"dimensions": []string{"who", "where"},
"stat_type": "guage",
"value_field": "status",
},
},
},
},
},
}
for _, test := range tests {
t.Log(test.Description)
kvmeta := ExtractKVMeta(test.Log)
assert.Equal(test.Team, kvmeta.Team)
assert.Equal(test.Language, kvmeta.Language)
assert.Equal(test.Version, kvmeta.Version)
assert.Equal(len(test.ExpectedMetricsRoutes), len(kvmeta.Routes.MetricsRoutes()))
for idx, route := range kvmeta.Routes.MetricsRoutes() {
expected := test.ExpectedMetricsRoutes[idx]
assert.Exactly(expected, route)
}
assert.Equal(len(test.ExpectedAnalyticsRoutes), len(kvmeta.Routes.AnalyticsRoutes()))
for idx, route := range kvmeta.Routes.AnalyticsRoutes() {
expected := test.ExpectedAnalyticsRoutes[idx]
assert.Exactly(expected, route)
}
assert.Equal(len(test.ExpectedNotificationRoutes), len(kvmeta.Routes.NotificationRoutes()))
for idx, route := range kvmeta.Routes.NotificationRoutes() {
expected := test.ExpectedNotificationRoutes[idx]
assert.Exactly(expected, route)
}
assert.Equal(len(test.ExpectedAlertRoutes), len(kvmeta.Routes.AlertRoutes()))
for idx, route := range kvmeta.Routes.AlertRoutes() {
expected := test.ExpectedAlertRoutes[idx]
assert.Exactly(expected, route)
}
}
}
// Benchmarks
const benchmarkLine = `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`
func BenchmarkFieldsFromKayvee(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := FieldsFromKayvee(benchmarkLine)
if err != nil {
b.FailNow()
}
}
}
func BenchmarkFieldsFromSyslog(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := FieldsFromSyslog(benchmarkLine)
if err != nil {
b.FailNow()
}
}
}
func BenchmarkParseAndEnhance(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := ParseAndEnhance(benchmarkLine, "env", false, false, time.Time{})
if err != nil {
b.FailNow()
}
}
}

53
glide.lock generated Normal file
View file

@ -0,0 +1,53 @@
hash: 6da9731518797a7e339144f126af589f72f860bee154b0f2a552f91d2bc01bab
updated: 2017-07-18T02:00:59.747262097Z
imports:
- name: github.com/aws/aws-sdk-go
version: b73b028e599fa9176687c70b8f9cafbe57c27d20
subpackages:
- aws
- aws/session
- service/firehose
- name: github.com/Clever/kinesis-to-firehose
version: dca69c87e1662c7b1f55581399544d05fdcb09ab
subpackages:
- writer
- name: github.com/Clever/syslogparser
version: 93ab95f7ff16c9ef1f2d09bc37c0a0c31bad98ea
subpackages:
- rfc3164
- name: github.com/jeromer/syslogparser
version: 0e4ae46ea3f08de351074b643d649d5d00661a3c
- name: github.com/xeipuuv/gojsonpointer
version: e0fe6f68307607d540ed8eac07a342c33fa1b54a
- name: github.com/xeipuuv/gojsonreference
version: e02fc20de94c78484cd5ffb007f8af96be030a45
- name: github.com/xeipuuv/gojsonschema
version: e18f0065e8c148fcf567ac43a3f8f5b66ac0720b
- name: golang.org/x/net
version: a6577fac2d73be281a500b310739095313165611
subpackages:
- context
- name: golang.org/x/time
version: f51c12702a4d776e4c1fa9b0fabab841babae631
subpackages:
- rate
- name: gopkg.in/Clever/kayvee-go.v6
version: 096364e316a52652d3493be702d8105d8d01db84
subpackages:
- logger
- router
- name: gopkg.in/yaml.v2
version: a5b47d31c556af34a302ce5d659e6fea44d90de0
testImports:
- name: github.com/davecgh/go-spew
version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9
subpackages:
- spew
- name: github.com/pmezard/go-difflib
version: d8ed2627bdf02c080bf22230dbb337003b7aba2d
subpackages:
- difflib
- name: github.com/stretchr/testify
version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0
subpackages:
- assert

20
glide.yaml Normal file
View file

@ -0,0 +1,20 @@
package: github.com/Clever/amazon-kinesis-client-go
import:
- package: github.com/Clever/kinesis-to-firehose
subpackages:
- writer
- package: github.com/Clever/syslogparser
subpackages:
- rfc3164
- package: github.com/aws/aws-sdk-go
subpackages:
- aws
- aws/session
- service/firehose
- package: golang.org/x/time
subpackages:
- rate
testImport:
- package: github.com/stretchr/testify
subpackages:
- assert

10
splitter/README.md Normal file
View file

@ -0,0 +1,10 @@
splitter
===
This splitter ingests logs delivered by a CloudWatch Logs (CWLogs) subscription.
Its expected input is the batched record that a CWLogs subscription writes to a Kinesis stream.
A CWLogs subscription uses a special format that bundles several log events into a single record.
The splitter unpacks that record and splits it into individual log lines.
Each line is also rewritten to mimic the RSyslog format we expect from our other logs,
so it can be decoded normally by the rest of the pipeline.
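
A minimal usage sketch (the import path follows this repo's package layout; `record` is a hypothetical gzipped payload read from a Kinesis record, and error handling is simplified):

```go
package main

import (
	"fmt"
	"log"

	"github.com/Clever/amazon-kinesis-client-go/splitter"
)

func main() {
	// record is a hypothetical gzipped CWLogs subscription payload read from a Kinesis record.
	var record []byte

	if !splitter.IsGzipped(record) {
		log.Fatal("record does not look like a gzipped CWLogs payload")
	}

	// prodEnv=false keeps only non-production logs (Split filters by the log's environment).
	lines, err := splitter.GetMessagesFromGzippedInput(record, false)
	if err != nil {
		log.Fatal(err)
	}
	for _, line := range lines {
		fmt.Println(string(line))
	}
}
```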

113
splitter/splitter.go Normal file
View file

@ -0,0 +1,113 @@
package splitter
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"regexp"
"time"
)
// LogEvent is a single log line within a LogEventBatch
type LogEvent struct {
ID string `json:"id"`
Timestamp int64 `json:"timestamp"`
Message string `json:"message"`
}
// LogEventBatch is a batch of multiple log lines, read from a KinesisStream with a CWLogs subscription
type LogEventBatch struct {
MessageType string `json:"messageType"`
Owner string `json:"owner"`
LogGroup string `json:"logGroup"`
LogStream string `json:"logStream"`
SubscriptionFilters []string `json:"subscriptionFilters"`
LogEvents []LogEvent `json:"logEvents"`
}
// IsGzipped returns whether or not a byte slice is gzipped (determined by looking for the gzip magic-byte prefix)
func IsGzipped(b []byte) bool {
return len(b) >= 2 && b[0] == 0x1f && b[1] == 0x8b
}
// GetMessagesFromGzippedInput takes a gzipped record from a CWLogs Subscription and splits it into
// a slice of messages.
func GetMessagesFromGzippedInput(input []byte, prodEnv bool) ([][]byte, error) {
unpacked, err := unpack(input)
if err != nil {
return [][]byte{}, err
}
return Split(unpacked, prodEnv), nil
}
// unpack expects a gzipped + json-stringified LogEventBatch
func unpack(input []byte) (LogEventBatch, error) {
gzipReader, err := gzip.NewReader(bytes.NewReader(input))
if err != nil {
return LogEventBatch{}, err
}
byt, err := ioutil.ReadAll(gzipReader)
if err != nil {
return LogEventBatch{}, err
}
var dat LogEventBatch
if err := json.Unmarshal(byt, &dat); err != nil {
return LogEventBatch{}, err
}
return dat, nil
}
// RFC3339Micro is the RFC3339 format in microseconds
const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00"
const taskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\/` + // task-id
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // container-id
var taskRegex = regexp.MustCompile(taskMeta)
// Split takes a LogEventBatch and separates it into a slice of enriched log lines.
// Lines are enhanced by adding an RSyslog prefix, which should be handled correctly by
// the subsequent decoding logic.
func Split(b LogEventBatch, prodEnv bool) [][]byte {
env := "unknown"
app := "unknown"
task := "00001111-2222-3333-4444-555566667777"
matches := taskRegex.FindAllStringSubmatch(b.LogStream, 1)
if len(matches) == 1 {
env = matches[0][1]
app = matches[0][2]
task = matches[0][3]
}
if (env == "production") != prodEnv {
// if there's a mis-match between the consumer's environment and the log's environment,
// throw away the log. (this is a workaround for coarse grained subscription filters.)
return [][]byte{}
}
rsyslogPrefix := `%s %s %s[%d]: %s`
// programName is a mocked ARN in the format expected by our log decoders
programName := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task
mockPid := 1
hostname := "aws-batch"
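// Example of a resulting line (value taken from the tests in splitter_test.go):
//   2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line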
out := [][]byte{}
for _, event := range b.LogEvents {
// Adding an extra microsecond forces `Format` to include all 6 digits of the microsecond format.
// Otherwise, time.Format omits trailing zeroes. (https://github.com/golang/go/issues/12472)
nsecs := event.Timestamp*int64(time.Millisecond) + int64(time.Microsecond)
logTime := time.Unix(0, nsecs).UTC().Format(RFC3339Micro)
// Fake an RSyslog prefix, expected by consumers
formatted := fmt.Sprintf(rsyslogPrefix, logTime, hostname, programName, mockPid, event.Message)
out = append(out, []byte(formatted))
}
return out
}

182
splitter/splitter_test.go Normal file
View file

@ -0,0 +1,182 @@
package splitter
import (
"bytes"
"compress/gzip"
b64 "encoding/base64"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
)
func TestUnpacking(t *testing.T) {
input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA"
decoded, err := b64.StdEncoding.DecodeString(input)
assert.NoError(t, err)
output, err := unpack(decoded)
assert.NoError(t, err)
expectedOutput := LogEventBatch{
MessageType: "CONTROL_MESSAGE",
Owner: "CloudwatchLogs",
LogGroup: "",
LogStream: "",
SubscriptionFilters: []string{},
LogEvents: []LogEvent{
{
ID: "",
Timestamp: 1498519943285,
Message: "CWL CONTROL MESSAGE: Checking health of destination Kinesis stream.",
},
},
}
assert.Equal(t, expectedOutput, output)
}
func pack(input LogEventBatch) (string, error) {
src, err := json.Marshal(input)
if err != nil {
return "", err
}
// Gzip
var b bytes.Buffer
gz := gzip.NewWriter(&b)
if _, err := gz.Write(src); err != nil {
return "", err
}
if err := gz.Flush(); err != nil {
return "", err
}
if err := gz.Close(); err != nil {
return "", err
}
// Base64 Encode
return b64.StdEncoding.EncodeToString(b.Bytes()), nil
}
func TestFullLoop(t *testing.T) {
input := `{
"messageType": "DATA_MESSAGE",
"owner": "123456789012",
"logGroup": "/aws/batch/job",
"logStream": "environment--app/11111111-2222-3333-4444-555566667777/88889999-0000-aaa-bbbb-ccccddddeeee",
"subscriptionFilters": [
"MySubscriptionFilter"
],
"logEvents": [
{
"id": "33418742379011144044923130086453437181614530551221780480",
"timestamp": 1498548236012,
"message": "some log line"
},
{
"id": "33418742387663833181953011865369295871402094815542181889",
"timestamp": 1498548236400,
"message": "2017/06/27 07:23:56 Another log line"
}
]
}`
var leb LogEventBatch
err := json.Unmarshal([]byte(input), &leb)
assert.NoError(t, err)
packed, err := pack(leb)
assert.NoError(t, err)
decoded, err := b64.StdEncoding.DecodeString(packed)
assert.NoError(t, err)
output, err := unpack(decoded)
assert.NoError(t, err)
assert.Equal(t, leb, output)
}
func TestSplit(t *testing.T) {
input := LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "/aws/batch/job",
LogStream: "env--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee",
SubscriptionFilters: []string{"MySubscriptionFilter"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: 1498519943285,
Message: "some log line",
},
{
ID: "99999992387663833181953011865369295871402094815542181889",
Timestamp: 1498519943285,
Message: "another log line",
},
},
}
prodEnv := false
lines := Split(input, prodEnv)
expected := [][]byte{
"2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line",
"2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line",
}
assert.Equal(t, expected, lines)
}
func TestSplitFiltersByEnv(t *testing.T) {
t.Log("If Split is run with prodEnv == true, it should omit logs with env != production")
input := LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "/aws/batch/job",
LogStream: "env--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee",
// LogStream: "environment--app",
SubscriptionFilters: []string{"MySubscriptionFilter"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: 1498519943285,
Message: "some log line",
},
{
ID: "99999992387663833181953011865369295871402094815542181889",
Timestamp: 1498519943285,
Message: "another log line",
},
},
}
prodEnv := true
lines := Split(input, prodEnv)
expected := [][]byte{}
assert.Equal(t, expected, lines)
t.Log("If Split is run with prodEnv == false, it should omit logs with env == production")
input = LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "/aws/batch/job",
LogStream: "production--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee",
// LogStream: "environment--app",
SubscriptionFilters: []string{"MySubscriptionFilter"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: 1498519943285,
Message: "some log line",
},
{
ID: "99999992387663833181953011865369295871402094815542181889",
Timestamp: 1498519943285,
Message: "another log line",
},
},
}
prodEnv = false
lines = Split(input, prodEnv)
expected = [][]byte{}
assert.Equal(t, expected, lines)
}