diff --git a/Makefile b/Makefile
index 082db71..42d8f1d 100644
--- a/Makefile
+++ b/Makefile
@@ -30,6 +30,8 @@ EMPTY :=
 SPACE := $(EMPTY) $(EMPTY)
 JAVA_CLASS_PATH := $(subst $(SPACE),:,$(JARS_TO_DOWNLOAD))
 
+CONSUMER ?= consumer
+
 $(JARS_TO_DOWNLOAD):
 	mkdir -p `dirname $@`
 	curl -s -L -o $@ -O $(URL_PREFIX)`echo $@ | sed 's/$(JAR_DIR)\///g'`
@@ -37,8 +39,13 @@ $(JARS_TO_DOWNLOAD):
 download_jars: $(JARS_TO_DOWNLOAD)
 
 build:
-	CGO_ENABLED=0 go build -installsuffix cgo -o build/consumer $(PKG)/cmd/consumer
+	CGO_ENABLED=0 go build -installsuffix cgo -o build/$(CONSUMER) $(PKG)/cmd/$(CONSUMER)
 
 run: build download_jars
 	command -v java >/dev/null 2>&1 || { echo >&2 "Java not installed. Install java!"; exit 1; }
-	java -cp $(JAVA_CLASS_PATH) com.amazonaws.services.kinesis.multilang.MultiLangDaemon consumer.properties
+	java -cp $(JAVA_CLASS_PATH) \
+		com.amazonaws.services.kinesis.multilang.MultiLangDaemon \
+		$(CONSUMER).properties
+
+bench:
+	go test -bench=. github.com/Clever/amazon-kinesis-client-go/decode/
diff --git a/batchconsumer.properties b/batchconsumer.properties
new file mode 100644
index 0000000..8e718b8
--- /dev/null
+++ b/batchconsumer.properties
@@ -0,0 +1,83 @@
+# The script that abides by the multi-language protocol. This script will
+# be executed by the MultiLangDaemon, which will communicate with this script
+# over STDIN and STDOUT according to the multi-language protocol.
+executableName = build/batchconsumer
+
+# The name of an Amazon Kinesis stream to process.
+streamName = test-stream
+
+# Used by the KCL as the name of this application. Will be used as the name
+# of an Amazon DynamoDB table which will store the lease and checkpoint
+# information for workers with this application name.
+applicationName = KCLGoExample
+
+# Users can change the credentials provider the KCL will use to retrieve credentials.
+# The DefaultAWSCredentialsProviderChain checks several other providers, as described here:
+# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
+AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
+
+# Appended to the user agent of the KCL. Does not impact the functionality of the
+# KCL in any other way.
+processingLanguage = golang
+
+# Valid options are TRIM_HORIZON or LATEST.
+# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
+initialPositionInStream = TRIM_HORIZON
+
+# The following properties are also available for configuring the KCL Worker that is created
+# by the MultiLangDaemon.
+
+# The KCL defaults to us-east-1.
+regionName = us-west-1
+
+# Failover time in milliseconds. A worker which does not renew its lease within this time interval
+# will be regarded as having problems, and its shards will be assigned to other workers.
+# For applications that have a large number of shards, this may be set to a higher number to reduce
+# the number of DynamoDB IOPS required for tracking leases.
+#failoverTimeMillis = 10000
+
+# A worker id that uniquely identifies this worker among all workers using the same applicationName.
+# If this isn't provided, a MultiLangDaemon instance will assign a unique workerId to itself.
+#workerId =
+
+# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
+#shardSyncIntervalMillis = 60000
+
+# Max records to fetch from Kinesis in a single GetRecords call.
+#maxRecords = 10000
+
+# Idle time between record reads in milliseconds.
+#idleTimeBetweenReadsInMillis = 1000
+
+# Enables applications to flush/checkpoint (if they have some data "in progress" but don't get new data for a while).
+#callProcessRecordsEvenForEmptyRecordList = false
+
+# Interval in milliseconds between polling to check for parent shard completion.
+# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
+# completion of parent shards).
+#parentShardPollIntervalMillis = 10000
+
+# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
+# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
+# to delete the ones we don't need any longer.
+#cleanupLeasesUponShardCompletion = true
+
+# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
+#taskBackoffTimeMillis = 500
+
+# Buffer metrics for at most this long before publishing to CloudWatch.
+#metricsBufferTimeMillis = 10000
+
+# Buffer at most this many metrics before publishing to CloudWatch.
+#metricsMaxQueueSize = 10000
+
+# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
+# to RecordProcessorCheckpointer#checkpoint(String) by default.
+#validateSequenceNumberBeforeCheckpointing = true
+
+# The maximum number of active threads for the MultiLangDaemon to permit.
+# If a value is provided then a FixedThreadPool is used with the maximum
+# active threads set to the provided value. If a non-positive integer or no
+# value is provided a CachedThreadPool is used.
+#maxActiveThreads = 0
diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go
new file mode 100644
index 0000000..6aa9a64
--- /dev/null
+++ b/batchconsumer/batcher/message_batcher.go
@@ -0,0 +1,165 @@
+package batcher
+
+import (
+	"fmt"
+	"math/big"
+	"sync"
+	"time"
+)
+
+type SequencePair struct {
+	Sequence    *big.Int
+	SubSequence int
+}
+
+// Sync is used to allow a writer to synchronize with the batcher.
+// The writer declares how to write messages (via its `SendBatch` method), while the batcher
+// keeps track of messages written.
+type Sync interface {
+	SendBatch(batch [][]byte)
+}
+
+// Batcher interface
+type Batcher interface {
+	// AddMessage adds a message to the batch
+	AddMessage(msg []byte, sequencePair SequencePair) error
+	// Flush all messages from the batch
+	Flush()
+	// SmallestSequencePair returns the smallest SequenceNumber and SubSequence number in
+	// the current batch
+	SmallestSequencePair() SequencePair
+}
+
+type msgPack struct {
+	msg          []byte
+	sequencePair SequencePair
+}
+
+type batcher struct {
+	mux sync.Mutex
+
+	flushInterval time.Duration
+	flushCount    int
+	flushSize     int
+
+	// smallestSeq tracks the smallest sequence pair of any record in the
+	// current batch. This is used for checkpointing.
+	smallestSeq SequencePair
+
+	sync      Sync
+	msgChan   chan<- msgPack
+	flushChan chan<- struct{}
+}
+
+// New creates a new Batcher
+// - sync - synchronizes batcher with writer
+// - flushInterval - how often accumulated messages should be flushed (default 1 second).
+// - flushCount - number of messages that trigger a flush (default 10).
+// - flushSize - size of batch that triggers a flush (default 1024 * 1024 = 1 MB)
+func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) Batcher {
+	msgChan := make(chan msgPack, 100)
+	flushChan := make(chan struct{})
+
+	b := &batcher{
+		flushCount:    flushCount,
+		flushInterval: flushInterval,
+		flushSize:     flushSize,
+		sync:          sync,
+		msgChan:       msgChan,
+		flushChan:     flushChan,
+	}
+
+	go b.startBatcher(msgChan, flushChan)
+
+	return b
+}
+
+func (b *batcher) SmallestSequencePair() SequencePair {
+	b.mux.Lock()
+	defer b.mux.Unlock()
+
+	return b.smallestSeq
+}
+
+func (b *batcher) SetFlushInterval(dur time.Duration) {
+	b.flushInterval = dur
+}
+
+func (b *batcher) SetFlushCount(count int) {
+	b.flushCount = count
+}
+
+func (b *batcher) SetFlushSize(size int) {
+	b.flushSize = size
+}
+
+func (b *batcher) AddMessage(msg []byte, pair SequencePair) error {
+	if len(msg) <= 0 {
+		return fmt.Errorf("Empty messages can't be sent")
+	}
+
+	b.msgChan <- msgPack{msg, pair}
+	return nil
+}
+
+// updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch.
+// When flush() is called, the batcher sends the sequence number to the writer. When the writer
+// checkpoints, it does so up to the latest message that was flushed successfully.
+func (b *batcher) updateSequenceNumbers(pair SequencePair) {
+	b.mux.Lock()
+	defer b.mux.Unlock()
+
+	isSmaller := b.smallestSeq.Sequence == nil ||
+		pair.Sequence.Cmp(b.smallestSeq.Sequence) == -1 ||
+		(pair.Sequence.Cmp(b.smallestSeq.Sequence) == 0 &&
+			pair.SubSequence < b.smallestSeq.SubSequence)
+	if isSmaller {
+		b.smallestSeq = SequencePair{pair.Sequence, pair.SubSequence}
+	}
+}
+
+func (b *batcher) Flush() {
+	b.flushChan <- struct{}{}
+}
+
+func (b *batcher) batchSize(batch [][]byte) int {
+	total := 0
+	for _, msg := range batch {
+		total += len(msg)
+	}
+
+	return total
+}
+
+func (b *batcher) flush(batch [][]byte) [][]byte {
+	if len(batch) > 0 {
+		b.sync.SendBatch(batch)
+
+		// Guard the reset with the mutex; smallestSeq is read concurrently
+		// via SmallestSequencePair.
+		b.mux.Lock()
+		b.smallestSeq = SequencePair{nil, 0}
+		b.mux.Unlock()
+	}
+	return [][]byte{}
+}
+
+func (b *batcher) startBatcher(msgChan <-chan msgPack, flushChan <-chan struct{}) {
+	batch := [][]byte{}
+
+	for {
+		select {
+		case <-time.After(b.flushInterval):
+			batch = b.flush(batch)
+		case <-flushChan:
+			batch = b.flush(batch)
+		case pack := <-msgChan:
+			size := b.batchSize(batch)
+			if b.flushSize < size+len(pack.msg) {
+				batch = b.flush(batch)
+			}
+
+			batch = append(batch, pack.msg)
+			b.updateSequenceNumbers(pack.sequencePair)
+
+			if b.flushCount <= len(batch) || b.flushSize <= b.batchSize(batch) {
+				batch = b.flush(batch)
+			}
+		}
+	}
+}
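
Reviewer note: a minimal sketch (not part of this diff) of how a writer drives the batcher. The `printSync` type is hypothetical; everything else uses the API added above:

```go
package main

import (
	"fmt"
	"math/big"
	"time"

	"github.com/Clever/amazon-kinesis-client-go/batchconsumer/batcher"
)

// printSync is a stand-in Sync implementation that just reports flushed batches.
type printSync struct{}

func (printSync) SendBatch(batch [][]byte) {
	fmt.Printf("flushed %d messages\n", len(batch))
}

func main() {
	// Flush whichever comes first: every second, 500 messages, or 1 MB.
	b := batcher.New(printSync{}, time.Second, 500, 1024*1024)

	// Kinesis sequence numbers are too large for int64, hence *big.Int.
	seq, _ := new(big.Int).SetString("49590338271490256608559692538361571095921575989136588898", 10)
	pair := batcher.SequencePair{Sequence: seq, SubSequence: 0}
	if err := b.AddMessage([]byte(`{"title":"hello"}`), pair); err != nil {
		fmt.Println("add failed:", err)
	}

	b.Flush() // force the pending message out without waiting for the interval
	time.Sleep(100 * time.Millisecond)
}
```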
diff --git a/batchconsumer/batcher/message_batcher_test.go b/batchconsumer/batcher/message_batcher_test.go
new file mode 100644
index 0000000..488310d
--- /dev/null
+++ b/batchconsumer/batcher/message_batcher_test.go
@@ -0,0 +1,212 @@
+package batcher
+
+import (
+	"fmt"
+	"math/big"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+type batch [][]byte
+
+type MockSync struct {
+	flushChan chan struct{}
+	batches   []batch
+}
+
+func NewMockSync() *MockSync {
+	return &MockSync{
+		flushChan: make(chan struct{}, 1),
+		batches:   []batch{},
+	}
+}
+
+func (m *MockSync) SendBatch(b [][]byte) {
+	m.batches = append(m.batches, batch(b))
+	m.flushChan <- struct{}{}
+}
+
+func (m *MockSync) waitForFlush(timeout time.Duration) error {
+	select {
+	case <-m.flushChan:
+		return nil
+	case <-time.After(timeout):
+		return fmt.Errorf("timed out before flush (waited %s)", timeout.String())
+	}
+}
+
+const mockSubSequenceNumber = 12345
+
+// mockSequencePair matches the AddMessage signature, which takes a SequencePair.
+var mockSequencePair = SequencePair{big.NewInt(99999), mockSubSequenceNumber}
+
+func TestBatchingByCount(t *testing.T) {
+	var err error
+	assert := assert.New(t)
+
+	sync := NewMockSync()
+	batcher := New(sync, time.Hour, 2, 1024*1024)
+
+	t.Log("Batcher respects count limit")
+	assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequencePair))
+	assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequencePair))
+	assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequencePair))
+
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.NoError(err)
+
+	assert.Equal(1, len(sync.batches))
+	assert.Equal(2, len(sync.batches[0]))
+	assert.Equal("hihi", string(sync.batches[0][0]))
+	assert.Equal("heyhey", string(sync.batches[0][1]))
+
+	t.Log("Batcher doesn't send partial batches")
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.Error(err)
+}
+
+func TestBatchingByTime(t *testing.T) {
+	var err error
+	assert := assert.New(t)
+
+	sync := NewMockSync()
+	batcher := New(sync, time.Millisecond, 2000000, 1024*1024)
+
+	t.Log("Batcher sends partial batches when time expires")
+	assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequencePair))
+
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.NoError(err)
+
+	assert.Equal(1, len(sync.batches))
+	assert.Equal(1, len(sync.batches[0]))
+	assert.Equal("hihi", string(sync.batches[0][0]))
+
+	t.Log("Batcher sends all messages in partial batches when time expires")
+	assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequencePair))
+	assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequencePair))
+
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.NoError(err)
+
+	assert.Equal(2, len(sync.batches))
+	assert.Equal(2, len(sync.batches[1]))
+	assert.Equal("heyhey", string(sync.batches[1][0]))
+	assert.Equal("yoyo", string(sync.batches[1][1]))
+
+	t.Log("Batcher doesn't send empty batches")
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.Error(err)
+}
+
+func TestBatchingBySize(t *testing.T) {
+	var err error
+	assert := assert.New(t)
+
+	sync := NewMockSync()
+	batcher := New(sync, time.Hour, 2000000, 8)
+
+	t.Log("Large messages are sent immediately")
+	assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequencePair))
+
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.NoError(err)
+
+	assert.Equal(1, len(sync.batches))
+	assert.Equal(1, len(sync.batches[0]))
+	assert.Equal("hellohello", string(sync.batches[0][0]))
+
+	t.Log("Batcher tries not to exceed size limit")
+	assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequencePair))
+	assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequencePair))
+
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.NoError(err)
+
+	assert.Equal(2, len(sync.batches))
+	assert.Equal(1, len(sync.batches[1]))
+	assert.Equal("heyhey", string(sync.batches[1][0]))
+
+	t.Log("Batcher sends messages that didn't fit in previous batch")
+	assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequencePair)) // At this point "hihi" is in the batch
+
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.NoError(err)
+
+	assert.Equal(3, len(sync.batches))
+	assert.Equal(2, len(sync.batches[2]))
+	assert.Equal("hihi", string(sync.batches[2][0]))
+	assert.Equal("yoyo", string(sync.batches[2][1]))
+	t.Log("Batcher doesn't send partial batches")
+	assert.NoError(batcher.AddMessage([]byte("okok"), mockSequencePair))
+
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.Error(err)
+}
+
+func TestFlushing(t *testing.T) {
+	var err error
+	assert := assert.New(t)
+
+	sync := NewMockSync()
+	batcher := New(sync, time.Hour, 2000000, 1024*1024)
+
+	t.Log("Calling flush sends pending messages")
+	assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequencePair))
+
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.Error(err)
+
+	batcher.Flush()
+
+	err = sync.waitForFlush(time.Millisecond * 10)
+	assert.NoError(err)
+
+	assert.Equal(1, len(sync.batches))
+	assert.Equal(1, len(sync.batches[0]))
+	assert.Equal("hihi", string(sync.batches[0][0]))
+}
+
+func TestSendingEmpty(t *testing.T) {
+	var err error
+	assert := assert.New(t)
+
+	sync := NewMockSync()
+	batcher := New(sync, time.Second, 10, 1024*1024)
+
+	t.Log("An error is returned when an empty message is sent")
+	err = batcher.AddMessage([]byte{}, mockSequencePair)
+	assert.Error(err)
+}
+
+func TestUpdatingSequence(t *testing.T) {
+	assert := assert.New(t)
+
+	sync := NewMockSync()
+	batcher := New(sync, time.Second, 10, 1024*1024).(*batcher)
+
+	t.Log("Initially, smallestSeq is undefined")
+	expected := new(big.Int)
+	assert.Nil(batcher.smallestSeq.Sequence)
+
+	t.Log("After AddMessage (seq=1), smallestSeq = 1")
+	assert.NoError(batcher.AddMessage([]byte("abab"), SequencePair{big.NewInt(1), mockSubSequenceNumber}))
+	sync.waitForFlush(time.Minute)
+	expected.SetInt64(1)
+	seq := batcher.SmallestSequencePair()
+	assert.True(expected.Cmp(seq.Sequence) == 0)
+
+	t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher")
+	assert.NoError(batcher.AddMessage([]byte("cdcd"), SequencePair{big.NewInt(2), mockSubSequenceNumber}))
+	sync.waitForFlush(time.Minute)
+	seq = batcher.SmallestSequencePair()
+	assert.True(expected.Cmp(seq.Sequence) == 0)
+
+	t.Log("After AddMessage (seq=0), smallestSeq = 0")
+	assert.NoError(batcher.AddMessage([]byte("efef"), SequencePair{big.NewInt(0), mockSubSequenceNumber}))
+	sync.waitForFlush(time.Minute)
+	expected.SetInt64(0)
+	seq = batcher.SmallestSequencePair()
+	assert.True(expected.Cmp(seq.Sequence) == 0)
+}
diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go
new file mode 100644
index 0000000..5b47a2a
--- /dev/null
+++ b/batchconsumer/consumer.go
@@ -0,0 +1,117 @@
+package batchconsumer
+
+import (
+	"io"
+	"log"
+	"os"
+	"time"
+
+	"gopkg.in/Clever/kayvee-go.v6/logger"
+
+	"github.com/Clever/amazon-kinesis-client-go/kcl"
+)
+
+type Config struct {
+	// LogFile where consumer errors and failed log lines are saved
+	LogFile string
+	// FlushInterval is how often accumulated messages should be bulk put to firehose
+	FlushInterval time.Duration
+	// FlushCount is the number of messages that triggers a push to firehose. Max batch
+	// size is 500, see: http://docs.aws.amazon.com/firehose/latest/dev/limits.html
+	FlushCount int
+	// FlushSize is the size of a batch in bytes that triggers a push to firehose. Max batch
+	// size is 4 MB (4*1024*1024), see: http://docs.aws.amazon.com/firehose/latest/dev/limits.html
+	FlushSize int
+
+	// DeployEnv is the name of the deployment environment
+	DeployEnv string
+
+	// ReadRateLimit is the number of records read per second
+	ReadRateLimit int
+	// ReadBurstLimit is the max number of tokens allowed by the rate limiter
+	ReadBurstLimit int
+
+	// CheckpointFreq is the frequency at which a checkpoint is saved
+	CheckpointFreq time.Duration
+	// CheckpointRetries is the number of times the consumer will try to save a checkpoint
+	CheckpointRetries int
+	// CheckpointRetrySleep is the amount of time between checkpoint save attempts
+	CheckpointRetrySleep time.Duration
+
+	logOutput io.Writer
+}
+
+type BatchConsumer struct {
+	kclProcess *kcl.KCLProcess
+}
+
+func withDefaults(config Config) Config {
+	if config.LogFile == "" {
+		config.LogFile = "/tmp/kcl-" + time.Now().Format(time.RFC3339)
+	}
+
+	if config.FlushInterval == 0 {
+		config.FlushInterval = 10 * time.Second
+	}
+	if config.FlushCount == 0 {
+		config.FlushCount = 500
+	}
+	if config.FlushSize == 0 {
+		config.FlushSize = 4 * 1024 * 1024
+	}
+
+	if config.DeployEnv == "" {
+		config.DeployEnv = "unknown-env"
+	}
+
+	if config.ReadRateLimit == 0 {
+		config.ReadRateLimit = 300
+	}
+	if config.ReadBurstLimit == 0 {
+		config.ReadBurstLimit = int(300 * 1.2)
+	}
+
+	if config.CheckpointFreq == 0 {
+		config.CheckpointFreq = 60 * time.Second
+	}
+	if config.CheckpointRetries == 0 {
+		config.CheckpointRetries = 5
+	}
+	if config.CheckpointRetrySleep == 0 {
+		config.CheckpointRetrySleep = 5 * time.Second
+	}
+
+	return config
+}
+
+func NewBatchConsumerFromFiles(
+	config Config, sender Sender, input io.Reader, output, errFile io.Writer,
+) *BatchConsumer {
+	config = withDefaults(config)
+
+	file, err := os.OpenFile(config.LogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
+	if err != nil {
+		log.Fatalf("Unable to create log file: %s", err.Error())
+	}
+	// The file is deliberately left open; kvlog writes to it for the lifetime
+	// of the consumer.
+
+	kvlog := logger.New("amazon-kinesis-client-go")
+	kvlog.SetOutput(file)
+
+	wrt := &BatchedWriter{
+		config: config,
+		log:    kvlog,
+		sender: sender,
+	}
+	kclProcess := kcl.New(input, output, errFile, wrt)
+
+	return &BatchConsumer{
+		kclProcess: kclProcess,
+	}
+}
+
+func NewBatchConsumer(config Config, sender Sender) *BatchConsumer {
+	return NewBatchConsumerFromFiles(config, sender, os.Stdin, os.Stdout, os.Stderr)
+}
+
+func (b *BatchConsumer) Start() {
+	b.kclProcess.Run()
+}
diff --git a/batchconsumer/sender.go b/batchconsumer/sender.go
new file mode 100644
index 0000000..5ab6f53
--- /dev/null
+++ b/batchconsumer/sender.go
@@ -0,0 +1,30 @@
+package batchconsumer
+
+import (
+	"errors"
+	"fmt"
+)
+
+var ErrLogIgnored = errors.New("Log intentionally skipped by sender")
+
+type Sender interface {
+	EncodeLog(rawlog []byte) (log []byte, tags []string, err error)
+	SendBatch(batch [][]byte, tag string) error
+}
+
+type PartialOutputError struct {
+	Message string
+	Logs    [][]byte
+}
+
+func (c PartialOutputError) Error() string {
+	return fmt.Sprintf("%d failed logs. %s", len(c.Logs), c.Message)
+}
+
+type CatastrophicOutputError struct {
+	Message string
+}
+
+func (c CatastrophicOutputError) Error() string {
+	return c.Message
+}
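
Reviewer note: a sketch (assumed names, not part of this diff) of a Sender that routes every log to a single tag and reports per-line failures via PartialOutputError; zero-valued Config fields are filled in by withDefaults (10s / 500 messages / 4 MB):

```go
package main

import (
	"strings"

	kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer"
)

// tagSender is a hypothetical Sender: it skips blank lines, tags everything
// "default", and reports (rather than fails on) lines it cannot deliver.
type tagSender struct{}

func (tagSender) EncodeLog(rawlog []byte) ([]byte, []string, error) {
	if len(strings.TrimSpace(string(rawlog))) == 0 {
		return nil, nil, kbc.ErrLogIgnored // intentionally skipped, not an error
	}
	return rawlog, []string{"default"}, nil
}

func (tagSender) SendBatch(batch [][]byte, tag string) error {
	failed := [][]byte{}
	for _, line := range batch {
		// deliver is a placeholder for a real output, e.g. a Firehose put.
		if err := deliver(tag, line); err != nil {
			failed = append(failed, line)
		}
	}
	if len(failed) > 0 {
		return kbc.PartialOutputError{Message: "delivery failed", Logs: failed}
	}
	return nil
}

func deliver(tag string, line []byte) error { return nil } // stand-in

func main() {
	consumer := kbc.NewBatchConsumer(kbc.Config{DeployEnv: "test-env"}, tagSender{})
	consumer.Start()
}
```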
diff --git a/batchconsumer/sync.go b/batchconsumer/sync.go
new file mode 100644
index 0000000..cd77a59
--- /dev/null
+++ b/batchconsumer/sync.go
@@ -0,0 +1,11 @@
+package batchconsumer
+
+type BatcherSync struct {
+	tag    string
+	writer *BatchedWriter
+}
+
+func (b *BatcherSync) SendBatch(batch [][]byte) {
+	b.writer.SendBatch(batch, b.tag)
+	b.writer.CheckPointBatch(b.tag)
+}
diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go
new file mode 100644
index 0000000..17c129f
--- /dev/null
+++ b/batchconsumer/writer.go
@@ -0,0 +1,250 @@
+package batchconsumer
+
+import (
+	"context"
+	"encoding/base64"
+	"fmt"
+	"math/big"
+	"os"
+	"time"
+
+	"golang.org/x/time/rate"
+	kv "gopkg.in/Clever/kayvee-go.v6/logger"
+
+	"github.com/Clever/amazon-kinesis-client-go/batchconsumer/batcher"
+	"github.com/Clever/amazon-kinesis-client-go/kcl"
+	"github.com/Clever/amazon-kinesis-client-go/splitter"
+)
+
+type BatchedWriter struct {
+	config Config
+	sender Sender
+	log    kv.KayveeLogger
+
+	shardID        string
+	checkpointChan chan batcher.SequencePair
+
+	// Limits the number of records read from the stream
+	rateLimiter *rate.Limiter
+
+	batchers         map[string]batcher.Batcher
+	lastProcessedSeq batcher.SequencePair
+}
+
+func (b *BatchedWriter) Initialize(shardID string, checkpointer *kcl.Checkpointer) error {
+	b.batchers = map[string]batcher.Batcher{}
+	b.shardID = shardID
+	b.checkpointChan = make(chan batcher.SequencePair)
+	b.rateLimiter = rate.NewLimiter(rate.Limit(b.config.ReadRateLimit), b.config.ReadBurstLimit)
+
+	b.startCheckpointListener(checkpointer, b.checkpointChan)
+
+	return nil
+}
+
+// handleCheckpointError returns true if checkpointing should be tried again. Returns false otherwise.
+func (b *BatchedWriter) handleCheckpointError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	cperr, ok := err.(kcl.CheckpointError)
+	if !ok {
+		b.log.ErrorD("unknown-checkpoint-error", kv.M{"msg": err.Error(), "shard-id": b.shardID})
+		return true
+	}
+
+	switch cperr.Error() {
+	case "ShutdownException": // Skips checkpointing
+		b.log.ErrorD("shutdown-checkpoint-exception", kv.M{
+			"msg": err.Error(), "shard-id": b.shardID,
+		})
+		return false
+	case "ThrottlingException":
+		b.log.ErrorD("checkpoint-throttle", kv.M{"shard-id": b.shardID})
+	case "InvalidStateException":
+		b.log.ErrorD("invalid-checkpoint-state", kv.M{"shard-id": b.shardID})
+	default:
+		b.log.ErrorD("checkpoint-error", kv.M{"shard-id": b.shardID, "msg": err})
+	}
+
+	return true
+}
+
+func (b *BatchedWriter) startCheckpointListener(
+	checkpointer *kcl.Checkpointer, checkpointChan <-chan batcher.SequencePair,
+) {
+	lastCheckpoint := time.Now()
+
+	go func() {
+		for {
+			seq := <-checkpointChan
+
+			// This is a write throttle to ensure we don't checkpoint faster than
+			// b.config.CheckpointFreq. The latest seq number is always used.
+			for time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq {
+				select {
+				case seq = <-checkpointChan: // Keep updating checkpoint seq while waiting
+				case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C:
+				}
+			}
+
+			retry := true
+			for n := 0; retry && n < b.config.CheckpointRetries+1; n++ {
+				str := seq.Sequence.String()
+				err := checkpointer.Checkpoint(&str, &seq.SubSequence)
+				if err == nil { // Successfully checkpointed!
+					lastCheckpoint = time.Now()
+					break
+				}
+
+				retry = b.handleCheckpointError(err)
+
+				if n == b.config.CheckpointRetries {
+					b.log.ErrorD("checkpoint-retries", kv.M{"attempts": b.config.CheckpointRetries})
+					retry = false
+				}
+
+				if retry {
+					time.Sleep(b.config.CheckpointRetrySleep)
+				}
+			}
+		}
+	}()
+}
+
+func (b *BatchedWriter) createBatcher(tag string) batcher.Batcher {
+	sync := &BatcherSync{
+		tag:    tag,
+		writer: b,
+	}
+	return batcher.New(sync, b.config.FlushInterval, b.config.FlushCount, b.config.FlushSize)
+}
+
+func (b *BatchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) {
+	// We handle two types of records:
+	// - records emitted from a CWLogs Subscription
+	// - records emitted from the KPL
+	if !splitter.IsGzipped(record) {
+		// Process a single message, from the KPL
+		return [][]byte{record}, nil
+	}
+
+	// Process a batch of messages from a CWLogs Subscription
+	return splitter.GetMessagesFromGzippedInput(record, b.config.DeployEnv == "production")
+}
+
+func (b *BatchedWriter) ProcessRecords(records []kcl.Record) error {
+	curSequence := b.lastProcessedSeq
+
+	for _, record := range records {
+		// Wait until the rate limiter permits one more record to be processed
+		b.rateLimiter.Wait(context.Background())
+
+		seq := new(big.Int)
+		if _, ok := seq.SetString(record.SequenceNumber, 10); !ok { // Validating sequence
+			return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
+		}
+
+		b.lastProcessedSeq = curSequence // Updated with the value from the previous iteration
+		curSequence = batcher.SequencePair{seq, record.SubSequenceNumber}
+
+		data, err := base64.StdEncoding.DecodeString(record.Data)
+		if err != nil {
+			return err
+		}
+
+		rawlogs, err := b.splitMessageIfNecessary(data)
+		if err != nil {
+			return err
+		}
+		for _, rawlog := range rawlogs {
+			log, tags, err := b.sender.EncodeLog(rawlog)
+			if err == ErrLogIgnored {
+				continue // Skip message
+			} else if err != nil {
+				return err
+			}
+
+			if len(tags) == 0 {
+				return fmt.Errorf("No tags provided by consumer for log: %s", string(rawlog))
+			}
+
+			for _, tag := range tags {
+				batcher, ok := b.batchers[tag]
+				if !ok {
+					batcher = b.createBatcher(tag)
+					b.batchers[tag] = batcher
+				}
+
+				// Use the second-to-last sequence number to ensure we don't checkpoint a
+				// message before it's been sent. When batches are sent, conceptually we
+				// first find the smallest sequence number among all the messages in the
+				// batch (call it A). We then checkpoint at the A-1 sequence number.
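+				// For example (illustrative numbers only): with records
+				// A=(seq 100, sub 0) and B=(seq 101, sub 0), B is added with
+				// lastProcessedSeq still at A, so a checkpoint can never pass
+				// a record that hasn't yet been handed to a batcher.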
+				err = batcher.AddMessage(log, b.lastProcessedSeq)
+				if err != nil {
+					return err
+				}
+			}
+		}
+	}
+
+	b.lastProcessedSeq = curSequence
+
+	return nil
+}
+
+func (b *BatchedWriter) CheckPointBatch(tag string) {
+	smallest := b.lastProcessedSeq
+
+	for name, batch := range b.batchers {
+		if tag == name {
+			continue
+		}
+
+		pair := batch.SmallestSequencePair()
+		if pair.Sequence == nil { // Occurs when the batch has no items
+			continue
+		}
+
+		isSmaller := smallest.Sequence == nil || // nil means the batch was just flushed
+			pair.Sequence.Cmp(smallest.Sequence) == -1 ||
+			(pair.Sequence.Cmp(smallest.Sequence) == 0 && pair.SubSequence < smallest.SubSequence)
+		if isSmaller {
+			smallest = pair
+		}
+	}
+
+	b.checkpointChan <- smallest
+}
+
+func (b *BatchedWriter) SendBatch(batch [][]byte, tag string) {
+	b.log.Info("sent-batch")
+	err := b.sender.SendBatch(batch, tag)
+	switch e := err.(type) {
+	case nil: // Do nothing
+	case PartialOutputError:
+		b.log.ErrorD("send-batch", kv.M{"msg": e.Error()})
+		for _, line := range e.Logs {
+			b.log.ErrorD("failed-log", kv.M{"log": line})
+		}
+	case CatastrophicOutputError:
+		b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
+		os.Exit(1)
+	default:
+		b.log.CriticalD("send-batch", kv.M{"msg": e.Error()})
+		os.Exit(1)
+	}
+}
+
+func (b *BatchedWriter) Shutdown(reason string) error {
+	if reason == "TERMINATE" {
+		b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID})
+		for _, batch := range b.batchers {
+			batch.Flush()
+		}
+	} else {
+		b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason})
+	}
+	return nil
+}
diff --git a/circle.yml b/circle.yml
index 26fbe3f..8f15e2b 100644
--- a/circle.yml
+++ b/circle.yml
@@ -12,6 +12,6 @@ compile:
     - make build
 test:
   override:
-    - echo TODO
+    - make test
 general:
   build_dir: ../.go_workspace/src/github.com/$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go
new file mode 100644
index 0000000..0d91b1f
--- /dev/null
+++ b/cmd/batchconsumer/main.go
@@ -0,0 +1,64 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"os"
+	"time"
+
+	"gopkg.in/Clever/kayvee-go.v6/logger"
+
+	kbc "github.com/Clever/amazon-kinesis-client-go/batchconsumer"
+)
+
+func createDummyOutput() (logger.KayveeLogger, *os.File) {
+	file, err := os.OpenFile("/tmp/example-kcl-output", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
+	if err != nil {
+		log.Fatalf("Unable to create log file: %s", err.Error())
+	}
+
+	kvlog := logger.New("amazon-kinesis-client-go")
+	kvlog.SetOutput(file)
+
+	return kvlog, file
+}
+
+func main() {
+	config := kbc.Config{
+		FlushInterval: 10 * time.Second,
+		FlushCount:    500,
+		FlushSize:     4 * 1024 * 1024, // 4 MB
+		LogFile:       "/tmp/example-kcl-consumer",
+		DeployEnv:     "test-env",
+	}
+
+	output, file := createDummyOutput()
+	defer file.Close()
+
+	wrt := &ExampleWriter{output: output}
+	consumer := kbc.NewBatchConsumer(config, wrt)
+	consumer.Start()
+}
+
+type ExampleWriter struct {
+	output logger.KayveeLogger
+}
+
+func (e *ExampleWriter) EncodeLog(rawlog []byte) ([]byte, []string, error) {
+	if len(rawlog)%5 == 2 {
+		return nil, nil, kbc.ErrLogIgnored
+	}
+
+	tag1 := fmt.Sprintf("tag-%d", len(rawlog)%5)
+	line := tag1 + ": " + string(rawlog)
+
+	return []byte(line), []string{tag1}, nil
+}
+
+func (e *ExampleWriter) SendBatch(batch [][]byte, tag string) error {
+	for idx, line := range batch {
+		e.output.InfoD(tag, logger.M{"idx": idx, "line": string(line)})
+	}
+
+	return nil
+}
diff --git a/decode/decode.go b/decode/decode.go
new file mode 100644
index 0000000..cfacd9d
--- /dev/null
+++ b/decode/decode.go
@@ -0,0 +1,477 @@
+package decode
+
+import (
+	"encoding/json"
+	"fmt"
+	"regexp"
+	"strings"
+	"time"
+
+	"github.com/Clever/syslogparser/rfc3164"
+)
+
+// reservedFields are automatically set during decoding.
+// No field written by a user (e.g. contained in the Kayvee JSON) should overwrite them.
+var reservedFields = []string{
+	"prefix",
+	"postfix",
+	"type",
+}
+
+func stringInSlice(s string, slice []string) bool {
+	for _, item := range slice {
+		if s == item {
+			return true
+		}
+	}
+	return false
+}
+
+// remapSyslog3164Keys renames fields to match our expectations from heka's syslog parser
+// see: https://github.com/mozilla-services/heka/blob/278dd3d5961b9b6e47bb7a912b63ce3faaf8d8bd/sandbox/lua/decoders/rsyslog.lua
+var remapSyslog3164Keys = map[string]string{
+	"hostname":  "hostname",
+	"timestamp": "timestamp",
+	"tag":       "programname",
+	"content":   "rawlog",
+}
+
+// FieldsFromSyslog takes an RSyslog formatted log line and extracts fields from it
+//
+// Supports two log line formats:
+// - RSYSLOG_TraditionalFileFormat - the "old style" default log file format with low-precision timestamps (RFC3164)
+// - RSYSLOG_FileFormat - a modern-style logfile format similar to TraditionalFileFormat, but with high-precision timestamps and timezone information
+//
+// For more details on Rsyslog formats: https://rsyslog-5-8-6-doc.neocities.org/rsyslog_conf_templates.html
+func FieldsFromSyslog(line string) (map[string]interface{}, error) {
+	// rfc3164 includes a severity number in front of the Syslog line, but we don't use that
+	fakeSeverity := "<12>"
+	p3164 := rfc3164.NewParser([]byte(fakeSeverity + line))
+	err := p3164.Parse()
+	if err != nil {
+		return map[string]interface{}{}, err
+	}
+
+	out := map[string]interface{}{}
+	for k, v := range p3164.Dump() {
+		if newKey, ok := remapSyslog3164Keys[k]; ok {
+			out[newKey] = v
+		}
+	}
+	return out, nil
+}
+
+// NonKayveeError occurs when the log line is not Kayvee
+type NonKayveeError struct{}
+
+func (e NonKayveeError) Error() string {
+	return "Log line is not Kayvee (doesn't have JSON payload)"
+}
+
+// FieldsFromKayvee takes a log line and extracts fields from the Kayvee (JSON) part
+func FieldsFromKayvee(line string) (map[string]interface{}, error) {
+	m := map[string]interface{}{}
+
+	firstIdx := strings.Index(line, "{")
+	lastIdx := strings.LastIndex(line, "}")
+	if firstIdx == -1 || lastIdx == -1 || firstIdx > lastIdx {
+		return map[string]interface{}{}, &NonKayveeError{}
+	}
+	m["prefix"] = line[:firstIdx]
+	m["postfix"] = line[lastIdx+1:]
+
+	possibleJSON := line[firstIdx : lastIdx+1]
+	var fields map[string]interface{}
+	if err := json.Unmarshal([]byte(possibleJSON), &fields); err != nil {
+		return map[string]interface{}{}, err
+	}
+	for k, v := range fields {
+		if !stringInSlice(k, reservedFields) {
+			m[k] = v
+		}
+	}
+
+	m["type"] = "Kayvee"
+
+	return m, nil
+}
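
Reviewer note: a quick sketch of what FieldsFromKayvee extracts (keys per the code above; the input line is illustrative):

```go
package main

import (
	"fmt"

	"github.com/Clever/amazon-kinesis-client-go/decode"
)

func main() {
	line := `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"} trailing`
	fields, err := decode.FieldsFromKayvee(line)
	if err != nil {
		panic(err)
	}
	// prefix="2017/04/05 21:57:46 some_file.go:10: ", postfix=" trailing",
	// title="request_finished", type="Kayvee"
	fmt.Println(fields)
}
```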
+// MetricsRoute represents a metrics kv log route
+type MetricsRoute struct {
+	Series     string
+	Dimensions []string
+	ValueField string
+	RuleName   string
+}
+
+// AnalyticsRoute represents an analytics kv log route
+type AnalyticsRoute struct {
+	Series   string
+	RuleName string
+}
+
+// NotificationRoute represents a notification kv log route
+type NotificationRoute struct {
+	Channel  string
+	Icon     string
+	Message  string
+	User     string
+	RuleName string
+}
+
+// AlertRoute represents an alert kv log route
+type AlertRoute struct {
+	Series     string
+	Dimensions []string
+	StatType   string
+	ValueField string
+	RuleName   string
+}
+
+func getStringValue(json map[string]interface{}, key string) string {
+	val, ok := json[key]
+	if !ok {
+		return ""
+	}
+
+	str, ok := val.(string)
+	if !ok {
+		return ""
+	}
+
+	return str
+}
+
+func getStringArray(json map[string]interface{}, key string) []string {
+	val, ok := json[key]
+	if !ok {
+		return []string{}
+	}
+
+	strArray, ok := val.([]string)
+	if !ok {
+		return []string{}
+	}
+
+	return strArray
+}
+
+// LogRoutes is a type alias to make it easier to add route-specific filter functions
+type LogRoutes []map[string]interface{}
+
+// MetricsRoutes filters the LogRoutes and returns a list of MetricsRoute structs
+func (l LogRoutes) MetricsRoutes() []MetricsRoute {
+	routes := []MetricsRoute{}
+
+	for _, route := range l {
+		tipe := getStringValue(route, "type")
+		if tipe != "metrics" {
+			continue
+		}
+
+		series := getStringValue(route, "series")
+		dimensions := getStringArray(route, "dimensions")
+		valueField := getStringValue(route, "value_field")
+		ruleName := getStringValue(route, "rule")
+
+		if series == "" { // TODO: log error
+			continue
+		}
+		if valueField == "" {
+			valueField = "value"
+		}
+
+		routes = append(routes, MetricsRoute{
+			Series:     series,
+			Dimensions: dimensions,
+			ValueField: valueField,
+			RuleName:   ruleName,
+		})
+	}
+
+	return routes
+}
+
+// AnalyticsRoutes filters the LogRoutes and returns a list of AnalyticsRoute structs
+func (l LogRoutes) AnalyticsRoutes() []AnalyticsRoute {
+	routes := []AnalyticsRoute{}
+
+	for _, route := range l {
+		tipe := getStringValue(route, "type")
+		if tipe != "analytics" {
+			continue
+		}
+
+		series := getStringValue(route, "series")
+		ruleName := getStringValue(route, "rule")
+
+		if series == "" { // TODO: log error
+			continue
+		}
+
+		routes = append(routes, AnalyticsRoute{
+			Series:   series,
+			RuleName: ruleName,
+		})
+	}
+
+	return routes
+}
+
+// NotificationRoutes filters the LogRoutes and returns a list of NotificationRoute structs
+func (l LogRoutes) NotificationRoutes() []NotificationRoute {
+	routes := []NotificationRoute{}
+
+	for _, route := range l {
+		tipe := getStringValue(route, "type")
+		if tipe != "notifications" {
+			continue
+		}
+
+		channel := getStringValue(route, "channel")
+		icon := getStringValue(route, "icon")
+		message := getStringValue(route, "message")
+		user := getStringValue(route, "user")
+		rule := getStringValue(route, "rule")
+
+		if channel == "" || message == "" { // TODO: log error
+			continue
+		}
+
+		if icon == "" {
+			icon = ":ghost:"
+		}
+		if user == "" {
+			user = "logging-pipeline"
+		}
+
+		routes = append(routes, NotificationRoute{
+			Channel:  channel,
+			Icon:     icon,
+			Message:  message,
+			User:     user,
+			RuleName: rule,
+		})
+	}
+
+	return routes
+}
+
+// AlertRoutes filters the LogRoutes and returns a list of AlertRoute structs
+func (l LogRoutes) AlertRoutes() []AlertRoute {
+	routes := []AlertRoute{}
+
+	for _, route := range l {
+		tipe := getStringValue(route, "type")
+		if tipe != "alerts" {
+			continue
+		}
+
+		series := getStringValue(route, "series")
+		dimensions := getStringArray(route, "dimensions")
+		statType := getStringValue(route, "stat_type")
+		valueField := getStringValue(route, "value_field")
+		ruleName := getStringValue(route, "rule")
+
+		if series == "" { // TODO: log error
+			continue
+		}
+		if statType == "" {
+			statType = "counter"
+		}
+		if valueField == "" {
+			valueField = "value"
+		}
+
+		routes = append(routes, AlertRoute{
+			Series:     series,
+			Dimensions: dimensions,
+			StatType:   statType,
+			ValueField: valueField,
+			RuleName:   ruleName,
+		})
+	}
+
+	return routes
+}
+// KVMeta represents kv-meta data
+type KVMeta struct {
+	Team     string
+	Version  string
+	Language string
+	Routes   LogRoutes
+}
+
+// ExtractKVMeta returns a struct with available kv-meta data
+func ExtractKVMeta(kvlog map[string]interface{}) KVMeta {
+	tmp, ok := kvlog["_kvmeta"]
+	if !ok {
+		return KVMeta{}
+	}
+
+	kvmeta, ok := tmp.(map[string]interface{})
+	if !ok {
+		return KVMeta{}
+	}
+
+	kvRoutes := []map[string]interface{}{}
+
+	tmp, ok = kvmeta["routes"]
+	if ok {
+		routes, ok := tmp.([]map[string]interface{})
+		if ok {
+			kvRoutes = routes
+		}
+	}
+
+	return KVMeta{
+		Team:     getStringValue(kvmeta, "team"),
+		Version:  getStringValue(kvmeta, "kv_version"),
+		Language: getStringValue(kvmeta, "kv_language"),
+		Routes:   kvRoutes,
+	}
+}
+
+// ParseAndEnhance extracts fields from a log line, and does some post-processing to rename/add fields
+func ParseAndEnhance(line string, env string, stringifyNested bool, renameESReservedFields bool, minimumTimestamp time.Time) (map[string]interface{}, error) {
+	out := map[string]interface{}{}
+
+	syslogFields, err := FieldsFromSyslog(line)
+	if err != nil {
+		return map[string]interface{}{}, err
+	}
+	for k, v := range syslogFields {
+		out[k] = v
+	}
+	rawlog := syslogFields["rawlog"].(string)
+	programname := syslogFields["programname"].(string)
+
+	// Try pulling Kayvee fields out of the message
+	kvFields, err := FieldsFromKayvee(rawlog)
+	if err != nil {
+		if _, ok := err.(*NonKayveeError); !ok {
+			return map[string]interface{}{}, err
+		}
+	} else {
+		for k, v := range kvFields {
+			out[k] = v
+		}
+	}
+
+	// Inject additional fields that are useful in log-searching and other business logic
+	out["env"] = env
+
+	// Sometimes it's useful to force `container_{env,app,task}`. A specific use-case is writing
+	// Docker events. A separate container monitors for start/stop events, but we set the container
+	// values in such a way that the logs for these events will appear in context for the app that
+	// the user is looking at, instead of the docker-events app.
+	forceEnv := ""
+	forceApp := ""
+	forceTask := ""
+	if cEnv, ok := out["container_env"]; ok {
+		forceEnv = cEnv.(string)
+	}
+	if cApp, ok := out["container_app"]; ok {
+		forceApp = cApp.(string)
+	}
+	if cTask, ok := out["container_task"]; ok {
+		forceTask = cTask.(string)
+	}
+	meta, err := getContainerMeta(programname, forceEnv, forceApp, forceTask)
+	if err == nil {
+		for k, v := range meta {
+			out[k] = v
+		}
+	}
+
+	// ES dynamic mappings get finicky once you start sending nested objects.
+	// E.g., if one app sends a field for the first time as an object, then any log
+	// sent by another app containing that field /not/ as an object will fail.
+	// One solution is to decode nested objects as strings.
+	if stringifyNested {
+		for k, v := range out {
+			_, ismap := v.(map[string]interface{})
+			_, isarr := v.([]interface{})
+			if ismap || isarr {
+				bs, _ := json.Marshal(v)
+				out[k] = string(bs)
+			}
+		}
+	}
+
+	// ES doesn't like fields that start with underscores.
+	if renameESReservedFields {
+		for oldKey, renamedKey := range esFieldRenames {
+			if val, ok := out[oldKey]; ok {
+				out[renamedKey] = val
+				delete(out, oldKey)
+			}
+		}
+	}
+
+	msgTime, ok := out["timestamp"].(time.Time)
+	if ok && !msgTime.After(minimumTimestamp) {
+		return map[string]interface{}{}, fmt.Errorf("message's timestamp <= minimumTimestamp")
+	}
+
+	return out, nil
+}
+
+var esFieldRenames = map[string]string{
+	"_index":       "kv__index",
+	"_uid":         "kv__uid",
+	"_type":        "kv__type",
+	"_id":          "kv__id",
+	"_source":      "kv__source",
+	"_size":        "kv__size",
+	"_all":         "kv__all",
+	"_field_names": "kv__field_names",
+	"_timestamp":   "kv__timestamp",
+	"_ttl":         "kv__ttl",
+	"_parent":      "kv__parent",
+	"_routing":     "kv__routing",
+	"_meta":        "kv__meta",
+}
+
+const containerMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
+	`arn%3Aaws%3Aecs%3Aus-(west|east)-[1-2]%3A[0-9]{12}%3Atask%2F` + // ARN cruft
+	`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // task-id
+
+var containerMetaRegex = regexp.MustCompile(containerMeta)
+
+func getContainerMeta(programname, forceEnv, forceApp, forceTask string) (map[string]string, error) {
+	if programname == "" {
+		return map[string]string{}, fmt.Errorf("no programname")
+	}
+
+	env := ""
+	app := ""
+	task := ""
+	matches := containerMetaRegex.FindAllStringSubmatch(programname, 1)
+	if len(matches) == 1 {
+		env = matches[0][1]
+		app = matches[0][2]
+		task = matches[0][4]
+	}
+
+	if forceEnv != "" {
+		env = forceEnv
+	}
+	if forceApp != "" {
+		app = forceApp
+	}
+	if forceTask != "" {
+		task = forceTask
+	}
+
+	if env == "" || app == "" || task == "" {
+		return map[string]string{}, fmt.Errorf("unable to get one or more of env/app/task")
+	}
+
+	return map[string]string{
+		"container_env":  env,
+		"container_app":  app,
+		"container_task": task,
+	}, nil
+}
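
Reviewer note: a sketch of the full decode path on an rsyslog line (behavior per the tests below; the "my-env" string is an arbitrary deploy-env value):

```go
package main

import (
	"fmt"
	"time"

	"github.com/Clever/amazon-kinesis-client-go/decode"
)

func main() {
	line := `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 ` +
		`env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2F` +
		`abcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: {"title":"request_finished"}`

	// stringifyNested=false, renameESReservedFields=false, zero minimum timestamp
	fields, err := decode.ParseAndEnhance(line, "my-env", false, false, time.Time{})
	if err != nil {
		panic(err)
	}
	// container_env/app/task are parsed out of the programname ARN
	fmt.Println(fields["container_app"], fields["title"], fields["env"])
}
```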
diff --git a/decode/decode_test.go b/decode/decode_test.go
new file mode 100644
index 0000000..9f7a123
--- /dev/null
+++ b/decode/decode_test.go
@@ -0,0 +1,834 @@
+package decode
+
+import (
+	"encoding/json"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/Clever/syslogparser"
+	"github.com/stretchr/testify/assert"
+)
+
+const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00"
+
+type Spec struct {
+	Title          string
+	Input          string
+	ExpectedOutput map[string]interface{}
+	ExpectedError  error
+}
+
+func TestKayveeDecoding(t *testing.T) {
+	specs := []Spec{
+		Spec{
+			Title: "handles just JSON",
+			Input: `{"a":"b"}`,
+			ExpectedOutput: map[string]interface{}{
+				"prefix":  "",
+				"postfix": "",
+				"a":       "b",
+				"type":    "Kayvee",
+			},
+			ExpectedError: nil,
+		},
+		Spec{
+			Title: "handles prefix + JSON",
+			Input: `prefix {"a":"b"}`,
+			ExpectedOutput: map[string]interface{}{
+				"prefix":  "prefix ",
+				"postfix": "",
+				"a":       "b",
+				"type":    "Kayvee",
+			},
+			ExpectedError: nil,
+		},
+		Spec{
+			Title: "handles JSON + postfix",
+			Input: `{"a":"b"} postfix`,
+			ExpectedOutput: map[string]interface{}{
+				"prefix":  "",
+				"postfix": " postfix",
+				"a":       "b",
+				"type":    "Kayvee",
+			},
+			ExpectedError: nil,
+		},
+		Spec{
+			Title: "handles prefix + JSON + postfix",
+			Input: `prefix {"a":"b"} postfix`,
+			ExpectedOutput: map[string]interface{}{
+				"prefix":  "prefix ",
+				"postfix": " postfix",
+				"a":       "b",
+				"type":    "Kayvee",
+			},
+			ExpectedError: nil,
+		},
+		Spec{
+			Title:          "Returns NonKayveeError if not JSON in body",
+			Input:          `prefix { postfix`,
+			ExpectedOutput: map[string]interface{}{},
+			ExpectedError:  &NonKayveeError{},
+		},
+		Spec{
+			Title:          "errors on invalid JSON (missing a quote)",
+			Input:          `prefix {"a:"b"} postfix`,
+			ExpectedOutput: map[string]interface{}{},
+			ExpectedError:  &json.SyntaxError{},
+		},
+	}
+
+	for _, spec := range specs {
+		t.Run(spec.Title, func(t *testing.T) {
+			assert := assert.New(t)
+			fields, err := FieldsFromKayvee(spec.Input)
+			if spec.ExpectedError != nil {
+				assert.Error(err)
+				assert.IsType(spec.ExpectedError, err)
+			} else {
+				assert.NoError(err)
+			}
+			assert.Equal(spec.ExpectedOutput, fields)
+		})
+	}
+}
+
+func TestSyslogDecoding(t *testing.T) {
+	// timestamps in Rsyslog_TraditionalFileFormat
+	logTime, err := time.Parse(time.Stamp, "Oct 25 10:20:37")
+	if err != nil {
+		t.Fatal(err)
+	}
+	// parsing assumes the log is from the current year
+	logTime = logTime.AddDate(time.Now().Year(), 0, 0).UTC()
+
+	logTime2, err := time.Parse(time.Stamp, "Apr 5 21:45:54")
+	if err != nil {
+		t.Fatal(err)
+	}
+	logTime2 = logTime2.AddDate(time.Now().Year(), 0, 0).UTC()
+
+	// timestamp in Rsyslog_FileFormat
+	logTime3, err := time.Parse(RFC3339Micro, "2017-04-05T21:57:46.794862+00:00")
+	if err != nil {
+		t.Fatal(err)
+	}
+	logTime3 = logTime3.UTC()
+
+	specs := []Spec{
+		Spec{
+			Title: "Parses Rsyslog_TraditionalFileFormat with simple log body",
+			Input: `Oct 25 10:20:37 some-host docker/fa3a5e338a47[1294]: log body`,
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":   logTime,
+				"hostname":    "some-host",
+				"programname": "docker/fa3a5e338a47",
+				"rawlog":      "log body",
+			},
+			ExpectedError: nil,
+		},
+		Spec{
+			Title: "Parses Rsyslog_TraditionalFileFormat with haproxy access log body",
+			Input: `Apr 5 21:45:54 influx-service docker/0000aa112233[1234]: [httpd] 2017/04/05 21:45:54 172.17.42.1 - heka [05/Apr/2017:21:45:54 +0000] POST /write?db=foo&precision=ms HTTP/1.1 204 0 - Go 1.1 package http 123456-1234-1234-b11b-000000000000 13.688672ms`,
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":   logTime2,
+				"hostname":    "influx-service",
+				"programname": "docker/0000aa112233",
+				"rawlog":      "[httpd] 2017/04/05 21:45:54 172.17.42.1 - heka [05/Apr/2017:21:45:54 +0000] POST /write?db=foo&precision=ms HTTP/1.1 204 0 - Go 1.1 package http 123456-1234-1234-b11b-000000000000 13.688672ms",
+			},
+			ExpectedError: nil,
+		},
+		Spec{
+			Title: "Parses Rsyslog_TraditionalFileFormat",
+			Input: `Apr 5 21:45:54 mongodb-some-machine whackanop: 2017/04/05 21:46:11 found 0 ops`,
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":   logTime2,
+				"hostname":    "mongodb-some-machine",
+				"programname": "whackanop",
+				"rawlog":      "2017/04/05 21:46:11 found 0 ops",
+			},
+			ExpectedError: nil,
+		},
+		Spec{
+			Title: "Parses Rsyslog_FileFormat with Kayvee payload",
+			Input: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`,
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":   logTime3,
+				"hostname":    "ip-10-0-0-0",
+				"programname": `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
+				"rawlog":      `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`,
+			},
+			ExpectedError: nil,
+		},
+		Spec{
+			Title:          "Fails to parse non-RSyslog log line",
+			Input:          `not rsyslog`,
+			ExpectedOutput: map[string]interface{}{},
+			ExpectedError:  &syslogparser.ParserError{},
+		},
+	}
+	for _, spec := range specs {
+		t.Run(spec.Title, func(t *testing.T) {
+			assert := assert.New(t)
+			fields, err := FieldsFromSyslog(spec.Input)
+			if spec.ExpectedError != nil {
+				assert.Error(err)
+				assert.IsType(spec.ExpectedError, err)
+			} else {
+				assert.NoError(err)
+			}
+			assert.Equal(spec.ExpectedOutput, fields)
+		})
+	}
+}
+
+type ParseAndEnhanceInput struct {
+	Line                   string
+	StringifyNested        bool
+	RenameESReservedFields bool
+	MinimumTimestamp       time.Time
+}
+
+type ParseAndEnhanceSpec struct {
+	Title          string
+	Input          ParseAndEnhanceInput
+	ExpectedOutput map[string]interface{}
+	ExpectedError  error
+}
+
+func TestParseAndEnhance(t *testing.T) {
+	// timestamp in Rsyslog_FileFormat
+	logTime3, err := time.Parse(RFC3339Micro, "2017-04-05T21:57:46.794862+00:00")
+	if err != nil {
+		t.Fatal(err)
+	}
+	logTime3 = logTime3.UTC()
+
+	specs := []ParseAndEnhanceSpec{
+		ParseAndEnhanceSpec{
+			Title: "Parses a Kayvee log line from an ECS app",
+			Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`},
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":      logTime3,
+				"hostname":       "ip-10-0-0-0",
+				"programname":    `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
+				"rawlog":         `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}`,
+				"title":          "request_finished",
+				"type":           "Kayvee",
+				"prefix":         "2017/04/05 21:57:46 some_file.go:10: ",
+				"postfix":        "",
+				"env":            "deploy-env",
+				"container_env":  "env",
+				"container_app":  "app",
+				"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+			},
+			ExpectedError: nil,
+		},
+		ParseAndEnhanceSpec{
+			Title: "Parses a Kayvee log line from an ECS app, with override to container_app",
+			Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished","container_app":"force-app"}`},
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":      logTime3,
+				"hostname":       "ip-10-0-0-0",
+				"programname":    `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
+				"rawlog":         `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished","container_app":"force-app"}`,
+				"title":          "request_finished",
+				"type":           "Kayvee",
+				"prefix":         "2017/04/05 21:57:46 some_file.go:10: ",
+				"postfix":        "",
+				"env":            "deploy-env",
+				"container_env":  "env",
+				"container_app":  "force-app",
+				"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+			},
+			ExpectedError: nil,
+		},
+		ParseAndEnhanceSpec{
+			Title: "Parses a non-Kayvee log line",
+			Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: some log`},
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":      logTime3,
+				"hostname":       "ip-10-0-0-0",
+				"programname":    `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
+				"rawlog":         `some log`,
+				"env":            "deploy-env",
+				"container_env":  "env",
+				"container_app":  "app",
+				"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+			},
+			ExpectedError: nil,
+		},
+		ParseAndEnhanceSpec{
+			Title:          "Fails to parse non-RSyslog log line",
+			Input:          ParseAndEnhanceInput{Line: `not rsyslog`},
+			ExpectedOutput: map[string]interface{}{},
+			ExpectedError:  &syslogparser.ParserError{},
+		},
+		ParseAndEnhanceSpec{
+			Title: "Parses JSON values",
+			Input: ParseAndEnhanceInput{Line: `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`},
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":      logTime3,
+				"hostname":       "ip-10-0-0-0",
+				"programname":    `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
+				"rawlog":         `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`,
+				"title":          "request_finished",
+				"type":           "Kayvee",
+				"prefix":         "2017/04/05 21:57:46 some_file.go:10: ",
+				"postfix":        "",
+				"env":            "deploy-env",
+				"container_env":  "env",
+				"container_app":  "app",
+				"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+				"nested":         map[string]interface{}{"a": "b"},
+			},
+			ExpectedError: nil,
+		},
+		ParseAndEnhanceSpec{
+			Title: "Has the option to stringify object values",
+			Input: ParseAndEnhanceInput{
+				Line:            `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`,
+				StringifyNested: true,
+			},
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":      logTime3,
+				"hostname":       "ip-10-0-0-0",
+				"programname":    `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
+				"rawlog":         `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": {"a":"b"}}`,
+				"title":          "request_finished",
+				"type":           "Kayvee",
+				"prefix":         "2017/04/05 21:57:46 some_file.go:10: ",
+				"postfix":        "",
+				"env":            "deploy-env",
+				"container_env":  "env",
+				"container_app":  "app",
+				"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+				"nested":         `{"a":"b"}`,
+			},
+			ExpectedError: nil,
+		},
+		ParseAndEnhanceSpec{
+			Title: "Has the option to stringify array values",
+			Input: ParseAndEnhanceInput{
+				Line:            `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": [{"a":"b"}]}`,
+				StringifyNested: true,
+			},
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":      logTime3,
+				"hostname":       "ip-10-0-0-0",
+				"programname":    `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
+				"rawlog":         `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "nested": [{"a":"b"}]}`,
+				"title":          "request_finished",
+				"type":           "Kayvee",
+				"prefix":         "2017/04/05 21:57:46 some_file.go:10: ",
+				"postfix":        "",
+				"env":            "deploy-env",
+				"container_env":  "env",
+				"container_app":  "app",
+				"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+				"nested":         `[{"a":"b"}]`,
+			},
+			ExpectedError: nil,
+		},
+		ParseAndEnhanceSpec{
+			Title: "Has the option to rename reserved ES fields",
+			Input: ParseAndEnhanceInput{
+				Line:                   `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
+				RenameESReservedFields: true,
+			},
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":      logTime3,
+				"hostname":       "ip-10-0-0-0",
+				"programname":    `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
+				"rawlog":         `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
+				"title":          "request_finished",
+				"type":           "Kayvee",
+				"prefix":         "2017/04/05 21:57:46 some_file.go:10: ",
+				"postfix":        "",
+				"env":            "deploy-env",
+				"container_env":  "env",
+				"container_app":  "app",
+				"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+				"kv__source":     "a",
+			},
+			ExpectedError: nil,
+		},
+		ParseAndEnhanceSpec{
+			Title: "Errors if logTime < MinimumTimestamp",
+			Input: ParseAndEnhanceInput{
+				Line:                   `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
+				RenameESReservedFields: true,
+				MinimumTimestamp:       time.Now().Add(100 * time.Hour * 24 * 365), // 100 years in the future
+			},
+			ExpectedOutput: map[string]interface{}{},
+			ExpectedError:  fmt.Errorf(""),
+		},
+		ParseAndEnhanceSpec{
+			Title: "Accepts logs if logTime > MinimumTimestamp",
+			Input: ParseAndEnhanceInput{
+				Line:                   `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
+				RenameESReservedFields: true,
+				MinimumTimestamp:       time.Now().Add(-100 * time.Hour * 24 * 365), // 100 years in the past
+			},
+			ExpectedOutput: map[string]interface{}{
+				"timestamp":      logTime3,
+				"hostname":       "ip-10-0-0-0",
+				"programname":    `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`,
+				"rawlog":         `2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished", "_source": "a"}`,
+				"title":          "request_finished",
+				"type":           "Kayvee",
+				"prefix":         "2017/04/05 21:57:46 some_file.go:10: ",
+				"postfix":        "",
+				"env":            "deploy-env",
+				"container_env":  "env",
+				"container_app":  "app",
+				"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+				"kv__source":     "a",
+			},
+			ExpectedError: nil,
+		},
+	}
+				assert.Error(err)
+				assert.IsType(spec.ExpectedError, err)
+			} else {
+				assert.NoError(err)
+			}
+			assert.Equal(spec.ExpectedOutput, fields)
+		})
+	}
+}
+
+func TestGetContainerMeta(t *testing.T) {
+	assert := assert.New(t)
+
+	t.Log("Must have a programname to get container meta")
+	programname := ""
+	_, err := getContainerMeta(programname, "", "", "")
+	assert.Error(err)
+
+	t.Log("Can parse a programname")
+	programname = `env1--app2/arn%3Aaws%3Aecs%3Aus-west-1%3A589690932525%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
+	meta, err := getContainerMeta(programname, "", "", "")
+	assert.NoError(err)
+	assert.Equal(map[string]string{
+		"container_env":  "env1",
+		"container_app":  "app2",
+		"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+	}, meta)
+
+	t.Log("Can override just 'env'")
+	overrideEnv := "force-env"
+	meta, err = getContainerMeta(programname, overrideEnv, "", "")
+	assert.NoError(err)
+	assert.Equal(map[string]string{
+		"container_env":  overrideEnv,
+		"container_app":  "app2",
+		"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+	}, meta)
+
+	t.Log("Can override just 'app'")
+	overrideApp := "force-app"
+	meta, err = getContainerMeta(programname, "", overrideApp, "")
+	assert.NoError(err)
+	assert.Equal(map[string]string{
+		"container_env":  "env1",
+		"container_app":  overrideApp,
+		"container_task": "abcd1234-1a3b-1a3b-1234-d76552f4b7ef",
+	}, meta)
+
+	t.Log("Can override just 'task'")
+	overrideTask := "force-task"
+	meta, err = getContainerMeta(programname, "", "", overrideTask)
+	assert.NoError(err)
+	assert.Equal(map[string]string{
+		"container_env":  "env1",
+		"container_app":  "app2",
+		"container_task": overrideTask,
+	}, meta)
+
+	t.Log("Can override all fields")
+	programname = `env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef`
+	meta, err = getContainerMeta(programname, overrideEnv, overrideApp, overrideTask)
+	assert.NoError(err)
+	assert.Equal(map[string]string{
+		"container_env":  overrideEnv,
+		"container_app":  overrideApp,
+		"container_task": overrideTask,
+	}, meta)
+}
+
+func TestExtractKVMeta(t *testing.T) {
+	assert := assert.New(t)
+
+	tests := []struct {
+		Description                string
+		Log                        map[string]interface{}
+		Team                       string
+		Language                   string
+		Version                    string
+		ExpectedMetricsRoutes      []MetricsRoute
+		ExpectedAnalyticsRoutes    []AnalyticsRoute
+		ExpectedNotificationRoutes []NotificationRoute
+		ExpectedAlertRoutes        []AlertRoute
+	}{
+		{
+			Description: "log line with no routes",
+			Log:         map[string]interface{}{"hi": "hello!"},
+		},
+		{
+			Description: "empty _kvmeta",
+			Log: map[string]interface{}{
+				"hi":      "hello!",
+				"_kvmeta": map[string]interface{}{},
+			},
+		},
+		{
+			Description: "_kvmeta with no routes",
+			Team:        "green",
+			Version:     "three",
+			Language:    "tree",
+			Log: map[string]interface{}{
+				"hi": "hello!",
+				"_kvmeta": map[string]interface{}{
+					"team":        "green",
+					"kv_version":  "three",
+					"kv_language": "tree",
+				},
+			},
+		},
+		{
+			Description: "_kvmeta with metric routes",
+			Team:        "green",
+			Version:     "three",
+			Language:    "tree",
+			ExpectedMetricsRoutes: []MetricsRoute{
+				{
+					Series:     "1,1,2,3,5,8,13",
+					Dimensions: []string{"app", "district"},
+					ValueField: "value",
+					RuleName:   "cool",
+				},
+				{
+					Series:     "1,1,2,6,24,120,720,5040",
+					Dimensions: []string{"app", "district"},
+					ValueField: "value",
+					RuleName:   "cool2",
+				},
+			},
+			Log: map[string]interface{}{
+				"hi": "hello!",
+				"_kvmeta": map[string]interface{}{
+					"team":        "green",
+					"kv_version":  "three",
+					"kv_language": "tree",
+					"routes": []map[string]interface{}{
+						map[string]interface{}{
+							"type":        "metrics",
+							"rule":        "cool",
+							"series":      "1,1,2,3,5,8,13",
+							"value_field": "value",
+							"dimensions":  []string{"app", "district"},
+						},
+						map[string]interface{}{
+							"type":       "metrics",
+							"rule":       "cool2",
+							"series":     "1,1,2,6,24,120,720,5040",
+							"dimensions": []string{"app", "district"},
+						},
+					},
+				},
+			},
+		},
+		{
+			Description: "_kvmeta with analytic routes",
+			Team:        "green",
+			Version:     "christmas",
+			Language:    "tree",
+			ExpectedAnalyticsRoutes: []AnalyticsRoute{
+				{
+					Series:   "what-is-this",
+					RuleName: "what's-this?",
+				},
+				{
+					RuleName: "there's-app-invites-everywhere",
+					Series:   "there's-bts-in-the-air",
+				},
+			},
+			Log: map[string]interface{}{
+				"hi": "hello!",
+				"_kvmeta": map[string]interface{}{
+					"team":        "green",
+					"kv_version":  "christmas",
+					"kv_language": "tree",
+					"routes": []map[string]interface{}{
+						map[string]interface{}{
+							"type":   "analytics",
+							"rule":   "what's-this?",
+							"series": "what-is-this",
+						},
+						map[string]interface{}{
+							"type":   "analytics",
+							"rule":   "there's-app-invites-everywhere",
+							"series": "there's-bts-in-the-air",
+						},
+					},
+				},
+			},
+		},
+		{
+			Description: "_kvmeta with notification routes",
+			Team:        "slack",
+			Version:     "evergreen",
+			Language:    "markdown-ish",
+			ExpectedNotificationRoutes: []NotificationRoute{
+				{
+					RuleName: "did-you-know",
+					Channel:  "originally-slack",
+					Message:  "was a gaming company",
+					Icon:     ":video_game:",
+					User:     "og slack bronie",
+				},
+				{
+					RuleName: "what's-the-catch",
+					Channel:  "slack-is-built-with-php",
+					Message:  "just like farmville",
+					Icon:     ":ghost:",
+					User:     "logging-pipeline",
+				},
+			},
+			Log: map[string]interface{}{
+				"hi": "hello!",
+				"_kvmeta": map[string]interface{}{
+					"team":        "slack",
+					"kv_version":  "evergreen",
+					"kv_language": "markdown-ish",
+					"routes": []map[string]interface{}{
+						map[string]interface{}{
+							"type":    "notifications",
+							"rule":    "did-you-know",
+							"channel": "originally-slack",
+							"message": "was a gaming company",
+							"icon":    ":video_game:",
+							"user":    "og slack bronie",
+						},
+						map[string]interface{}{
+							"type":    "notifications",
+							"rule":    "what's-the-catch",
+							"channel": "slack-is-built-with-php",
+							"message": "just like farmville",
+						},
+					},
+				},
+			},
+		},
+		{
+			Description: "_kvmeta with alert routes",
+			Team:        "a-team",
+			Version:     "old",
+			Language:    "jive",
+			ExpectedAlertRoutes: []AlertRoute{
+				{
+					RuleName:   "last-call",
+					Series:     "doing-it-til-we-fall",
+					Dimensions: []string{"who", "where"},
+					StatType:   "gauge",
+					ValueField: "status",
+				},
+				{
+					RuleName:   "watch-out-now",
+					Series:     "dem-gators-gonna-bite-ya",
+					Dimensions: []string{"how-fresh", "how-clean"},
+					StatType:   "counter",
+					ValueField: "value",
+				},
+			},
+			Log: map[string]interface{}{
+				"hi": "hello!",
+				"_kvmeta": map[string]interface{}{
+					"team":        "a-team",
+					"kv_version":  "old",
+					"kv_language": "jive",
+					"routes": []map[string]interface{}{
+						map[string]interface{}{
+							"type":        "alerts",
+							"rule":        "last-call",
+							"series":      "doing-it-til-we-fall",
+							"dimensions":  []string{"who", "where"},
+							"stat_type":   "gauge",
+							"value_field": "status",
+						},
+						map[string]interface{}{
+							"type":       "alerts",
+							"rule":       "watch-out-now",
+							"series":     "dem-gators-gonna-bite-ya",
+							"dimensions": []string{"how-fresh", "how-clean"},
+						},
+					},
+				},
+			},
+		},
+		{
+			Description: "_kvmeta with all types of routes",
+			Team:        "diversity",
+			Version:     "kv-routes",
+			Language:    "understanding",
+			ExpectedMetricsRoutes: []MetricsRoute{
+				{
+					RuleName: "all-combos",
+					Series:   "1,1,2,6,24,120,720,5040",
[]string{"fact", "orial"}, + ValueField: "value", + }, + }, + ExpectedAnalyticsRoutes: []AnalyticsRoute{ + { + RuleName: "there's-app-invites-everywhere", + Series: "there's-bts-in-the-air", + }, + }, + ExpectedNotificationRoutes: []NotificationRoute{ + { + RuleName: "what's-the-catch", + Channel: "slack-is-built-with-php", + Message: "just like farmville", + Icon: ":ghost:", + User: "logging-pipeline", + }, + }, + ExpectedAlertRoutes: []AlertRoute{ + { + RuleName: "last-call", + Series: "doing-it-til-we-fall", + Dimensions: []string{"who", "where"}, + StatType: "guage", + ValueField: "status", + }, + }, + Log: map[string]interface{}{ + "hi": "hello!", + "_kvmeta": map[string]interface{}{ + "team": "diversity", + "kv_version": "kv-routes", + "kv_language": "understanding", + "routes": []map[string]interface{}{ + map[string]interface{}{ + "type": "metrics", + "rule": "all-combos", + "series": "1,1,2,6,24,120,720,5040", + "dimensions": []string{"fact", "orial"}, + }, + map[string]interface{}{ + "type": "analytics", + "rule": "there's-app-invites-everywhere", + "series": "there's-bts-in-the-air", + }, + map[string]interface{}{ + "type": "notifications", + "rule": "what's-the-catch", + "channel": "slack-is-built-with-php", + "message": "just like farmville", + }, + map[string]interface{}{ + "type": "alerts", + "rule": "last-call", + "series": "doing-it-til-we-fall", + "dimensions": []string{"who", "where"}, + "stat_type": "guage", + "value_field": "status", + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Log(test.Description) + kvmeta := ExtractKVMeta(test.Log) + + assert.Equal(test.Team, kvmeta.Team) + assert.Equal(test.Language, kvmeta.Language) + assert.Equal(test.Version, kvmeta.Version) + + assert.Equal(len(test.ExpectedMetricsRoutes), len(kvmeta.Routes.MetricsRoutes())) + for idx, route := range kvmeta.Routes.MetricsRoutes() { + expected := test.ExpectedMetricsRoutes[idx] + assert.Exactly(expected, route) + } + assert.Equal(len(test.ExpectedAnalyticsRoutes), len(kvmeta.Routes.AnalyticsRoutes())) + for idx, route := range kvmeta.Routes.AnalyticsRoutes() { + expected := test.ExpectedAnalyticsRoutes[idx] + assert.Exactly(expected, route) + } + assert.Equal(len(test.ExpectedNotificationRoutes), len(kvmeta.Routes.NotificationRoutes())) + for idx, route := range kvmeta.Routes.NotificationRoutes() { + expected := test.ExpectedNotificationRoutes[idx] + assert.Exactly(expected, route) + } + assert.Equal(len(test.ExpectedAlertRoutes), len(kvmeta.Routes.AlertRoutes())) + for idx, route := range kvmeta.Routes.AlertRoutes() { + expected := test.ExpectedAlertRoutes[idx] + assert.Exactly(expected, route) + } + } +} + +// Benchmarks +const benchmarkLine = `2017-04-05T21:57:46.794862+00:00 ip-10-0-0-0 env--app/arn%3Aaws%3Aecs%3Aus-west-1%3A999988887777%3Atask%2Fabcd1234-1a3b-1a3b-1234-d76552f4b7ef[3291]: 2017/04/05 21:57:46 some_file.go:10: {"title":"request_finished"}` + +func BenchmarkFieldsFromKayvee(b *testing.B) { + for n := 0; n < b.N; n++ { + _, err := FieldsFromKayvee(benchmarkLine) + if err != nil { + b.FailNow() + } + } +} + +func BenchmarkFieldsFromSyslog(b *testing.B) { + for n := 0; n < b.N; n++ { + _, err := FieldsFromSyslog(benchmarkLine) + if err != nil { + b.FailNow() + } + } +} + +func BenchmarkParseAndEnhance(b *testing.B) { + for n := 0; n < b.N; n++ { + _, err := ParseAndEnhance(benchmarkLine, "env", false, false, time.Time{}) + if err != nil { + b.FailNow() + } + } +} diff --git a/glide.lock b/glide.lock new file mode 100644 index 0000000..a8f507f --- /dev/null +++ 
@@ -0,0 +1,53 @@
+hash: 6da9731518797a7e339144f126af589f72f860bee154b0f2a552f91d2bc01bab
+updated: 2017-07-18T02:00:59.747262097Z
+imports:
+- name: github.com/aws/aws-sdk-go
+  version: b73b028e599fa9176687c70b8f9cafbe57c27d20
+  subpackages:
+  - aws
+  - aws/session
+  - service/firehose
+- name: github.com/Clever/kinesis-to-firehose
+  version: dca69c87e1662c7b1f55581399544d05fdcb09ab
+  subpackages:
+  - writer
+- name: github.com/Clever/syslogparser
+  version: 93ab95f7ff16c9ef1f2d09bc37c0a0c31bad98ea
+  subpackages:
+  - rfc3164
+- name: github.com/jeromer/syslogparser
+  version: 0e4ae46ea3f08de351074b643d649d5d00661a3c
+- name: github.com/xeipuuv/gojsonpointer
+  version: e0fe6f68307607d540ed8eac07a342c33fa1b54a
+- name: github.com/xeipuuv/gojsonreference
+  version: e02fc20de94c78484cd5ffb007f8af96be030a45
+- name: github.com/xeipuuv/gojsonschema
+  version: e18f0065e8c148fcf567ac43a3f8f5b66ac0720b
+- name: golang.org/x/net
+  version: a6577fac2d73be281a500b310739095313165611
+  subpackages:
+  - context
+- name: golang.org/x/time
+  version: f51c12702a4d776e4c1fa9b0fabab841babae631
+  subpackages:
+  - rate
+- name: gopkg.in/Clever/kayvee-go.v6
+  version: 096364e316a52652d3493be702d8105d8d01db84
+  subpackages:
+  - logger
+  - router
+- name: gopkg.in/yaml.v2
+  version: a5b47d31c556af34a302ce5d659e6fea44d90de0
+testImports:
+- name: github.com/davecgh/go-spew
+  version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9
+  subpackages:
+  - spew
+- name: github.com/pmezard/go-difflib
+  version: d8ed2627bdf02c080bf22230dbb337003b7aba2d
+  subpackages:
+  - difflib
+- name: github.com/stretchr/testify
+  version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0
+  subpackages:
+  - assert
diff --git a/glide.yaml b/glide.yaml
new file mode 100644
index 0000000..a96cd81
--- /dev/null
+++ b/glide.yaml
@@ -0,0 +1,20 @@
+package: github.com/Clever/amazon-kinesis-client-go
+import:
+- package: github.com/Clever/kinesis-to-firehose
+  subpackages:
+  - writer
+- package: github.com/Clever/syslogparser
+  subpackages:
+  - rfc3164
+- package: github.com/aws/aws-sdk-go
+  subpackages:
+  - aws
+  - aws/session
+  - service/firehose
+- package: golang.org/x/time
+  subpackages:
+  - rate
+testImport:
+- package: github.com/stretchr/testify
+  subpackages:
+  - assert
diff --git a/splitter/README.md b/splitter/README.md
new file mode 100644
index 0000000..bdc307e
--- /dev/null
+++ b/splitter/README.md
@@ -0,0 +1,10 @@
+splitter
+===
+
+This splitter allows ingesting logs from a CWLogs subscription.
+
+Splitter's expected input is batched logs from a CloudWatchLogs subscription to a Kinesis Stream.
+The CWLogs subscription has a special format which bundles several logs into a single record.
+The splitter takes this record and splits it into multiple logs.
+These logs are also modified to mimic the RSyslog format we expect from our other logs.
+This allows them to be decoded normally by the rest of the pipeline.
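
For orientation before the implementation below, here is a minimal sketch of how a consumer might run a raw Kinesis record through this package. `IsGzipped` and `GetMessagesFromGzippedInput` are the exported helpers added in `splitter/splitter.go` below; the `processRecord` wrapper and its pass-through for non-gzipped records are illustrative assumptions, not part of this diff.

```go
package main

import (
	"fmt"

	"github.com/Clever/amazon-kinesis-client-go/splitter"
)

// processRecord is a hypothetical wrapper: CWLogs subscription records arrive
// gzipped, so un-gzipped payloads are treated as single, already-split lines.
func processRecord(data []byte, prodEnv bool) ([][]byte, error) {
	if !splitter.IsGzipped(data) {
		return [][]byte{data}, nil
	}
	// Unbundle the record into individual rsyslog-style log lines.
	return splitter.GetMessagesFromGzippedInput(data, prodEnv)
}

func main() {
	lines, err := processRecord([]byte("plain log line"), false)
	if err != nil {
		panic(err)
	}
	for _, line := range lines {
		fmt.Println(string(line))
	}
}
```

Each returned line carries the mocked RSyslog prefix described in the README, so it can be handed to the decode package unchanged.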
diff --git a/splitter/splitter.go b/splitter/splitter.go
new file mode 100644
index 0000000..d43bd93
--- /dev/null
+++ b/splitter/splitter.go
@@ -0,0 +1,113 @@
+package splitter
+
+import (
+	"bytes"
+	"compress/gzip"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"regexp"
+	"time"
+)
+
+// LogEvent is a single log line within a LogEventBatch
+type LogEvent struct {
+	ID        string `json:"id"`
+	Timestamp int64  `json:"timestamp"`
+	Message   string `json:"message"`
+}
+
+// LogEventBatch is a batch of multiple log lines, read from a Kinesis stream with a CWLogs subscription
+type LogEventBatch struct {
+	MessageType         string     `json:"messageType"`
+	Owner               string     `json:"owner"`
+	LogGroup            string     `json:"logGroup"`
+	LogStream           string     `json:"logStream"`
+	SubscriptionFilters []string   `json:"subscriptionFilters"`
+	LogEvents           []LogEvent `json:"logEvents"`
+}
+
+// IsGzipped returns whether a byte slice is gzipped (determined by looking for the gzip magic byte prefix)
+func IsGzipped(b []byte) bool {
+	return len(b) >= 2 && b[0] == 0x1f && b[1] == 0x8b
+}
+
+// GetMessagesFromGzippedInput takes a gzipped record from a CWLogs subscription and splits it into
+// a slice of messages.
+func GetMessagesFromGzippedInput(input []byte, prodEnv bool) ([][]byte, error) {
+	unpacked, err := unpack(input)
+	if err != nil {
+		return [][]byte{}, err
+	}
+	return Split(unpacked, prodEnv), nil
+}
+
+// unpack expects a gzipped + json-stringified LogEventBatch
+func unpack(input []byte) (LogEventBatch, error) {
+	gzipReader, err := gzip.NewReader(bytes.NewReader(input))
+	if err != nil {
+		return LogEventBatch{}, err
+	}
+
+	byt, err := ioutil.ReadAll(gzipReader)
+	if err != nil {
+		return LogEventBatch{}, err
+	}
+
+	var dat LogEventBatch
+	if err := json.Unmarshal(byt, &dat); err != nil {
+		return LogEventBatch{}, err
+	}
+
+	return dat, nil
+}
+
+// RFC3339Micro is the RFC3339 format in microseconds
+const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00"
+
+const taskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
+	`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\/` + // task-id
+	`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // container-id
+
+var taskRegex = regexp.MustCompile(taskMeta)
+
+// Split takes a LogEventBatch and separates it into a slice of enriched log lines.
+// Lines are enhanced by adding an RSyslog prefix, which should be handled correctly by
+// the subsequent decoding logic.
+func Split(b LogEventBatch, prodEnv bool) [][]byte {
+	env := "unknown"
+	app := "unknown"
+	task := "00001111-2222-3333-4444-555566667777"
+	matches := taskRegex.FindAllStringSubmatch(b.LogStream, 1)
+	if len(matches) == 1 {
+		env = matches[0][1]
+		app = matches[0][2]
+		task = matches[0][3]
+	}
+
+	if (env == "production") != prodEnv {
+		// If there's a mismatch between the consumer's environment and the log's environment,
+		// throw away the log. (This is a workaround for coarse-grained subscription filters.)
+		return [][]byte{}
+	}
+
+	rsyslogPrefix := `%s %s %s[%d]: %s`
+	// programName is a mocked ARN in the format expected by our log decoders
+	programName := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task
+	mockPid := 1
+	hostname := "aws-batch"
+
+	out := [][]byte{}
+	for _, event := range b.LogEvents {
+		// Adding an extra microsecond forces `Format` to include all 6 digits of the microsecond format.
+		// Otherwise, time.Format omits trailing zeroes.
+		// (https://github.com/golang/go/issues/12472)
+		nsecs := event.Timestamp*int64(time.Millisecond) + int64(time.Microsecond)
+		logTime := time.Unix(0, nsecs).UTC().Format(RFC3339Micro)
+
+		// Fake an RSyslog prefix, expected by consumers
+		formatted := fmt.Sprintf(rsyslogPrefix, logTime, hostname, programName, mockPid, event.Message)
+		out = append(out, []byte(formatted))
+	}
+
+	return out
+}
diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go
new file mode 100644
index 0000000..f85ebfa
--- /dev/null
+++ b/splitter/splitter_test.go
@@ -0,0 +1,180 @@
+package splitter
+
+import (
+	"bytes"
+	"compress/gzip"
+	b64 "encoding/base64"
+	"encoding/json"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestUnpacking(t *testing.T) {
+	input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA"
+
+	decoded, err := b64.StdEncoding.DecodeString(input)
+	assert.NoError(t, err)
+
+	output, err := unpack(decoded)
+	assert.NoError(t, err)
+
+	expectedOutput := LogEventBatch{
+		MessageType:         "CONTROL_MESSAGE",
+		Owner:               "CloudwatchLogs",
+		LogGroup:            "",
+		LogStream:           "",
+		SubscriptionFilters: []string{},
+		LogEvents: []LogEvent{
+			{
+				ID:        "",
+				Timestamp: 1498519943285,
+				Message:   "CWL CONTROL MESSAGE: Checking health of destination Kinesis stream.",
+			},
+		},
+	}
+	assert.Equal(t, expectedOutput, output)
+}
+
+func pack(input LogEventBatch) (string, error) {
+	src, err := json.Marshal(input)
+	if err != nil {
+		return "", err
+	}
+
+	// Gzip
+	var b bytes.Buffer
+	gz := gzip.NewWriter(&b)
+	if _, err := gz.Write(src); err != nil {
+		return "", err
+	}
+	if err := gz.Flush(); err != nil {
+		return "", err
+	}
+	if err := gz.Close(); err != nil {
+		return "", err
+	}
+
+	// Base64 encode
+	return b64.StdEncoding.EncodeToString(b.Bytes()), nil
+}
+
+func TestFullLoop(t *testing.T) {
+	input := `{
+    "messageType": "DATA_MESSAGE",
+    "owner": "123456789012",
+    "logGroup": "/aws/batch/job",
+    "logStream": "environment--app/11111111-2222-3333-4444-555566667777/88889999-0000-aaa-bbbb-ccccddddeeee",
+    "subscriptionFilters": [
+        "MySubscriptionFilter"
+    ],
+    "logEvents": [
+        {
+            "id": "33418742379011144044923130086453437181614530551221780480",
+            "timestamp": 1498548236012,
+            "message": "some log line"
+        },
+        {
+            "id": "33418742387663833181953011865369295871402094815542181889",
+            "timestamp": 1498548236400,
+            "message": "2017/06/27 07:23:56 Another log line"
+        }
+    ]
+}`
+
+	var leb LogEventBatch
+	err := json.Unmarshal([]byte(input), &leb)
+	assert.NoError(t, err)
+
+	packed, err := pack(leb)
+	assert.NoError(t, err)
+
+	decoded, err := b64.StdEncoding.DecodeString(packed)
+	assert.NoError(t, err)
+
+	output, err := unpack(decoded)
+	assert.NoError(t, err)
+
+	assert.Equal(t, leb, output)
+}
+
+func TestSplit(t *testing.T) {
+	input := LogEventBatch{
+		MessageType:         "DATA_MESSAGE",
+		Owner:               "123456789012",
+		LogGroup:            "/aws/batch/job",
+		LogStream:           "env--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee",
+		SubscriptionFilters: []string{"MySubscriptionFilter"},
+		LogEvents: []LogEvent{
+			{
+				ID:        "99999992379011144044923130086453437181614530551221780480",
+				Timestamp: 1498519943285,
+				Message:   "some log line",
+			},
+			{
+				ID:        "99999992387663833181953011865369295871402094815542181889",
+				Timestamp: 1498519943285,
"another log line", + }, + }, + } + prodEnv := false + lines := Split(input, prodEnv) + expected := [][]byte{ + "2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line", + "2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line", + } + assert.Equal(t, expected, lines) +} + +func TestSplitFiltersByEnv(t *testing.T) { + t.Log("If Split is run with prodEnv == true, it should omit logs with env != production") + input := LogEventBatch{ + MessageType: "DATA_MESSAGE", + Owner: "123456789012", + LogGroup: "/aws/batch/job", + LogStream: "env--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee", + // LogStream: "environment--app", + SubscriptionFilters: []string{"MySubscriptionFilter"}, + LogEvents: []LogEvent{ + { + ID: "99999992379011144044923130086453437181614530551221780480", + Timestamp: 1498519943285, + Message: "some log line", + }, + { + ID: "99999992387663833181953011865369295871402094815542181889", + Timestamp: 1498519943285, + Message: "another log line", + }, + }, + } + prodEnv := true + lines := Split(input, prodEnv) + expected := [][]byte{} + assert.Equal(t, expected, lines) + + t.Log("If Split is run with prodEnv == false, it should omit logs with env == production") + input = LogEventBatch{ + MessageType: "DATA_MESSAGE", + Owner: "123456789012", + LogGroup: "/aws/batch/job", + LogStream: "production--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee", + // LogStream: "environment--app", + SubscriptionFilters: []string{"MySubscriptionFilter"}, + LogEvents: []LogEvent{ + { + ID: "99999992379011144044923130086453437181614530551221780480", + Timestamp: 1498519943285, + Message: "some log line", + }, + { + ID: "99999992387663833181953011865369295871402094815542181889", + Timestamp: 1498519943285, + Message: "another log line", + }, + }, + } + prodEnv = false + lines = Split(input, prodEnv) + expected = [][]byte{} + assert.Equal(t, expected, lines) +}