Merge pull request #5 from Clever/INFRA-2405-new-consumer-interface

Implemented new consumer interface as well as example consumer
This commit is contained in:
Xavi 2017-07-24 13:12:43 -07:00 committed by GitHub
commit 4c6d8453ef
21 changed files with 3368 additions and 23 deletions

View file

@ -1,6 +1,10 @@
include golang.mk
.DEFAULT_GOAL := test # override default goal set in library makefile
SHELL := /bin/bash
JAR_DIR := jars
PKG := github.com/Clever/amazon-kinesis-client-go
PKGS := $(shell go list ./... | grep -v /vendor )
.PHONY: download_jars run build
URL_PREFIX := http://search.maven.org/remotecontent?filepath=
@ -30,6 +34,8 @@ EMPTY :=
SPACE := $(EMPTY) $(EMPTY)
JAVA_CLASS_PATH := $(subst $(SPACE),:,$(JARS_TO_DOWNLOAD))
CONSUMER ?= consumer
$(JARS_TO_DOWNLOAD):
mkdir -p `dirname $@`
curl -s -L -o $@ -O $(URL_PREFIX)`echo $@ | sed 's/$(JAR_DIR)\///g'`
@ -37,8 +43,23 @@ $(JARS_TO_DOWNLOAD):
download_jars: $(JARS_TO_DOWNLOAD)
build:
CGO_ENABLED=0 go build -installsuffix cgo -o build/consumer $(PKG)/cmd/consumer
CGO_ENABLED=0 go build -installsuffix cgo -o build/$(CONSUMER) $(PKG)/cmd/$(CONSUMER)
run: build download_jars
command -v java >/dev/null 2>&1 || { echo >&2 "Java not installed. Install java!"; exit 1; }
java -cp $(JAVA_CLASS_PATH) com.amazonaws.services.kinesis.multilang.MultiLangDaemon consumer.properties
java -cp $(JAVA_CLASS_PATH) \
com.amazonaws.services.kinesis.multilang.MultiLangDaemon \
$(CONSUMER).properties
bench:
go test -bench=. github.com/Clever/amazon-kinesis-client-go/decode/
test: $(PKGS)
$(PKGS): golang-test-all-deps
$(call golang-test-all,$@)
$(GOPATH)/bin/glide:
@go get github.com/Masterminds/glide
install_deps: $(GOPATH)/bin/glide
@$(GOPATH)/bin/glide install

83
batchconsumer.properties Normal file
View file

@ -0,0 +1,83 @@
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = build/batchconsumer
# The name of an Amazon Kinesis stream to process.
streamName = test-stream
# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = KCLGoExample
# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = golang
# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON
# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.
# The KCL defaults to us-east-1
regionName = us-west-1
# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000
# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId =
# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000
# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000
# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000
# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false
# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000
# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true
# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500
# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000
# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000
# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true
# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0

View file

@ -0,0 +1,194 @@
package batcher
import (
"fmt"
"math/big"
"sync"
"time"
)
// SequencePair a convience way to pass around a Sequence / SubSequence pair
type SequencePair struct {
Sequence *big.Int
SubSequence int
}
func (s SequencePair) IsEmpty() bool {
return s.Sequence == nil
}
func (s SequencePair) IsLessThan(pair SequencePair) bool {
if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable
return false
}
cmp := s.Sequence.Cmp(pair.Sequence)
if cmp == -1 {
return true
}
if cmp == 1 {
return false
}
return s.SubSequence < pair.SubSequence
}
// Sync is used to allow a writer to syncronize with the batcher.
// The writer declares how to write messages (via its `SendBatch` method), while the batcher
// keeps track of messages written
type Sync interface {
SendBatch(batch [][]byte)
}
// Batcher interface
type Batcher interface {
// AddMesage to the batch
AddMessage(msg []byte, sequencePair SequencePair) error
// Flush all messages from the batch
Flush()
// SmallestSeqPair returns the smallest SequenceNumber and SubSequence number in
// the current batch
SmallestSequencePair() SequencePair
}
type msgPack struct {
msg []byte
sequencePair SequencePair
}
type batcher struct {
mux sync.Mutex
flushInterval time.Duration
flushCount int
flushSize int
// smallestSeq are used for checkpointing
smallestSeq SequencePair
sync Sync
msgChan chan<- msgPack
flushChan chan<- struct{}
}
// New creates a new Batcher
// - sync - synchronizes batcher with writer
// - flushInterval - how often accumulated messages should be flushed (default 1 second).
// - flushCount - number of messages that trigger a flush (default 10).
// - flushSize - size of batch that triggers a flush (default 1024 * 1024 = 1 mb)
func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) (Batcher, error) {
if flushSize == 0 {
return nil, fmt.Errorf("flush size must be non-zero")
}
if flushCount == 0 {
return nil, fmt.Errorf("flush count must be non-zero")
}
if flushInterval == 0 {
return nil, fmt.Errorf("flush interval must be non-zero")
}
msgChan := make(chan msgPack)
flushChan := make(chan struct{})
b := &batcher{
flushCount: flushCount,
flushInterval: flushInterval,
flushSize: flushSize,
sync: sync,
msgChan: msgChan,
flushChan: flushChan,
}
go b.startBatcher(msgChan, flushChan)
return b, nil
}
func (b *batcher) SmallestSequencePair() SequencePair {
b.mux.Lock()
defer b.mux.Unlock()
return b.smallestSeq
}
func (b *batcher) SetFlushInterval(dur time.Duration) {
b.flushInterval = dur
}
func (b *batcher) SetFlushCount(count int) {
b.flushCount = count
}
func (b *batcher) SetFlushSize(size int) {
b.flushSize = size
}
func (b *batcher) AddMessage(msg []byte, pair SequencePair) error {
if len(msg) <= 0 {
return fmt.Errorf("Empty messages can't be sent")
}
b.msgChan <- msgPack{msg, pair}
return nil
}
// updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch.
// When flush() is called, the batcher sends the sequence number to the writer. When the writer
// checkpoints, it does so up to the latest message that was flushed successfully.
func (b *batcher) updateSequenceNumbers(pair SequencePair) {
b.mux.Lock()
defer b.mux.Unlock()
if b.smallestSeq.IsEmpty() || pair.IsLessThan(b.smallestSeq) {
b.smallestSeq = pair
}
}
func (b *batcher) Flush() {
b.flushChan <- struct{}{}
}
func (b *batcher) batchSize(batch [][]byte) int {
total := 0
for _, msg := range batch {
total += len(msg)
}
return total
}
func (b *batcher) flush(batch [][]byte) [][]byte {
if len(batch) > 0 {
b.sync.SendBatch(batch)
b.mux.Lock()
b.smallestSeq = SequencePair{nil, 0}
b.mux.Unlock()
}
return [][]byte{}
}
func (b *batcher) startBatcher(msgChan <-chan msgPack, flushChan <-chan struct{}) {
batch := [][]byte{}
for {
select {
case <-time.After(b.flushInterval):
batch = b.flush(batch)
case <-flushChan:
batch = b.flush(batch)
case pack := <-msgChan:
size := b.batchSize(batch)
if b.flushSize < size+len(pack.msg) {
batch = b.flush(batch)
}
batch = append(batch, pack.msg)
b.updateSequenceNumbers(pack.sequencePair)
if b.flushCount <= len(batch) || b.flushSize <= b.batchSize(batch) {
batch = b.flush(batch)
}
}
}
}

View file

@ -0,0 +1,264 @@
package batcher
import (
"fmt"
"math/big"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
type batch [][]byte
type MockSync struct {
flushChan chan struct{}
batches []batch
}
func NewMockSync() *MockSync {
return &MockSync{
flushChan: make(chan struct{}, 1),
batches: []batch{},
}
}
func (m *MockSync) SendBatch(b [][]byte) {
m.batches = append(m.batches, batch(b))
m.flushChan <- struct{}{}
}
func (m *MockSync) waitForFlush(timeout time.Duration) error {
select {
case <-m.flushChan:
return nil
case <-time.After(timeout):
return fmt.Errorf("timed out before flush (waited %s)", timeout.String())
}
}
var mockSequence = SequencePair{big.NewInt(99999), 12345}
func TestBatchingByCount(t *testing.T) {
assert := assert.New(t)
sync := NewMockSync()
batcher, err := New(sync, time.Hour, 2, 1024*1024)
assert.NoError(err)
t.Log("Batcher respect count limit")
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence))
assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequence))
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(1, len(sync.batches))
assert.Equal(2, len(sync.batches[0]))
assert.Equal("hihi", string(sync.batches[0][0]))
assert.Equal("heyhey", string(sync.batches[0][1]))
t.Log("Batcher doesn't send partial batches")
err = sync.waitForFlush(time.Millisecond * 10)
assert.Error(err)
}
func TestBatchingByTime(t *testing.T) {
assert := assert.New(t)
sync := NewMockSync()
batcher, err := New(sync, time.Millisecond, 2000000, 1024*1024)
assert.NoError(err)
t.Log("Batcher sends partial batches when time expires")
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(1, len(sync.batches))
assert.Equal(1, len(sync.batches[0]))
assert.Equal("hihi", string(sync.batches[0][0]))
t.Log("Batcher sends all messsages in partial batches when time expires")
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence))
assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence))
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(2, len(sync.batches))
assert.Equal(2, len(sync.batches[1]))
assert.Equal("heyhey", string(sync.batches[1][0]))
assert.Equal("yoyo", string(sync.batches[1][1]))
t.Log("Batcher doesn't send empty batches")
err = sync.waitForFlush(time.Millisecond * 10)
assert.Error(err)
}
func TestBatchingBySize(t *testing.T) {
assert := assert.New(t)
sync := NewMockSync()
batcher, err := New(sync, time.Hour, 2000000, 8)
assert.NoError(err)
t.Log("Large messages are sent immediately")
assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequence))
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(1, len(sync.batches))
assert.Equal(1, len(sync.batches[0]))
assert.Equal("hellohello", string(sync.batches[0][0]))
t.Log("Batcher tries not to exceed size limit")
assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence))
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(2, len(sync.batches))
assert.Equal(1, len(sync.batches[1]))
assert.Equal("heyhey", string(sync.batches[1][0]))
t.Log("Batcher sends messages that didn't fit in previous batch")
assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) // At this point "hihi" is in the batch
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(3, len(sync.batches))
assert.Equal(2, len(sync.batches[2]))
assert.Equal("hihi", string(sync.batches[2][0]))
assert.Equal("yoyo", string(sync.batches[2][1]))
t.Log("Batcher doesn't send partial batches")
assert.NoError(batcher.AddMessage([]byte("okok"), mockSequence))
err = sync.waitForFlush(time.Millisecond * 10)
assert.Error(err)
}
func TestFlushing(t *testing.T) {
assert := assert.New(t)
sync := NewMockSync()
batcher, err := New(sync, time.Hour, 2000000, 1024*1024)
assert.NoError(err)
t.Log("Calling flush sends pending messages")
assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence))
err = sync.waitForFlush(time.Millisecond * 10)
assert.Error(err)
batcher.Flush()
err = sync.waitForFlush(time.Millisecond * 10)
assert.NoError(err)
assert.Equal(1, len(sync.batches))
assert.Equal(1, len(sync.batches[0]))
assert.Equal("hihi", string(sync.batches[0][0]))
}
func TestSendingEmpty(t *testing.T) {
assert := assert.New(t)
sync := NewMockSync()
batcher, err := New(sync, time.Second, 10, 1024*1024)
assert.NoError(err)
t.Log("An error is returned when an empty message is sent")
err = batcher.AddMessage([]byte{}, mockSequence)
assert.Error(err)
assert.Equal(err.Error(), "Empty messages can't be sent")
}
func TestUpdatingSequence(t *testing.T) {
assert := assert.New(t)
sync := NewMockSync()
b, err := New(sync, time.Second, 10, 1024*1024)
assert.NoError(err)
batcher := b.(*batcher)
t.Log("Initally, smallestSeq is undefined")
assert.Nil(batcher.SmallestSequencePair().Sequence)
expected := new(big.Int)
t.Log("After AddMessage (seq=1), smallestSeq = 1")
batcher.updateSequenceNumbers(SequencePair{big.NewInt(1), 1234})
expected.SetInt64(1)
seq := batcher.SmallestSequencePair()
assert.True(expected.Cmp(seq.Sequence) == 0)
t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher")
batcher.updateSequenceNumbers(SequencePair{big.NewInt(2), 1234})
seq = batcher.SmallestSequencePair()
assert.True(expected.Cmp(seq.Sequence) == 0)
t.Log("After AddMessage (seq=1), smallestSeq = 0")
batcher.updateSequenceNumbers(SequencePair{big.NewInt(0), 1234})
expected.SetInt64(0)
seq = batcher.SmallestSequencePair()
assert.True(expected.Cmp(seq.Sequence) == 0)
t.Log("Flushing batch clears smallest sequence pair")
assert.NoError(batcher.AddMessage([]byte("cdcd"), SequencePair{big.NewInt(2), 1234}))
sync.waitForFlush(time.Minute)
assert.Nil(batcher.SmallestSequencePair().Sequence)
}
func TestSequencePairIsLessThan(t *testing.T) {
assert := assert.New(t)
big10 := big.NewInt(10)
big5 := big.NewInt(5)
tests := []struct {
left SequencePair
right SequencePair
isLess bool
}{
{left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false},
{left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false},
{left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false},
{left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true},
{left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true},
{left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false},
{left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false},
}
for _, test := range tests {
left := test.left
right := test.right
t.Logf(
"Is <%s, %d> less than <%s, %d>? %t",
left.Sequence.String(), left.SubSequence,
right.Sequence.String(), right.SubSequence,
test.isLess,
)
assert.Equal(test.isLess, left.IsLessThan(right))
}
}
func TestSequencePairEmpty(t *testing.T) {
assert := assert.New(t)
assert.True(SequencePair{nil, 0}.IsEmpty())
assert.True(SequencePair{nil, 10000}.IsEmpty())
assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty())
assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty())
}

120
batchconsumer/consumer.go Normal file
View file

@ -0,0 +1,120 @@
package batchconsumer
import (
"io"
"log"
"os"
"time"
"gopkg.in/Clever/kayvee-go.v6/logger"
"github.com/Clever/amazon-kinesis-client-go/kcl"
)
// Config used for BatchConsumer constructor. Any empty fields are populated with defaults.
type Config struct {
// DeployEnv the name of the deployment environment
DeployEnv string
// LogFile where consumer errors and failed log lines are saved
LogFile string
// BatchInterval the upper bound on how often SendBatch is called with accumulated messages
BatchInterval time.Duration
// BatchCount is the number of messages that triggers a SendBatch call
BatchCount int
// BatchSize is the size of a batch in bytes that triggers a SendBatch call
BatchSize int
// ReadRateLimit the number of records read per seconds
ReadRateLimit int
// ReadBurstLimit the max number of tokens allowed by rate limiter
ReadBurstLimit int
// CheckpointFreq the frequency in which a checkpoint is saved
CheckpointFreq time.Duration
// CheckpointRetries the number of times the consumer will try to save a checkpoint
CheckpointRetries int
// CheckpointRetrySleep the amount of time between checkpoint save attempts
CheckpointRetrySleep time.Duration
}
// BatchConsumer is responsible for marshalling
type BatchConsumer struct {
kclProcess *kcl.KCLProcess
logfile *os.File
}
func withDefaults(config Config) Config {
if config.LogFile == "" {
config.LogFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339)
}
if config.BatchInterval == 0 {
config.BatchInterval = 10 * time.Second
}
if config.BatchCount == 0 {
config.BatchCount = 500
}
if config.BatchSize == 0 {
config.BatchSize = 4 * 1024 * 1024
}
if config.DeployEnv == "" {
config.DeployEnv = "unknown-env"
}
if config.ReadRateLimit == 0 {
config.ReadRateLimit = 300
}
if config.ReadBurstLimit == 0 {
config.ReadBurstLimit = int(float64(config.ReadRateLimit)*1.2 + 0.5)
}
if config.CheckpointFreq == 0 {
config.CheckpointFreq = 60 * time.Second
}
if config.CheckpointRetries == 0 {
config.CheckpointRetries = 5
}
if config.CheckpointRetrySleep == 0 {
config.CheckpointRetrySleep = 5 * time.Second
}
return config
}
// NewBatchConsumerFromFiles creates a batch consumer. Readers/writers provided are used for
// interprocess communication.
func NewBatchConsumerFromFiles(
config Config, sender Sender, input io.Reader, output, errFile io.Writer,
) *BatchConsumer {
config = withDefaults(config)
file, err := os.OpenFile(config.LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Unable to create log file: %s", err.Error())
}
kvlog := logger.New("amazon-kinesis-client-go")
kvlog.SetOutput(file)
wrt := NewBatchedWriter(config, sender, kvlog)
kclProcess := kcl.New(input, output, errFile, wrt)
return &BatchConsumer{
kclProcess: kclProcess,
logfile: file,
}
}
// NewBatchConsumer creates batch consumer. Stdin, Stdout, and Stderr are used for interprocess
// communication.
func NewBatchConsumer(config Config, sender Sender) *BatchConsumer {
return NewBatchConsumerFromFiles(config, sender, os.Stdin, os.Stdout, os.Stderr)
}
// Start when called, the consumer begins ingesting messages. This function blocks.
func (b *BatchConsumer) Start() {
b.kclProcess.Run()
b.logfile.Close()
}

46
batchconsumer/sender.go Normal file
View file

@ -0,0 +1,46 @@
package batchconsumer
import (
"errors"
"fmt"
)
// ErrMessageIgnored should be returned by ProcessMessage when it encounters a message that will
// not be consumed
var ErrMessageIgnored = errors.New("Message intentionally skipped by sender")
// Sender an interface needed for batch consumer implementations
type Sender interface {
// ProcessMessage receives a raw message and is expected to return an appropriately formatted
// message as well as a list of tags for that log line. A tag corresponds to a batch that
// it'll be put into. Typically tags are series names.
// If a message will not be consumed, ProcessMessage should return a ErrMessageIgnored error.
ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error)
// SendBatch receives a batch of messages. All messages were given the specified tag by
// ProcessMessage
SendBatch(batch [][]byte, tag string) error
}
// PartialSendBatchError should be returned by SendBatch implementations when some messages
// couldn't be sent to an output. It's expected that SendBatch implementations do a "best effort"
// before returning this error.
type PartialSendBatchError struct {
// ErrMessage is a description of error that occurred
ErrMessage string
// FailedMessages a list of messages that failed to be sent
FailedMessages [][]byte
}
func (c PartialSendBatchError) Error() string {
return fmt.Sprintf("%d failed logs. %s", len(c.FailedMessages), c.ErrMessage)
}
// CatastrophicSendBatchError should be returned by SendBatch implementations when the output is
// unreachable. Returning this error causes this container to exit without checkpointing.
type CatastrophicSendBatchError struct {
ErrMessage string
}
func (c CatastrophicSendBatchError) Error() string {
return fmt.Sprintf("catastrophic SendBatch error: %s", c.ErrMessage)
}

10
batchconsumer/sync.go Normal file
View file

@ -0,0 +1,10 @@
package batchconsumer
type batcherSync struct {
tag string
writer *batchedWriter
}
func (b *batcherSync) SendBatch(batch [][]byte) {
b.writer.SendBatch(batch, b.tag)
}

325
batchconsumer/writer.go Normal file
View file

@ -0,0 +1,325 @@
package batchconsumer
import (
"context"
"encoding/base64"
"fmt"
"math/big"
"os"
"time"
"golang.org/x/time/rate"
kv "gopkg.in/Clever/kayvee-go.v6/logger"
"github.com/Clever/amazon-kinesis-client-go/batchconsumer/batcher"
"github.com/Clever/amazon-kinesis-client-go/kcl"
"github.com/Clever/amazon-kinesis-client-go/splitter"
)
type tagMsgPair struct {
tag string
msg []byte
pair batcher.SequencePair
}
type batchedWriter struct {
config Config
sender Sender
log kv.KayveeLogger
shardID string
checkpointMsg chan batcher.SequencePair
checkpointTag chan string
lastProcessedPair chan batcher.SequencePair
batchMsg chan tagMsgPair
flushBatches chan struct{}
// Limits the number of records read from the stream
rateLimiter *rate.Limiter
lastProcessedSeq batcher.SequencePair
}
func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter {
return &batchedWriter{
config: config,
sender: sender,
log: log,
rateLimiter: rate.NewLimiter(rate.Limit(config.ReadRateLimit), config.ReadBurstLimit),
}
}
func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
b.shardID = shardID
b.checkpointMsg = make(chan batcher.SequencePair)
b.startCheckpointListener(checkpointer, b.checkpointMsg)
b.checkpointTag = make(chan string)
b.batchMsg = make(chan tagMsgPair)
b.flushBatches = make(chan struct{})
b.lastProcessedPair = make(chan batcher.SequencePair)
b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches)
return nil
}
// handleCheckpointError returns true if checkout should be tried again. Returns false otherwise.
func (b *batchedWriter) handleCheckpointError(err error) bool {
if err == nil {
return false
}
cperr, ok := err.(kcl.CheckpointError)
if !ok {
b.log.ErrorD("unknown-checkpoint-error", kv.M{"msg": err.Error(), "shard-id": b.shardID})
return true
}
switch cperr.Error() {
case "ShutdownException": // Skips checkpointing
b.log.ErrorD("shutdown-checkpoint-exception", kv.M{
"msg": err.Error(), "shard-id": b.shardID,
})
return false
case "ThrottlingException":
b.log.ErrorD("checkpoint-throttle", kv.M{"shard-id": b.shardID})
case "InvalidStateException":
b.log.ErrorD("invalid-checkpoint-state", kv.M{"shard-id": b.shardID})
default:
b.log.ErrorD("checkpoint-error", kv.M{"shard-id": b.shardID, "msg": err})
}
return true
}
func (b *batchedWriter) startCheckpointListener(
checkpointer kcl.Checkpointer, checkpointMsg <-chan batcher.SequencePair,
) {
go func() {
lastCheckpoint := time.Now()
for {
seq := <-checkpointMsg
// This is a write throttle to ensure we don't checkpoint faster than
// b.config.CheckpointFreq. The latest seq number is always used.
for time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq {
select {
case seq = <-checkpointMsg: // Keep updating checkpoint seq while waiting
case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C:
}
}
retry := true
for n := 0; retry && n < b.config.CheckpointRetries+1; n++ {
str := seq.Sequence.String()
err := checkpointer.Checkpoint(&str, &seq.SubSequence)
if err == nil { // Successfully checkpointed!
lastCheckpoint = time.Now()
break
}
retry = b.handleCheckpointError(err)
if n == b.config.CheckpointRetries {
b.log.ErrorD("checkpoint-retries", kv.M{"attempts": b.config.CheckpointRetries})
retry = false
}
if retry {
time.Sleep(b.config.CheckpointRetrySleep)
}
}
}
}()
}
func (b *batchedWriter) createBatcher(tag string) batcher.Batcher {
sync := &batcherSync{
tag: tag,
writer: b,
}
batch, err := batcher.New(sync, b.config.BatchInterval, b.config.BatchCount, b.config.BatchSize)
if err != nil {
b.log.ErrorD("create-batcher", kv.M{"msg": err.Error(), "tag": tag})
}
return batch
}
// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a
// go routine to avoid racey conditions.
func (b *batchedWriter) startMessageHandler(
batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan batcher.SequencePair,
flushBatches <-chan struct{},
) {
go func() {
var lastProcessedPair batcher.SequencePair
batchers := map[string]batcher.Batcher{}
areBatchersEmpty := true
for {
select {
case tmp := <-batchMsg:
batcher, ok := batchers[tmp.tag]
if !ok {
batcher = b.createBatcher(tmp.tag)
batchers[tmp.tag] = batcher
}
err := batcher.AddMessage(tmp.msg, tmp.pair)
if err != nil {
b.log.ErrorD("add-message", kv.M{
"err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag,
})
}
areBatchersEmpty = false
case tag := <-checkpointTag:
smallest := lastProcessedPair
isAllEmpty := true
for name, batch := range batchers {
if tag == name {
continue
}
pair := batch.SmallestSequencePair()
if pair.IsEmpty() { // Occurs when batch has no items
continue
}
if pair.IsLessThan(smallest) {
smallest = pair
}
isAllEmpty = false
}
if !smallest.IsEmpty() {
b.checkpointMsg <- smallest
}
areBatchersEmpty = isAllEmpty
case pair := <-lastPair:
if areBatchersEmpty {
b.checkpointMsg <- pair
}
lastProcessedPair = pair
case <-flushBatches:
for _, batch := range batchers {
batch.Flush()
}
b.checkpointMsg <- lastProcessedPair
areBatchersEmpty = true
}
}
}()
}
func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
// We handle two types of records:
// - records emitted from CWLogs Subscription
// - records emiited from KPL
if !splitter.IsGzipped(record) {
// Process a single message, from KPL
return [][]byte{record}, nil
}
// Process a batch of messages from a CWLogs Subscription
return splitter.GetMessagesFromGzippedInput(record, b.config.DeployEnv == "production")
}
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
var pair batcher.SequencePair
prevPair := b.lastProcessedSeq
for _, record := range records {
// Wait until rate limiter permits one more record to be processed
b.rateLimiter.Wait(context.Background())
seq := new(big.Int)
if _, ok := seq.SetString(record.SequenceNumber, 10); !ok { // Validating sequence
return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
}
pair = batcher.SequencePair{seq, record.SubSequenceNumber}
if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessSeq is empty
prevPair = pair
}
data, err := base64.StdEncoding.DecodeString(record.Data)
if err != nil {
return err
}
messages, err := b.splitMessageIfNecessary(data)
if err != nil {
return err
}
for _, rawmsg := range messages {
msg, tags, err := b.sender.ProcessMessage(rawmsg)
if err == ErrMessageIgnored {
continue // Skip message
} else if err != nil {
b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)})
continue // Don't stop processing messages because of one bad message
}
if len(tags) == 0 {
b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)})
return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg))
}
for _, tag := range tags {
if tag == "" {
b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)})
return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg))
}
// Use second to last sequence number to ensure we don't checkpoint a message before
// it's been sent. When batches are sent, conceptually we first find the smallest
// sequence number amount all the batch (let's call it A). We then checkpoint at
// the A-1 sequence number.
b.batchMsg <- tagMsgPair{tag, msg, prevPair}
}
}
prevPair = pair
b.lastProcessedPair <- pair
}
b.lastProcessedSeq = pair
return nil
}
func (b *batchedWriter) SendBatch(batch [][]byte, tag string) {
err := b.sender.SendBatch(batch, tag)
switch e := err.(type) {
case nil: // Do nothing
case PartialSendBatchError:
b.log.ErrorD("send-batch", kv.M{"msg": e.Error()})
for _, line := range e.FailedMessages {
b.log.ErrorD("failed-log", kv.M{"log": line})
}
case CatastrophicSendBatchError:
b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
os.Exit(1)
default:
b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
os.Exit(1)
}
b.checkpointTag <- tag
}
func (b *batchedWriter) Shutdown(reason string) error {
if reason == "TERMINATE" {
b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
b.flushBatches <- struct{}{}
} else {
b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
}
return nil
}

View file

@ -0,0 +1,372 @@
package batchconsumer
import (
"encoding/base64"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"gopkg.in/Clever/kayvee-go.v6/logger"
"github.com/Clever/amazon-kinesis-client-go/kcl"
)
type ignoringSender struct{}
func (i ignoringSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) {
return nil, nil, ErrMessageIgnored
}
func (i ignoringSender) SendBatch(batch [][]byte, tag string) error {
panic("SendBatch Should never be called. ProcessMessage ignores all messasges.")
}
type tagBatch struct {
tag string
batch [][]byte
}
type msgAsTagSender struct {
batches map[string][][][]byte
saveBatch chan tagBatch
shutdown chan struct{}
}
func NewMsgAsTagSender() *msgAsTagSender {
sender := &msgAsTagSender{
batches: map[string][][][]byte{},
saveBatch: make(chan tagBatch),
shutdown: make(chan struct{}),
}
sender.startBatchSaver(sender.saveBatch, sender.shutdown)
return sender
}
func (i *msgAsTagSender) startBatchSaver(saveBatch <-chan tagBatch, shutdown <-chan struct{}) {
go func() {
for {
select {
case tb := <-saveBatch:
batches, ok := i.batches[tb.tag]
if !ok {
batches = [][][]byte{}
}
i.batches[tb.tag] = append(batches, tb.batch)
case <-shutdown:
return
}
}
}()
}
func (i *msgAsTagSender) ProcessMessage(rawmsg []byte) (msg []byte, tags []string, err error) {
if "ignore" == string(rawmsg) {
return nil, nil, ErrMessageIgnored
}
return rawmsg, []string{string(rawmsg)}, nil
}
func (i *msgAsTagSender) SendBatch(batch [][]byte, tag string) error {
i.saveBatch <- tagBatch{tag, batch}
return nil
}
func (i *msgAsTagSender) Shutdown() {
i.shutdown <- struct{}{}
}
type mockCheckpointer struct {
recievedSequences []string
checkpoint chan string
done chan struct{}
timeout chan struct{}
shutdown chan struct{}
}
func NewMockCheckpointer(maxSeq string, timeout time.Duration) *mockCheckpointer {
mcp := &mockCheckpointer{
checkpoint: make(chan string),
done: make(chan struct{}, 1),
timeout: make(chan struct{}, 1),
shutdown: make(chan struct{}),
}
mcp.startWaiter(maxSeq, timeout)
return mcp
}
func (m *mockCheckpointer) startWaiter(maxSeq string, timeout time.Duration) {
go func() {
for {
select {
case seq := <-m.checkpoint:
m.recievedSequences = append(m.recievedSequences, seq)
if seq == maxSeq {
m.done <- struct{}{}
}
case <-time.NewTimer(timeout).C:
m.timeout <- struct{}{}
case <-m.shutdown:
return
}
}
}()
}
func (m *mockCheckpointer) wait() error {
select {
case <-m.done:
return nil
case <-m.timeout:
return fmt.Errorf("timeout waiting for checkpoints")
}
}
func (m *mockCheckpointer) Shutdown() {
m.shutdown <- struct{}{}
}
func (m *mockCheckpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error {
m.checkpoint <- *sequenceNumber
return nil
}
func (m *mockCheckpointer) CheckpointWithRetry(
sequenceNumber *string, subSequenceNumber *int, retryCount int,
) error {
return m.Checkpoint(sequenceNumber, subSequenceNumber)
}
func encode(str string) string {
return base64.StdEncoding.EncodeToString([]byte(str))
}
func TestProcessRecordsIgnoredMessages(t *testing.T) {
assert := assert.New(t)
mocklog := logger.New("testing")
mockconfig := withDefaults(Config{
BatchInterval: 10 * time.Millisecond,
CheckpointFreq: 20 * time.Millisecond,
})
mockcheckpointer := NewMockCheckpointer("4", 5*time.Second)
wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog)
wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "1", Data: encode("hi")},
kcl.Record{SequenceNumber: "2", Data: encode("hi")},
kcl.Record{SequenceNumber: "3", Data: encode("hi")},
kcl.Record{SequenceNumber: "4", Data: encode("hi")},
})
assert.NoError(err)
err = mockcheckpointer.wait()
assert.NoError(err)
}
func TestProcessRecordsMutliBatchBasic(t *testing.T) {
assert := assert.New(t)
mocklog := logger.New("testing")
mockconfig := withDefaults(Config{
BatchInterval: 100 * time.Millisecond,
CheckpointFreq: 200 * time.Millisecond,
})
mockcheckpointer := NewMockCheckpointer("8", 5*time.Second)
mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "1", Data: encode("tag1")},
kcl.Record{SequenceNumber: "2", Data: encode("tag2")},
kcl.Record{SequenceNumber: "3", Data: encode("tag3")},
kcl.Record{SequenceNumber: "4", Data: encode("tag2")},
})
assert.NoError(err)
err = wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "5", Data: encode("tag3")},
kcl.Record{SequenceNumber: "6", Data: encode("tag2")},
kcl.Record{SequenceNumber: "7", Data: encode("tag3")},
kcl.Record{SequenceNumber: "8", Data: encode("tag1")},
})
assert.NoError(err)
time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once
err = wrt.Shutdown("TERMINATE")
assert.NoError(err)
err = mockcheckpointer.wait()
assert.NoError(err)
mocksender.Shutdown()
assert.Contains(mocksender.batches, "tag1")
assert.Equal(1, len(mocksender.batches["tag1"])) // One batch
assert.Equal(2, len(mocksender.batches["tag1"][0])) // with two items
assert.Equal("tag1", string(mocksender.batches["tag1"][0][0]))
assert.Equal("tag1", string(mocksender.batches["tag1"][0][1]))
assert.Contains(mocksender.batches, "tag2")
assert.Equal(1, len(mocksender.batches["tag2"])) // One batch
assert.Equal(3, len(mocksender.batches["tag2"][0])) // with three items
assert.Equal("tag2", string(mocksender.batches["tag2"][0][0]))
assert.Equal("tag2", string(mocksender.batches["tag2"][0][1]))
assert.Equal("tag2", string(mocksender.batches["tag2"][0][2]))
assert.Contains(mocksender.batches, "tag3")
assert.Equal(1, len(mocksender.batches["tag3"])) // One batch
assert.Equal(3, len(mocksender.batches["tag3"][0])) // with three items
assert.Equal("tag3", string(mocksender.batches["tag3"][0][0]))
assert.Equal("tag3", string(mocksender.batches["tag3"][0][1]))
assert.Equal("tag3", string(mocksender.batches["tag3"][0][2]))
}
func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) {
assert := assert.New(t)
mocklog := logger.New("testing")
mockconfig := withDefaults(Config{
BatchInterval: 100 * time.Millisecond,
CheckpointFreq: 200 * time.Millisecond,
})
mockcheckpointer := NewMockCheckpointer("26", 5*time.Second)
mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "1", Data: encode("ignore")},
kcl.Record{SequenceNumber: "2", Data: encode("ignore")},
kcl.Record{SequenceNumber: "3", Data: encode("ignore")},
kcl.Record{SequenceNumber: "4", Data: encode("ignore")},
kcl.Record{SequenceNumber: "5", Data: encode("tag1")},
kcl.Record{SequenceNumber: "6", Data: encode("ignore")},
kcl.Record{SequenceNumber: "7", Data: encode("tag2")},
kcl.Record{SequenceNumber: "8", Data: encode("tag3")},
kcl.Record{SequenceNumber: "9", Data: encode("ignore")},
kcl.Record{SequenceNumber: "10", Data: encode("tag2")},
kcl.Record{SequenceNumber: "11", Data: encode("ignore")},
kcl.Record{SequenceNumber: "12", Data: encode("ignore")},
kcl.Record{SequenceNumber: "13", Data: encode("ignore")},
})
assert.NoError(err)
err = wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "14", Data: encode("ignore")},
kcl.Record{SequenceNumber: "15", Data: encode("ignore")},
kcl.Record{SequenceNumber: "16", Data: encode("ignore")},
kcl.Record{SequenceNumber: "17", Data: encode("ignore")},
kcl.Record{SequenceNumber: "18", Data: encode("tag3")},
kcl.Record{SequenceNumber: "19", Data: encode("tag2")},
kcl.Record{SequenceNumber: "20", Data: encode("ignore")},
kcl.Record{SequenceNumber: "21", Data: encode("tag3")},
kcl.Record{SequenceNumber: "22", Data: encode("ignore")},
kcl.Record{SequenceNumber: "23", Data: encode("ignore")},
kcl.Record{SequenceNumber: "24", Data: encode("ignore")},
kcl.Record{SequenceNumber: "25", Data: encode("tag1")},
kcl.Record{SequenceNumber: "26", Data: encode("ignore")},
})
assert.NoError(err)
time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once
err = wrt.Shutdown("TERMINATE")
assert.NoError(err)
err = mockcheckpointer.wait()
assert.NoError(err)
mocksender.Shutdown()
assert.Contains(mocksender.batches, "tag1")
assert.Equal(1, len(mocksender.batches["tag1"])) // One batch
assert.Equal(2, len(mocksender.batches["tag1"][0])) // with two items
assert.Equal("tag1", string(mocksender.batches["tag1"][0][0]))
assert.Equal("tag1", string(mocksender.batches["tag1"][0][1]))
assert.Contains(mocksender.batches, "tag2")
assert.Equal(1, len(mocksender.batches["tag2"])) // One batch
assert.Equal(3, len(mocksender.batches["tag2"][0])) // with three items
assert.Equal("tag2", string(mocksender.batches["tag2"][0][0]))
assert.Equal("tag2", string(mocksender.batches["tag2"][0][1]))
assert.Equal("tag2", string(mocksender.batches["tag2"][0][2]))
assert.Contains(mocksender.batches, "tag3")
assert.Equal(1, len(mocksender.batches["tag3"])) // One batch
assert.Equal(3, len(mocksender.batches["tag3"][0])) // with three items
assert.Equal("tag3", string(mocksender.batches["tag3"][0][0]))
assert.Equal("tag3", string(mocksender.batches["tag3"][0][1]))
assert.Equal("tag3", string(mocksender.batches["tag3"][0][2]))
}
func TestStaggeredCheckpionting(t *testing.T) {
assert := assert.New(t)
mocklog := logger.New("testing")
mockconfig := withDefaults(Config{
BatchCount: 2,
BatchInterval: 100 * time.Millisecond,
CheckpointFreq: 200 * time.Nanosecond,
})
mockcheckpointer := NewMockCheckpointer("9", 5*time.Second)
mocksender := NewMsgAsTagSender()
wrt := NewBatchedWriter(mockconfig, mocksender, mocklog)
wrt.Initialize("test-shard", mockcheckpointer)
err := wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "1", Data: encode("tag1")},
kcl.Record{SequenceNumber: "2", Data: encode("tag3")},
kcl.Record{SequenceNumber: "3", Data: encode("tag1")},
kcl.Record{SequenceNumber: "4", Data: encode("tag3")},
})
assert.NoError(err)
err = wrt.ProcessRecords([]kcl.Record{
kcl.Record{SequenceNumber: "5", Data: encode("tag1")},
kcl.Record{SequenceNumber: "6", Data: encode("tag3")},
kcl.Record{SequenceNumber: "7", Data: encode("tag3")},
kcl.Record{SequenceNumber: "8", Data: encode("tag3")},
kcl.Record{SequenceNumber: "9", Data: encode("tag3")},
})
assert.NoError(err)
time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once
err = wrt.Shutdown("TERMINATE")
assert.NoError(err)
err = mockcheckpointer.wait()
assert.NoError(err)
mocksender.Shutdown()
mockcheckpointer.Shutdown()
// Test to make sure writer doesn't prematurely checkpoint messages
// Checkpoints 5,6,7,8 will never be submitted because the 3rd "tag1" is in a batch
// Checkpoint 9 is submitted on shutdown when everything is being flushed
assert.NotContains(mockcheckpointer.recievedSequences, "5")
assert.NotContains(mockcheckpointer.recievedSequences, "6")
assert.NotContains(mockcheckpointer.recievedSequences, "7")
assert.NotContains(mockcheckpointer.recievedSequences, "8")
assert.Contains(mocksender.batches, "tag1")
assert.Equal(2, len(mocksender.batches["tag1"])) // One batch
assert.Equal(2, len(mocksender.batches["tag1"][0])) // with two items
assert.Equal("tag1", string(mocksender.batches["tag1"][0][0]))
assert.Equal("tag1", string(mocksender.batches["tag1"][0][1]))
assert.Equal("tag1", string(mocksender.batches["tag1"][1][0]))
assert.Contains(mocksender.batches, "tag3")
assert.Equal(3, len(mocksender.batches["tag3"])) // One batch
assert.Equal(2, len(mocksender.batches["tag3"][0])) // with three items
assert.Equal("tag3", string(mocksender.batches["tag3"][0][0]))
assert.Equal("tag3", string(mocksender.batches["tag3"][0][1]))
assert.Equal("tag3", string(mocksender.batches["tag3"][1][0]))
assert.Equal("tag3", string(mocksender.batches["tag3"][1][1]))
assert.Equal("tag3", string(mocksender.batches["tag3"][2][0]))
assert.Equal("tag3", string(mocksender.batches["tag3"][2][1]))
}

View file

@ -9,9 +9,10 @@ checkout:
- $HOME/ci-scripts/circleci/golang-move-project
compile:
override:
- make install_deps
- make build
test:
override:
- echo TODO
- make test
general:
build_dir: ../.go_workspace/src/github.com/$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME

64
cmd/batchconsumer/main.go Normal file
View file

@ -0,0 +1,64 @@
package main
import (
"fmt"
"log"
"os"
"time"
"gopkg.in/Clever/kayvee-go.v6/logger"
kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer"
)
func createDummyOutput() (logger.KayveeLogger, *os.File) {
file, err := os.OpenFile("/tmp/example-kcl-output", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Unable to create log file: %s", err.Error())
}
kvlog := logger.New("amazon-kinesis-client-go")
kvlog.SetOutput(file)
return kvlog, file
}
func main() {
config := kbc.Config{
BatchInterval: 10 * time.Second,
BatchCount: 500,
BatchSize: 4 * 1024 * 1024, // 4Mb
LogFile: "/tmp/example-kcl-consumer",
DeployEnv: "test-env",
}
output, file := createDummyOutput()
defer file.Close()
sender := &exampleSender{output: output}
consumer := kbc.NewBatchConsumer(config, sender)
consumer.Start()
}
type exampleSender struct {
output logger.KayveeLogger
}
func (e *exampleSender) ProcessMessage(rawmsg []byte) ([]byte, []string, error) {
if len(rawmsg)%5 == 2 {
return nil, nil, kbc.ErrMessageIgnored
}
tag1 := fmt.Sprintf("tag-%d", len(rawmsg)%5)
line := tag1 + ": " + string(rawmsg)
return []byte(line), []string{tag1}, nil
}
func (e *exampleSender) SendBatch(batch [][]byte, tag string) error {
for idx, line := range batch {
e.output.InfoD(tag, logger.M{"idx": idx, "line": string(line)})
}
return nil
}

View file

@ -9,8 +9,8 @@ import (
"github.com/Clever/amazon-kinesis-client-go/kcl"
)
type SampleRecordProcessor struct {
checkpointer *kcl.Checkpointer
type sampleRecordProcessor struct {
checkpointer kcl.Checkpointer
checkpointRetries int
checkpointFreq time.Duration
largestSeq *big.Int
@ -18,25 +18,25 @@ type SampleRecordProcessor struct {
lastCheckpoint time.Time
}
func New() *SampleRecordProcessor {
return &SampleRecordProcessor{
func newSampleRecordProcessor() *sampleRecordProcessor {
return &sampleRecordProcessor{
checkpointRetries: 5,
checkpointFreq: 60 * time.Second,
}
}
func (srp *SampleRecordProcessor) Initialize(shardID string, checkpointer *kcl.Checkpointer) error {
func (srp *sampleRecordProcessor) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
srp.lastCheckpoint = time.Now()
srp.checkpointer = checkpointer
return nil
}
func (srp *SampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool {
func (srp *sampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool {
return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 ||
(sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq)
}
func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
for _, record := range records {
seqNumber := new(big.Int)
if _, ok := seqNumber.SetString(record.SequenceNumber, 10); !ok {
@ -56,7 +56,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
return nil
}
func (srp *SampleRecordProcessor) Shutdown(reason string) error {
func (srp *sampleRecordProcessor) Shutdown(reason string) error {
if reason == "TERMINATE" {
fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n")
srp.checkpointer.Shutdown()
@ -72,6 +72,6 @@ func main() {
panic(err)
}
defer f.Close()
kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, New())
kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, newSampleRecordProcessor())
kclProcess.Run()
}

477
decode/decode.go Normal file
View file

@ -0,0 +1,477 @@
package decode
import (
"encoding/json"
"fmt"
"regexp"
"strings"
"time"
"github.com/Clever/syslogparser/rfc3164"
)
// reservedFields are automatically set during decoding.
// no field written by a user (e.g. contained in the Kayvee JSON) should overwrite them.
var reservedFields = []string{
"prefix",
"postfix",
"Type",
}
func stringInSlice(s string, slice []string) bool {
for _, item := range slice {
if s == item {
return true
}
}
return false
}
// remapSyslog3164Keys renames fields to match our expecations from heka's syslog parser
// see: https://github.com/mozilla-services/heka/blob/278dd3d5961b9b6e47bb7a912b63ce3faaf8d8bd/sandbox/lua/decoders/rsyslog.lua
var remapSyslog3164Keys = map[string]string{
"hostname": "hostname",
"timestamp": "timestamp",
"tag": "programname",
"content": "rawlog",
}
// FieldsFromSyslog takes an RSyslog formatted log line and extracts fields from it
//
// Supports two log lines formats:
// - RSYSLOG_TraditionalFileFormat - the "old style" default log file format with low-precision timestamps (RFC3164)
// - RSYSLOG_FileFormat - a modern-style logfile format similar to TraditionalFileFormat, but with high-precision timestamps and timezone information
//
// For more details on Rsylog formats: https://rsyslog-5-8-6-doc.neocities.org/rsyslog_conf_templates.html
func FieldsFromSyslog(line string) (map[string]interface{}, error) {
// rfc3164 includes a severity number in front of the Syslog line, but we don't use that
fakeSeverity := "<12>"
p3164 := rfc3164.NewParser([]byte(fakeSeverity + line))
err := p3164.Parse()
if err != nil {
return map[string]interface{}{}, err
}
out := map[string]interface{}{}
for k, v := range p3164.Dump() {
if newKey, ok := remapSyslog3164Keys[k]; ok {
out[newKey] = v
}
}
return out, nil
}
// NonKayveeError occurs when the log line is not Kayvee
type NonKayveeError struct{}
func (e NonKayveeError) Error() string {
return fmt.Sprint("Log line is not Kayvee (doesn't have JSON payload)")
}
// FieldsFromKayvee takes a log line and extracts fields from the Kayvee (JSON) part
func FieldsFromKayvee(line string) (map[string]interface{}, error) {
m := map[string]interface{}{}
firstIdx := strings.Index(line, "{")
lastIdx := strings.LastIndex(line, "}")
if firstIdx == -1 || lastIdx == -1 || firstIdx > lastIdx {
return map[string]interface{}{}, &NonKayveeError{}
}
m["prefix"] = line[:firstIdx]
m["postfix"] = line[lastIdx+1:]
possibleJSON := line[firstIdx : lastIdx+1]
var fields map[string]interface{}
if err := json.Unmarshal([]byte(possibleJSON), &fields); err != nil {
return map[string]interface{}{}, err
}
for k, v := range fields {
if !stringInSlice(k, reservedFields) {
m[k] = v
}
}
m["type"] = "Kayvee"
return m, nil
}
// MetricsRoute represents a metrics kv log route
type MetricsRoute struct {
Series string
Dimensions []string
ValueField string
RuleName string
}
// AnalyticsRoute represents an analytics kv log route
type AnalyticsRoute struct {
Series string
RuleName string
}
// NotificationRoute represents a notification kv log route
type NotificationRoute struct {
Channel string
Icon string
Message string
User string
RuleName string
}
// AlertRoute represents an alert kv log route
type AlertRoute struct {
Series string
Dimensions []string
StatType string
ValueField string
RuleName string
}
func getStringValue(json map[string]interface{}, key string) string {
val, ok := json[key]
if !ok {
return ""
}
str, ok := val.(string)
if !ok {
return ""
}
return str
}
func getStringArray(json map[string]interface{}, key string) []string {
val, ok := json[key]
if !ok {
return []string{}
}
strArray, ok := val.([]string)
if !ok {
return []string{}
}
return strArray
}
// LogRoutes a type alias to make it easier to add route specific filter functions
type LogRoutes []map[string]interface{}
// MetricsRoutes filters the LogRoutes and returns a list of MetricsRoutes structs
func (l LogRoutes) MetricsRoutes() []MetricsRoute {
routes := []MetricsRoute{}
for _, route := range l {
tipe := getStringValue(route, "type")
if tipe != "metrics" {
continue
}
series := getStringValue(route, "series")
dimensions := getStringArray(route, "dimensions")
valueField := getStringValue(route, "value_field")
ruleName := getStringValue(route, "rule")
if series == "" { // TODO: log error
continue
}
if valueField == "" {
valueField = "value"
}
routes = append(routes, MetricsRoute{
Series: series,
Dimensions: dimensions,
ValueField: valueField,
RuleName: ruleName,
})
}
return routes
}
// AnalyticsRoutes filters the LogRoutes and returns a list of AnalyticsRoutes structs
func (l LogRoutes) AnalyticsRoutes() []AnalyticsRoute {
routes := []AnalyticsRoute{}
for _, route := range l {
tipe := getStringValue(route, "type")
if tipe != "analytics" {
continue
}
series := getStringValue(route, "series")
ruleName := getStringValue(route, "rule")
if series == "" { // TODO: log error
continue
}
routes = append(routes, AnalyticsRoute{
Series: series,
RuleName: ruleName,
})
}
return routes
}
// NotificationRoutes filters the LogRoutes and returns a list of NotificationRoutes structs
func (l LogRoutes) NotificationRoutes() []NotificationRoute {
routes := []NotificationRoute{}
for _, route := range l {
tipe := getStringValue(route, "type")
if tipe != "notifications" {
continue
}
channel := getStringValue(route, "channel")
icon := getStringValue(route, "icon")
message := getStringValue(route, "message")
user := getStringValue(route, "user")
rule := getStringValue(route, "rule")
if channel == "" || message == "" { // TODO: log error
continue
}
if icon == "" {
icon = ":ghost:"
}
if user == "" {
user = "logging-pipeline"
}
routes = append(routes, NotificationRoute{
Channel: channel,
Icon: icon,
Message: message,
User: user,
RuleName: rule,
})
}
return routes
}
// AlertRoutes filters the LogRoutes and returns a list of AlertRoutes structs
func (l LogRoutes) AlertRoutes() []AlertRoute {
routes := []AlertRoute{}
for _, route := range l {
tipe := getStringValue(route, "type")
if tipe != "alerts" {
continue
}
series := getStringValue(route, "series")
dimensions := getStringArray(route, "dimensions")
statType := getStringValue(route, "stat_type")
valueField := getStringValue(route, "value_field")
ruleName := getStringValue(route, "rule")
if series == "" { // TODO: log error
continue
}
if statType == "" {
statType = "counter"
}
if valueField == "" {
valueField = "value"
}
routes = append(routes, AlertRoute{
Series: series,
Dimensions: dimensions,
StatType: statType,
ValueField: valueField,
RuleName: ruleName,
})
}
return routes
}
// KVMeta a struct that represents kv-meta data
type KVMeta struct {
Team string
Version string
Language string
Routes LogRoutes
}
// ExtractKVMeta returns a struct with available kv-meta data
func ExtractKVMeta(kvlog map[string]interface{}) KVMeta {
tmp, ok := kvlog["_kvmeta"]
if !ok {
return KVMeta{}
}
kvmeta, ok := tmp.(map[string]interface{})
if !ok {
return KVMeta{}
}
kvRoutes := []map[string]interface{}{}
tmp, ok = kvmeta["routes"]
if ok {
routes, ok := tmp.([]map[string]interface{})
if ok {
kvRoutes = routes
}
}
return KVMeta{
Team: getStringValue(kvmeta, "team"),
Version: getStringValue(kvmeta, "kv_version"),
Language: getStringValue(kvmeta, "kv_language"),
Routes: kvRoutes,
}
}
// ParseAndEnhance extracts fields from a log line, and does some post-processing to rename/add fields
func ParseAndEnhance(line string, env string, stringifyNested bool, renameESReservedFields bool, minimumTimestamp time.Time) (map[string]interface{}, error) {
out := map[string]interface{}{}
syslogFields, err := FieldsFromSyslog(line)
if err != nil {
return map[string]interface{}{}, err
}
for k, v := range syslogFields {
out[k] = v
}
rawlog := syslogFields["rawlog"].(string)
programname := syslogFields["programname"].(string)
// Try pulling Kayvee fields out of message
kvFields, err := FieldsFromKayvee(rawlog)
if err != nil {
if _, ok := err.(*NonKayveeError); !ok {
return map[string]interface{}{}, err
}
} else {
for k, v := range kvFields {
out[k] = v
}
}
// Inject additional fields that are useful in log-searching and other business logic
out["env"] = env
// Sometimes its useful to force `container_{env,app,task}`. A specific use-case is writing Docker events.
// A separate container monitors for start/stop events, but we set the container values in such a way that
// the logs for these events will appear in context for the app that the user is looking at instead of the
// docker-events app.
forceEnv := ""
forceApp := ""
forceTask := ""
if cEnv, ok := out["container_env"]; ok {
forceEnv = cEnv.(string)
}
if cApp, ok := out["container_app"]; ok {
forceApp = cApp.(string)
}
if cTask, ok := out["container_task"]; ok {
forceTask = cTask.(string)
}
meta, err := getContainerMeta(programname, forceEnv, forceApp, forceTask)
if err == nil {
for k, v := range meta {
out[k] = v
}
}
// ES dynamic mappings get finnicky once you start sending nested objects.
// E.g., if one app sends a field for the first time as an object, then any log
// sent by another app containing that field /not/ as an object will fail.
// One solution is to decode nested objects as strings.
if stringifyNested {
for k, v := range out {
_, ismap := v.(map[string]interface{})
_, isarr := v.([]interface{})
if ismap || isarr {
bs, _ := json.Marshal(v)
out[k] = string(bs)
}
}
}
// ES doesn't like fields that start with underscores.
if renameESReservedFields {
for oldKey, renamedKey := range esFieldRenames {
if val, ok := out[oldKey]; ok {
out[renamedKey] = val
delete(out, oldKey)
}
}
}
msgTime, ok := out["timestamp"].(time.Time)
if ok && !msgTime.After(minimumTimestamp) {
return map[string]interface{}{}, fmt.Errorf("message's timestamp < minimumTimestamp")
}
return out, nil
}
var esFieldRenames = map[string]string{
"_index": "kv__index",
"_uid": "kv__uid",
"_type": "kv__type",
"_id": "kv__id",
"_source": "kv__source",
"_size": "kv__size",
"_all": "kv__all",
"_field_names": "kv__field_names",
"_timestamp": "kv__timestamp",
"_ttl": "kv__ttl",
"_parent": "kv__parent",
"_routing": "kv__routing",
"_meta": "kv__meta",
}
const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
`arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // task-id
var containerMetaRegex = regexp.MustCompile(containerMeta)
func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[string]string, error) {
if programname == "" {
return map[string]string{}, fmt.Errorf("no programname")
}
env := ""
app := ""
task := ""
matches := containerMetaRegex.FindAllStringSubmatch(programname, 1)
if len(matches) == 1 {
env = matches[0][1]
app = matches[0][2]
task = matches[0][4]
}
if forceEnv != "" {
env = forceEnv
}
if forceApp != "" {
app = forceApp
}
if forceTask != "" {
task = forceTask
}
if env == "" || app == "" || task == "" {
return map[string]string{}, fmt.Errorf("unable to get one or more of env/app/task")
}
return map[string]string{
"container_env": env,
"container_app": app,
"container_task": task,
}, nil
}

834
decode/decode_test.go Normal file
View file

@ -0,0 +1,834 @@
package decode
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/Clever/syslogparser"
"github.com/stretchr/testify/assert"
)
const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00"
type Spec struct {
Title string
Input string
ExpectedOutput map[string]interface{}
ExpectedError error
}
func TestKayveeDecoding(t *testing.T) {
specs := []Spec{
Spec{
Title: "handles just JSON",
Input: `{"a":"b"}`,
ExpectedOutput: map[string]interface{}{
"prefix": "",
"postfix": "",
"a": "b",
"type": "Kayvee",
},
ExpectedError: nil,
},
Spec{
Title: "handles prefix + JSON",
Input: `prefix {"a":"b"}`,
ExpectedOutput: map[string]interface{}{
"prefix": "prefix ",
"postfix": "",
"a": "b",
"type": "Kayvee",
},
ExpectedError: nil,
},
Spec{
Title: "handles JSON + postfix",
Input: `{"a":"b"} postfix`,
ExpectedOutput: map[string]interface{}{
"prefix": "",
"postfix": " postfix",
"a": "b",
"type": "Kayvee",
},
ExpectedError: nil,
},
Spec{
Title: "handles prefix + JSON + postfix",
Input: `prefix {"a":"b"} postfix`,
ExpectedOutput: map[string]interface{}{
"prefix": "prefix ",
"postfix": " postfix",
"a": "b",
"type": "Kayvee",
},
ExpectedError: nil,
},
Spec{
Title: "Returns NonKayveeError if not JSON in body",
Input: `prefix { postfix`,
ExpectedOutput: map[string]interface{}{},
ExpectedError: &NonKayveeError{},
},
Spec{
Title: "errors on invalid JSON (missing a quote)",
Input: `prefix {"a:"b"} postfix`,
ExpectedOutput: map[string]interface{}{},
ExpectedError: &json.SyntaxError{},
},
}
for _, spec := range specs {
t.Run(fmt.Sprintf(spec.Title), func(t *testing.T) {
assert := assert.New(t)
fields, err := FieldsFromKayvee(spec.Input)
if spec.ExpectedError != nil {
assert.Error(err)
assert.IsType(spec.ExpectedError, err)
} else {
assert.NoError(err)
}
assert.Equal(spec.ExpectedOutput, fields)
})
}
}
func TestSyslogDecoding(t *testing.T) {
// timestamps in Rsyslog_TraditionalFileFormat
logTime, err := time.Parse(time.Stamp, "Oct 25 10:20:37")
if err != nil {
t.Fatal(err)
}
// parsing assumes log is from the current year
logTime = logTime.AddDate(time.Now().Year(), 0, 0).UTC()
logTime2, err := time.Parse(time.Stamp, "Apr 5 21:45:54")
if err != nil {
t.Fatal(err)
}
logTime2 = logTime2.AddDate(time.Now().Year(), 0, 0).UTC()
// timestamp in Rsyslog_FileFormat
logTime3, err := time.Parse(RFC3339Micro, "2017-04-05T21:57:46.794862+00:00")
if err != nil {
t.Fatal(err)
}
logTime3 = logTime3.UTC()
specs := []Spec{
Spec{
Title: "Parses Rsyslog_TraditionalFileFormat with simple log body",
Input: `Oct 25 10:20:37 some-host docker/fa3a5e338a47[1294]: log body`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime,
"hostname": "some-host",
"programname": "docker/fa3a5e338a47",
"rawlog": "log body",
},
ExpectedError: nil,
},
Spec{
Title: "Parses Rsyslog_TraditionalFileFormat with haproxy access log body",
Input: `Apr 5 21:45:54 influx-service docker/0000aa112233[1234]: [httpd] 2017/04/05 21:45:54 172.17.42.1 - heka [05/Apr/2017:21:45:54 +0000] POST /write?db=foo&precision=ms HTTP/1.1 204 0 - Go 1.1 package http 123456-1234-1234-b11b-000000000000 13.688672ms`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime2,
"hostname": "influx-service",
"programname": "docker/0000aa112233",
"rawlog": "[httpd] 2017/04/05 21:45:54 172.17.42.1 - heka [05/Apr/2017:21:45:54 +0000] POST /write?db=foo&precision=ms HTTP/1.1 204 0 - Go 1.1 package http 123456-1234-1234-b11b-000000000000 13.688672ms",
},
ExpectedError: nil,
},
Spec{
Title: "Parses Rsyslog_TraditionalFileFormat",
Input: `Apr 5 21:45:54 mongodb-some-machine whackanop: 2017/04/05 21:46:11 found 0 ops`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime2,
"hostname": "mongodb-some-machine",
"programname": "whackanop",
"rawlog": "2017/04/05 21:46:11 found 0 ops",
},
ExpectedError: nil,
},
Spec{
Title: "Parses Rsyslog_ FileFormat with Kayvee payload",
Input: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`,
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`,
},
ExpectedError: nil,
},
Spec{
Title: "Fails to parse non-RSyslog log line",
Input: `not rsyslog`,
ExpectedOutput: map[string]interface{}{},
ExpectedError: &syslogparser.ParserError{},
},
}
for _, spec := range specs {
t.Run(fmt.Sprintf(spec.Title), func(t *testing.T) {
assert := assert.New(t)
fields, err := FieldsFromSyslog(spec.Input)
if spec.ExpectedError != nil {
assert.Error(err)
assert.IsType(spec.ExpectedError, err)
} else {
assert.NoError(err)
}
assert.Equal(spec.ExpectedOutput, fields)
})
}
}
type ParseAndEnhanceInput struct {
Line string
StringifyNested bool
RenameESReservedFields bool
MinimumTimestamp time.Time
}
type ParseAndEnhanceSpec struct {
Title string
Input ParseAndEnhanceInput
ExpectedOutput map[string]interface{}
ExpectedError error
}
func TestParseAndEnhance(t *testing.T) {
// timestamp in Rsyslog_FileFormat
logTime3, err := time.Parse(RFC3339Micro, "2017-04-05T21:57:46.794862+00:00")
if err != nil {
t.Fatal(err)
}
logTime3 = logTime3.UTC()
specs := []ParseAndEnhanceSpec{
ParseAndEnhanceSpec{
Title: "Parses a Kayvee log line from an ECS app",
Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Parses a Kayvee log line from an ECS app, with override to container_app",
Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished","container_app":"force-app"}`},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished","container_app":"force-app"}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "force-app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Parses a non-Kayvee log line",
Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: some log`},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `some log`,
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Fails to parse non-RSyslog log line",
Input: ParseAndEnhanceInput{Line: `not rsyslog`},
ExpectedOutput: map[string]interface{}{},
ExpectedError: &syslogparser.ParserError{},
},
ParseAndEnhanceSpec{
Title: "Parses JSON values",
Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"nested": map[string]interface{}{"a": "b"},
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Has the option to stringify object values",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`,
StringifyNested: true,
},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"nested": `{"a":"b"}`,
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Has the option to stringify array values",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": [{"a":"b"}]}`,
StringifyNested: true,
},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": [{"a":"b"}]}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"nested": `[{"a":"b"}]`,
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Has the option to rename reserved ES fields",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
RenameESReservedFields: true,
},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"kv__source": "a",
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Errors if logTime < MinimumTimestamp",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
RenameESReservedFields: true,
MinimumTimestamp: time.Now().Add(100 * time.Hour * 24 * 365), // good thru year 2117
},
ExpectedOutput: map[string]interface{}{},
ExpectedError: fmt.Errorf(""),
},
ParseAndEnhanceSpec{
Title: "Accepts logs if logTime > MinimumTimestamp",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
RenameESReservedFields: true,
MinimumTimestamp: time.Now().Add(-100 * time.Hour * 24 * 365), // good thru year 2117
},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"kv__source": "a",
},
ExpectedError: nil,
},
ParseAndEnhanceSpec{
Title: "Accepts logs if logTime > MinimumTimestamp",
Input: ParseAndEnhanceInput{
Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
RenameESReservedFields: true,
MinimumTimestamp: time.Now().Add(-100 * time.Hour * 24 * 365), // good thru year 2117
},
ExpectedOutput: map[string]interface{}{
"timestamp": logTime3,
"hostname": "ip-10-0-0-0",
"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
"rawlog": `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
"title": "request_finished",
"type": "Kayvee",
"prefix": "2017/04/05 21:57:46 some_file.go:10: ",
"postfix": "",
"env": "deploy-env",
"container_env": "env",
"container_app": "app",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
"kv__source": "a",
},
ExpectedError: nil,
},
}
for _, spec := range specs {
t.Run(fmt.Sprintf(spec.Title), func(t *testing.T) {
assert := assert.New(t)
fields, err := ParseAndEnhance(spec.Input.Line, "deploy-env", spec.Input.StringifyNested, spec.Input.RenameESReservedFields, spec.Input.MinimumTimestamp)
if spec.ExpectedError != nil {
assert.Error(err)
assert.IsType(spec.ExpectedError, err)
} else {
assert.NoError(err)
}
assert.Equal(spec.ExpectedOutput, fields)
})
}
}
func TestGetContainerMeta(t *testing.T) {
assert := assert.New(t)
t.Log("Must have a programname to get container meta")
programname := ""
_, err := getContainerMeta(programname, "", "", "")
assert.Error(err)
t.Log("Can parse a programname")
programname = `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
meta, err := getContainerMeta(programname, "", "", "")
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": "env1",
"container_app": "app2",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
}, meta)
t.Log("Can override just 'env'")
overrideEnv := "force-env"
meta, err = getContainerMeta(programname, overrideEnv, "", "")
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": overrideEnv,
"container_app": "app2",
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
}, meta)
t.Log("Can override just 'app'")
overrideApp := "force-app"
meta, err = getContainerMeta(programname, "", overrideApp, "")
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": "env1",
"container_app": overrideApp,
"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
}, meta)
t.Log("Can override just 'task'")
overrideTask := "force-task"
meta, err = getContainerMeta(programname, "", "", overrideTask)
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": "env1",
"container_app": "app2",
"container_task": overrideTask,
}, meta)
t.Log("Can override all fields")
programname = `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
meta, err = getContainerMeta(programname, overrideEnv, overrideApp, overrideTask)
assert.NoError(err)
assert.Equal(map[string]string{
"container_env": overrideEnv,
"container_app": overrideApp,
"container_task": overrideTask,
}, meta)
}
func TestExtractKVMeta(t *testing.T) {
assert := assert.New(t)
tests := []struct {
Description string
Log map[string]interface{}
Team string
Language string
Version string
ExpectedMetricsRoutes []MetricsRoute
ExpectedAnalyticsRoutes []AnalyticsRoute
ExpectedNotificationRoutes []NotificationRoute
ExpectedAlertRoutes []AlertRoute
}{
{
Description: "log line with no routes",
Log: map[string]interface{}{"hi": "hello!"},
},
{
Description: "empty _kvmeta",
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{},
},
},
{
Description: "_kvmeta with no routes",
Team: "green",
Version: "three",
Language: "tree",
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "green",
"kv_version": "three",
"kv_language": "tree",
},
},
},
{
Description: "_kvmeta with metric routes",
Team: "green",
Version: "three",
Language: "tree",
ExpectedMetricsRoutes: []MetricsRoute{
{
Series: "1,1,2,3,5,8,13",
Dimensions: []string{"app", "district"},
ValueField: "value",
RuleName: "cool",
},
{
Series: "1,1,2,6,24,120,720,5040",
Dimensions: []string{"app", "district"},
ValueField: "value",
RuleName: "cool2",
},
},
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "green",
"kv_version": "three",
"kv_language": "tree",
"routes": []map[string]interface{}{
map[string]interface{}{
"type": "metrics",
"rule": "cool",
"series": "1,1,2,3,5,8,13",
"value_field": "value",
"dimensions": []string{"app", "district"},
},
map[string]interface{}{
"type": "metrics",
"rule": "cool2",
"series": "1,1,2,6,24,120,720,5040",
"dimensions": []string{"app", "district"},
},
},
},
},
},
{
Description: "_kvmeta with analytic routes",
Team: "green",
Version: "christmas",
Language: "tree",
ExpectedAnalyticsRoutes: []AnalyticsRoute{
{
Series: "what-is-this",
RuleName: "what's-this?",
},
{
RuleName: "there's-app-invites-everywhere",
Series: "there's-bts-in-the-air",
},
},
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "green",
"kv_version": "christmas",
"kv_language": "tree",
"routes": []map[string]interface{}{
map[string]interface{}{
"type": "analytics",
"rule": "what's-this?",
"series": "what-is-this",
},
map[string]interface{}{
"type": "analytics",
"rule": "there's-app-invites-everywhere",
"series": "there's-bts-in-the-air",
},
},
},
},
},
{
Description: "_kvmeta with notification routes",
Team: "slack",
Version: "evergreen",
Language: "markdown-ish",
ExpectedNotificationRoutes: []NotificationRoute{
{
RuleName: "did-you-know",
Channel: "originally-slack",
Message: "was a gaming company",
Icon: ":video_game:",
User: "og slack bronie",
},
{
RuleName: "what's-the-catch",
Channel: "slack-is-built-with-php",
Message: "just like farmville",
Icon: ":ghost:",
User: "logging-pipeline",
},
},
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "slack",
"kv_version": "evergreen",
"kv_language": "markdown-ish",
"routes": []map[string]interface{}{
map[string]interface{}{
"type": "notifications",
"rule": "did-you-know",
"channel": "originally-slack",
"message": "was a gaming company",
"icon": ":video_game:",
"user": "og slack bronie",
},
map[string]interface{}{
"type": "notifications",
"rule": "what's-the-catch",
"channel": "slack-is-built-with-php",
"message": "just like farmville",
},
},
},
},
},
{
Description: "_kvmeta with alert routes",
Team: "a-team",
Version: "old",
Language: "jive",
ExpectedAlertRoutes: []AlertRoute{
{
RuleName: "last-call",
Series: "doing-it-til-we-fall",
Dimensions: []string{"who", "where"},
StatType: "guage",
ValueField: "status",
},
{
RuleName: "watch-out-now",
Series: "dem-gators-gonna-bite-ya",
Dimensions: []string{"how-fresh", "how-clean"},
StatType: "counter",
ValueField: "value",
},
},
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "a-team",
"kv_version": "old",
"kv_language": "jive",
"routes": []map[string]interface{}{
map[string]interface{}{
"type": "alerts",
"rule": "last-call",
"series": "doing-it-til-we-fall",
"dimensions": []string{"who", "where"},
"stat_type": "guage",
"value_field": "status",
},
map[string]interface{}{
"type": "alerts",
"rule": "watch-out-now",
"series": "dem-gators-gonna-bite-ya",
"dimensions": []string{"how-fresh", "how-clean"},
},
},
},
},
},
{
Description: "_kvmeta with all types of routes",
Team: "diversity",
Version: "kv-routes",
Language: "understanding",
ExpectedMetricsRoutes: []MetricsRoute{
{
RuleName: "all-combos",
Series: "1,1,2,6,24,120,720,5040",
Dimensions: []string{"fact", "orial"},
ValueField: "value",
},
},
ExpectedAnalyticsRoutes: []AnalyticsRoute{
{
RuleName: "there's-app-invites-everywhere",
Series: "there's-bts-in-the-air",
},
},
ExpectedNotificationRoutes: []NotificationRoute{
{
RuleName: "what's-the-catch",
Channel: "slack-is-built-with-php",
Message: "just like farmville",
Icon: ":ghost:",
User: "logging-pipeline",
},
},
ExpectedAlertRoutes: []AlertRoute{
{
RuleName: "last-call",
Series: "doing-it-til-we-fall",
Dimensions: []string{"who", "where"},
StatType: "guage",
ValueField: "status",
},
},
Log: map[string]interface{}{
"hi": "hello!",
"_kvmeta": map[string]interface{}{
"team": "diversity",
"kv_version": "kv-routes",
"kv_language": "understanding",
"routes": []map[string]interface{}{
map[string]interface{}{
"type": "metrics",
"rule": "all-combos",
"series": "1,1,2,6,24,120,720,5040",
"dimensions": []string{"fact", "orial"},
},
map[string]interface{}{
"type": "analytics",
"rule": "there's-app-invites-everywhere",
"series": "there's-bts-in-the-air",
},
map[string]interface{}{
"type": "notifications",
"rule": "what's-the-catch",
"channel": "slack-is-built-with-php",
"message": "just like farmville",
},
map[string]interface{}{
"type": "alerts",
"rule": "last-call",
"series": "doing-it-til-we-fall",
"dimensions": []string{"who", "where"},
"stat_type": "guage",
"value_field": "status",
},
},
},
},
},
}
for _, test := range tests {
t.Log(test.Description)
kvmeta := ExtractKVMeta(test.Log)
assert.Equal(test.Team, kvmeta.Team)
assert.Equal(test.Language, kvmeta.Language)
assert.Equal(test.Version, kvmeta.Version)
assert.Equal(len(test.ExpectedMetricsRoutes), len(kvmeta.Routes.MetricsRoutes()))
for idx, route := range kvmeta.Routes.MetricsRoutes() {
expected := test.ExpectedMetricsRoutes[idx]
assert.Exactly(expected, route)
}
assert.Equal(len(test.ExpectedAnalyticsRoutes), len(kvmeta.Routes.AnalyticsRoutes()))
for idx, route := range kvmeta.Routes.AnalyticsRoutes() {
expected := test.ExpectedAnalyticsRoutes[idx]
assert.Exactly(expected, route)
}
assert.Equal(len(test.ExpectedNotificationRoutes), len(kvmeta.Routes.NotificationRoutes()))
for idx, route := range kvmeta.Routes.NotificationRoutes() {
expected := test.ExpectedNotificationRoutes[idx]
assert.Exactly(expected, route)
}
assert.Equal(len(test.ExpectedAlertRoutes), len(kvmeta.Routes.AlertRoutes()))
for idx, route := range kvmeta.Routes.AlertRoutes() {
expected := test.ExpectedAlertRoutes[idx]
assert.Exactly(expected, route)
}
}
}
// Benchmarks
const benchmarkLine = `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`
func BenchmarkFieldsFromKayvee(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := FieldsFromKayvee(benchmarkLine)
if err != nil {
b.FailNow()
}
}
}
func BenchmarkFieldsFromSyslog(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := FieldsFromSyslog(benchmarkLine)
if err != nil {
b.FailNow()
}
}
}
func BenchmarkParseAndEnhance(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := ParseAndEnhance(benchmarkLine, "env", false, false, time.Time{})
if err != nil {
b.FailNow()
}
}
}

53
glide.lock generated Normal file
View file

@ -0,0 +1,53 @@
hash: 6da9731518797a7e339144f126af589f72f860bee154b0f2a552f91d2bc01bab
updated: 2017-07-18T02:00:59.747262097Z
imports:
- name: github.com/aws/aws-sdk-go
version: b73b028e599fa9176687c70b8f9cafbe57c27d20
subpackages:
- aws
- aws/session
- service/firehose
- name: github.com/Clever/kinesis-to-firehose
version: dca69c87e1662c7b1f55581399544d05fdcb09ab
subpackages:
- writer
- name: github.com/Clever/syslogparser
version: 93ab95f7ff16c9ef1f2d09bc37c0a0c31bad98ea
subpackages:
- rfc3164
- name: github.com/jeromer/syslogparser
version: 0e4ae46ea3f08de351074b643d649d5d00661a3c
- name: github.com/xeipuuv/gojsonpointer
version: e0fe6f68307607d540ed8eac07a342c33fa1b54a
- name: github.com/xeipuuv/gojsonreference
version: e02fc20de94c78484cd5ffb007f8af96be030a45
- name: github.com/xeipuuv/gojsonschema
version: e18f0065e8c148fcf567ac43a3f8f5b66ac0720b
- name: golang.org/x/net
version: a6577fac2d73be281a500b310739095313165611
subpackages:
- context
- name: golang.org/x/time
version: f51c12702a4d776e4c1fa9b0fabab841babae631
subpackages:
- rate
- name: gopkg.in/Clever/kayvee-go.v6
version: 096364e316a52652d3493be702d8105d8d01db84
subpackages:
- logger
- router
- name: gopkg.in/yaml.v2
version: a5b47d31c556af34a302ce5d659e6fea44d90de0
testImports:
- name: github.com/davecgh/go-spew
version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9
subpackages:
- spew
- name: github.com/pmezard/go-difflib
version: d8ed2627bdf02c080bf22230dbb337003b7aba2d
subpackages:
- difflib
- name: github.com/stretchr/testify
version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0
subpackages:
- assert

22
glide.yaml Normal file
View file

@ -0,0 +1,22 @@
package: github.com/Clever/amazon-kinesis-client-go
import:
- package: github.com/Clever/kinesis-to-firehose
subpackages:
- writer
- package: github.com/Clever/syslogparser
subpackages:
- rfc3164
- package: github.com/aws/aws-sdk-go
subpackages:
- aws
- aws/session
- service/firehose
- package: golang.org/x/time
subpackages:
- rate
testImport:
- package: github.com/stretchr/testify
subpackages:
- assert
- package: gopkg.in/Clever/kayvee-go.v6
version: ^6.0.0

142
golang.mk Normal file
View file

@ -0,0 +1,142 @@
# This is the default Clever Golang Makefile.
# It is stored in the dev-handbook repo, github.com/Clever/dev-handbook
# Please do not alter this file directly.
GOLANG_MK_VERSION := 0.1.4
SHELL := /bin/bash
.PHONY: golang-godep-vendor golang-test-deps $(GODEP)
# if the gopath includes several directories, use only the first
GOPATH=$(shell echo $$GOPATH | cut -d: -f1)
# This block checks and confirms that the proper Go toolchain version is installed.
# arg1: golang version
define golang-version-check
GOVERSION := $(shell go version | grep $(1))
_ := $(if \
$(shell go version | grep $(1)), \
@echo "", \
$(error "must be running Go version $(1)"))
endef
export GO15VENDOREXPERIMENT=1
# FGT is a utility that exits with 1 whenever any stderr/stdout output is recieved.
FGT := $(GOPATH)/bin/fgt
$(FGT):
go get github.com/GeertJohan/fgt
# Godep is a tool used to manage Golang dependencies in the style of the Go 1.5
# vendoring experiment.
GODEP := $(GOPATH)/bin/godep
$(GODEP):
go get -u github.com/tools/godep
# Golint is a tool for linting Golang code for common errors.
GOLINT := $(GOPATH)/bin/golint
$(GOLINT):
go get github.com/golang/lint/golint
# golang-vendor-deps installs all dependencies needed for different test cases.
golang-godep-vendor-deps: $(GODEP)
# golang-godep-vendor is a target for saving dependencies with the godep tool
# to the vendor/ directory. All nested vendor/ directories are deleted as they
# are not handled well by the Go toolchain.
# arg1: pkg path
define golang-godep-vendor
$(GODEP) save $(1)
@# remove any nested vendor directories
find vendor/ -path '*/vendor' -type d | xargs -IX rm -r X
endef
# golang-fmt-deps requires the FGT tool for checking output
golang-fmt-deps: $(FGT)
# golang-fmt checks that all golang files in the pkg are formatted correctly.
# arg1: pkg path
define golang-fmt
@echo "FORMATTING $(1)..."
@$(FGT) gofmt -l=true $(GOPATH)/src/$(1)/*.go
endef
# golang-lint-deps requires the golint tool for golang linting.
golang-lint-deps: $(GOLINT)
# golang-lint calls golint on all golang files in the pkg.
# arg1: pkg path
define golang-lint
@echo "LINTING $(1)..."
@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(GOLINT)
endef
# golang-lint-deps-strict requires the golint tool for golang linting.
golang-lint-deps-strict: $(GOLINT) $(FGT)
# golang-lint-strict calls golint on all golang files in the pkg and fails if any lint
# errors are found.
# arg1: pkg path
define golang-lint-strict
@echo "LINTING $(1)..."
@find $(GOPATH)/src/$(1)/*.go -type f | grep -v gen_ | xargs $(FGT) $(GOLINT)
endef
# golang-test-deps is here for consistency
golang-test-deps:
# golang-test uses the Go toolchain to run all tests in the pkg.
# arg1: pkg path
define golang-test
@echo "TESTING $(1)..."
@go test -v $(1)
endef
# golang-test-strict-deps is here for consistency
golang-test-strict-deps:
# golang-test-strict uses the Go toolchain to run all tests in the pkg with the race flag
# arg1: pkg path
define golang-test-strict
@echo "TESTING $(1)..."
@go test -v -race $(1)
endef
# golang-vet-deps is here for consistency
golang-vet-deps:
# golang-vet uses the Go toolchain to vet all the pkg for common mistakes.
# arg1: pkg path
define golang-vet
@echo "VETTING $(1)..."
@go vet $(GOPATH)/src/$(1)/*.go
endef
# golang-test-all-deps installs all dependencies needed for different test cases.
golang-test-all-deps: golang-fmt-deps golang-lint-deps golang-test-deps golang-vet-deps
# golang-test-all calls fmt, lint, vet and test on the specified pkg.
# arg1: pkg path
define golang-test-all
$(call golang-fmt,$(1))
$(call golang-lint,$(1))
$(call golang-vet,$(1))
$(call golang-test,$(1))
endef
# golang-test-all-strict-deps: installs all dependencies needed for different test cases.
golang-test-all-strict-deps: golang-fmt-deps golang-lint-deps-strict golang-test-strict-deps golang-vet-deps
# golang-test-all-strict calls fmt, lint, vet and test on the specified pkg with strict
# requirements that no errors are thrown while linting.
# arg1: pkg path
define golang-test-all-strict
$(call golang-fmt,$(1))
$(call golang-lint-strict,$(1))
$(call golang-vet,$(1))
$(call golang-test-strict,$(1))
endef
# golang-update-makefile downloads latest version of golang.mk
golang-update-makefile:
@wget https://raw.githubusercontent.com/Clever/dev-handbook/master/make/golang.mk -O /tmp/golang.mk 2>/dev/null
@if ! grep -q $(GOLANG_MK_VERSION) /tmp/golang.mk; then cp /tmp/golang.mk golang.mk && echo "golang.mk updated"; else echo "golang.mk is up-to-date"; fi

View file

@ -12,11 +12,17 @@ import (
)
type RecordProcessor interface {
Initialize(shardID string, checkpointer *Checkpointer) error
Initialize(shardID string, checkpointer Checkpointer) error
ProcessRecords(records []Record) error
Shutdown(reason string) error
}
type Checkpointer interface {
Checkpoint(sequenceNumber *string, subSequenceNumber *int) error
CheckpointWithRetry(sequenceNumber *string, subSequenceNumber *int, retryCount int) error
Shutdown()
}
type CheckpointError struct {
e string
}
@ -25,13 +31,13 @@ func (ce CheckpointError) Error() string {
return ce.e
}
type Checkpointer struct {
type checkpointer struct {
mux sync.Mutex
ioHandler ioHandler
}
func (c *Checkpointer) getAction() (interface{}, error) {
func (c *checkpointer) getAction() (interface{}, error) {
line, err := c.ioHandler.readLine()
if err != nil {
return nil, err
@ -43,7 +49,7 @@ func (c *Checkpointer) getAction() (interface{}, error) {
return action, nil
}
func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error {
func (c *checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error {
c.mux.Lock()
defer c.mux.Unlock()
@ -74,7 +80,7 @@ func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int
// CheckpointWithRetry tries to save a checkPoint up to `retryCount` + 1 times.
// `retryCount` should be >= 0
func (c *Checkpointer) CheckpointWithRetry(
func (c *checkpointer) CheckpointWithRetry(
sequenceNumber *string, subSequenceNumber *int, retryCount int,
) error {
sleepDuration := 5 * time.Second
@ -108,7 +114,7 @@ func (c *Checkpointer) CheckpointWithRetry(
return nil
}
func (c *Checkpointer) Shutdown() {
func (c *checkpointer) Shutdown() {
c.CheckpointWithRetry(nil, nil, 5)
}
@ -225,7 +231,7 @@ func New(inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor R
}
return &KCLProcess{
ioHandler: i,
checkpointer: &Checkpointer{
checkpointer: &checkpointer{
ioHandler: i,
},
recordProcessor: recordProcessor,
@ -234,7 +240,7 @@ func New(inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor R
type KCLProcess struct {
ioHandler ioHandler
checkpointer *Checkpointer
checkpointer Checkpointer
recordProcessor RecordProcessor
}
@ -278,11 +284,17 @@ func (kclp *KCLProcess) Run() {
for {
line, err := kclp.ioHandler.readLine()
if err != nil {
kclp.ioHandler.writeError(err.Error())
kclp.ioHandler.writeError("Read line error: " + err.Error())
return
} else if line == nil {
break
kclp.ioHandler.writeError("Empty read line recieved")
return
}
err = kclp.handleLine(line.String())
if err != nil {
kclp.ioHandler.writeError("Handle line error: " + err.Error())
return
}
kclp.handleLine(line.String())
}
}

10
splitter/README.md Normal file
View file

@ -0,0 +1,10 @@
splitter
===
This splitter allows ingesting logs from a CWLogs subscription.
Splitter's expected input is batched logs from a CloudWatchLogs subscription to a Kinesis Stream.
The CWLogs subscription has a special format which bundles several logs into a single record.
The splitter takes this record and splits it into multiple logs.
These logs are also modified to mimic the RSyslog format we expect from our other logs.
This allows them to be decoded normally by the rest of the pipeline.

113
splitter/splitter.go Normal file
View file

@ -0,0 +1,113 @@
package splitter
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"regexp"
"time"
)
// LogEvent is a single log line within a LogEventBatch
type LogEvent struct {
ID string `json:"id"`
Timestamp int64 `json:"timestamp"`
Message string `json:"message"`
}
// LogEventBatch is a batch of multiple log lines, read from a KinesisStream with a CWLogs subscription
type LogEventBatch struct {
MessageType string `json:"messageType"`
Owner string `json:"owner"`
LogGroup string `json:"logGroup"`
LogStream string `json:"logStream"`
SubscriptionFilters []string `json:"subscriptionFilters"`
LogEvents []LogEvent `json:"logEvents"`
}
// IsGzipped returns whether or not a string is Gzipped (determined by looking for a Gzip byte prefix)
func IsGzipped(b []byte) bool {
return b[0] == 0x1f && b[1] == 0x8b
}
// GetMessagesFromGzippedInput takes a gzipped record from a CWLogs Subscription and splits it into
// a slice of messages.
func GetMessagesFromGzippedInput(input []byte, prodEnv bool) ([][]byte, error) {
unpacked, err := unpack(input)
if err != nil {
return [][]byte{}, err
}
return Split(unpacked, prodEnv), nil
}
// Unpack expects a gzipped + json-stringified LogEventBatch
func unpack(input []byte) (LogEventBatch, error) {
gzipReader, err := gzip.NewReader(bytes.NewReader(input))
if err != nil {
return LogEventBatch{}, err
}
byt, err := ioutil.ReadAll(gzipReader)
if err != nil {
return LogEventBatch{}, err
}
var dat LogEventBatch
if err := json.Unmarshal(byt, &dat); err != nil {
return LogEventBatch{}, err
}
return dat, nil
}
// RFC3339Micro is the RFC3339 format in microseconds
const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00"
const taskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\/` + // task-id
`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // container-id
var taskRegex = regexp.MustCompile(taskMeta)
// Split takes a LogEventBatch and separates into a slice of enriched log lines
// Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by
// the subsequent decoding logic.
func Split(b LogEventBatch, prodEnv bool) [][]byte {
env := "unknown"
app := "unknown"
task := "00001111-2222-3333-4444-555566667777"
matches := taskRegex.FindAllStringSubmatch(b.LogStream, 1)
if len(matches) == 1 {
env = matches[0][1]
app = matches[0][2]
task = matches[0][3]
}
if (env == "production") != prodEnv {
// if there's a mis-match between the consumer's environment and the log's environment,
// throw away the log. (this is a workaround for coarse grained subscription filters.)
return [][]byte{}
}
rsyslogPrefix := `%s %s %s[%d]: %s`
// programName is a mocked ARN in the format expected by our log decoders
programName := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task
mockPid := 1
hostname := "aws-batch"
out := [][]byte{}
for _, event := range b.LogEvents {
// Adding an extra Microsecond forces `Format` to include all 6 digits within the micorsecond format.
// Otherwise, time.Format omits trailing zeroes. (https://github.com/golang/go/issues/12472)
nsecs := event.Timestamp*int64(time.Millisecond) + int64(time.Microsecond)
logTime := time.Unix(0, nsecs).UTC().Format(RFC3339Micro)
// Fake an RSyslog prefix, expected by consumers
formatted := fmt.Sprintf(rsyslogPrefix, logTime, hostname, programName, mockPid, event.Message)
out = append(out, []byte(formatted))
}
return out
}

182
splitter/splitter_test.go Normal file
View file

@ -0,0 +1,182 @@
package splitter
import (
"bytes"
"compress/gzip"
b64 "encoding/base64"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
)
func TestUnpacking(t *testing.T) {
input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA"
decoded, err := b64.StdEncoding.DecodeString(input)
assert.NoError(t, err)
output, err := unpack(decoded)
assert.NoError(t, err)
expectedOutput := LogEventBatch{
MessageType: "CONTROL_MESSAGE",
Owner: "CloudwatchLogs",
LogGroup: "",
LogStream: "",
SubscriptionFilters: []string{},
LogEvents: []LogEvent{
{
ID: "",
Timestamp: 1498519943285,
Message: "CWL CONTROL MESSAGE: Checking health of destination Kinesis stream.",
},
},
}
assert.Equal(t, expectedOutput, output)
}
func pack(input LogEventBatch) (string, error) {
src, err := json.Marshal(input)
if err != nil {
return "", err
}
// Gzip
var b bytes.Buffer
gz := gzip.NewWriter(&b)
if _, err := gz.Write(src); err != nil {
return "", err
}
if err := gz.Flush(); err != nil {
panic(err)
}
if err := gz.Close(); err != nil {
return "", err
}
// Base64 Encode
return b64.StdEncoding.EncodeToString([]byte(b.String())), nil
}
func TestFullLoop(t *testing.T) {
input := `{
"messageType": "DATA_MESSAGE",
"owner": "123456789012",
"logGroup": "/aws/batch/job",
"logStream": "environment--app/11111111-2222-3333-4444-555566667777/88889999-0000-aaa-bbbb-ccccddddeeee",
"subscriptionFilters": [
"MySubscriptionFilter"
],
"logEvents": [
{
"id": "33418742379011144044923130086453437181614530551221780480",
"timestamp": 1498548236012,
"message": "some log line"
},
{
"id": "33418742387663833181953011865369295871402094815542181889",
"timestamp": 1498548236400,
"message": "2017/06/27 07:23:56 Another log line"
}
]
}`
var leb LogEventBatch
err := json.Unmarshal([]byte(input), &leb)
assert.NoError(t, err)
packed, err := pack(leb)
assert.NoError(t, err)
decoded, err := b64.StdEncoding.DecodeString(packed)
assert.NoError(t, err)
output, err := unpack(decoded)
assert.NoError(t, err)
assert.Equal(t, leb, output)
}
func TestSplit(t *testing.T) {
input := LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "/aws/batch/job",
LogStream: "env--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee",
SubscriptionFilters: []string{"MySubscriptionFilter"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: 1498519943285,
Message: "some log line",
},
{
ID: "99999992387663833181953011865369295871402094815542181889",
Timestamp: 1498519943285,
Message: "another log line",
},
},
}
prodEnv := false
lines := Split(input, prodEnv)
expected := [][]byte{
[]byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line"),
[]byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line"),
}
assert.Equal(t, expected, lines)
}
func TestSplitFiltersByEnv(t *testing.T) {
t.Log("If Split is run with prodEnv == true, it should omit logs with env != production")
input := LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "/aws/batch/job",
LogStream: "env--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee",
// LogStream: "environment--app",
SubscriptionFilters: []string{"MySubscriptionFilter"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: 1498519943285,
Message: "some log line",
},
{
ID: "99999992387663833181953011865369295871402094815542181889",
Timestamp: 1498519943285,
Message: "another log line",
},
},
}
prodEnv := true
lines := Split(input, prodEnv)
expected := [][]byte{}
assert.Equal(t, expected, lines)
t.Log("If Split is run with prodEnv == false, it should omit logs with env == production")
input = LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "/aws/batch/job",
LogStream: "production--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee",
// LogStream: "environment--app",
SubscriptionFilters: []string{"MySubscriptionFilter"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: 1498519943285,
Message: "some log line",
},
{
ID: "99999992387663833181953011865369295871402094815542181889",
Timestamp: 1498519943285,
Message: "another log line",
},
},
}
prodEnv = false
lines = Split(input, prodEnv)
expected = [][]byte{}
assert.Equal(t, expected, lines)
}