diff --git a/Makefile b/Makefile index 7b6fb74..2549e7f 100644 --- a/Makefile +++ b/Makefile @@ -2,52 +2,52 @@ include golang.mk .DEFAULT_GOAL := test # override default goal set in library makefile SHELL := /bin/bash -JAR_DIR := jars PKG := github.com/Clever/amazon-kinesis-client-go PKGS := $(shell go list ./... | grep -v /vendor ) .PHONY: download_jars run build - -URL_PREFIX := http://search.maven.org/remotecontent?filepath= - -# this list lifted from https://github.com/awslabs/amazon-kinesis-client-python/blob/fb49c6390c0593fbcf81d6c34c5245726c15b2f3/setup.py#L60 -JARS_TO_DOWNLOAD := $(addprefix $(JAR_DIR)/,com/amazonaws/amazon-kinesis-client/1.7.2/amazon-kinesis-client-1.7.2.jar \ -com/amazonaws/aws-java-sdk-dynamodb/1.11.14/aws-java-sdk-dynamodb-1.11.14.jar \ -com/amazonaws/aws-java-sdk-s3/1.11.14/aws-java-sdk-s3-1.11.14.jar \ -com/amazonaws/aws-java-sdk-kms/1.11.14/aws-java-sdk-kms-1.11.14.jar \ -com/amazonaws/aws-java-sdk-core/1.11.14/aws-java-sdk-core-1.11.14.jar \ -commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar \ -org/apache/httpcomponents/httpclient/4.5.2/httpclient-4.5.2.jar \ -org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar \ -commons-codec/commons-codec/1.9/commons-codec-1.9.jar \ -com/fasterxml/jackson/core/jackson-databind/2.6.6/jackson-databind-2.6.6.jar \ -com/fasterxml/jackson/core/jackson-annotations/2.6.0/jackson-annotations-2.6.0.jar \ -com/fasterxml/jackson/core/jackson-core/2.6.6/jackson-core-2.6.6.jar \ -com/fasterxml/jackson/dataformat/jackson-dataformat-cbor/2.6.6/jackson-dataformat-cbor-2.6.6.jar \ -joda-time/joda-time/2.8.1/joda-time-2.8.1.jar \ -com/amazonaws/aws-java-sdk-kinesis/1.11.14/aws-java-sdk-kinesis-1.11.14.jar \ -com/amazonaws/aws-java-sdk-cloudwatch/1.11.14/aws-java-sdk-cloudwatch-1.11.14.jar \ -com/google/guava/guava/18.0/guava-18.0.jar \ -com/google/protobuf/protobuf-java/2.6.1/protobuf-java-2.6.1.jar \ -commons-lang/commons-lang/2.6/commons-lang-2.6.jar) - -EMPTY := -SPACE := $(EMPTY) $(EMPTY) -JAVA_CLASS_PATH := $(subst $(SPACE),:,$(JARS_TO_DOWNLOAD)) +$(eval $(call golang-version-check,1.8)) CONSUMER ?= consumer +TMP_DIR := ./tmp-jars +JAR_DIR := ./jars/ +KCL_VERSION := 1.7.6 -$(JARS_TO_DOWNLOAD): - mkdir -p `dirname $@` - curl -s -L -o $@ -O $(URL_PREFIX)`echo $@ | sed 's/$(JAR_DIR)\///g'` +define POM_XML_FOR_GETTING_DEPENDENT_JARS + + 4.0.0 + com.clever.kinesisconsumers + $(CONSUMER) + 1.0-SNAPSHOT + + + com.amazonaws + amazon-kinesis-client + $(KCL_VERSION) + + + +endef +export POM_XML_FOR_GETTING_DEPENDENT_JARS +download_jars: + command -v mvn >/dev/null 2>&1 || { echo >&2 "Maven not installed. Install maven!"; exit 1; } + mkdir -p $(JAR_DIR) $(TMP_DIR) + echo $$POM_XML_FOR_GETTING_DEPENDENT_JARS > $(TMP_DIR)/pom.xml + cd $(TMP_DIR) && mvn dependency:copy-dependencies + mv $(TMP_DIR)/target/dependency/* $(JAR_DIR)/ + # Download the STS jar file for supporting IAM Roles + ls $(JAR_DIR)/aws-java-sdk-core-*.jar | sed -e "s/.*-sdk-core-//g" | sed -e "s/\.jar//g" > /tmp/version.txt + curl -o $(JAR_DIR)/aws-java-sdk-sts-`cat /tmp/version.txt`.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-sts/`cat /tmp/version.txt`/aws-java-sdk-sts-`cat /tmp/version.txt`.jar + rm -r $(TMP_DIR) -download_jars: $(JARS_TO_DOWNLOAD) +all: test build build: CGO_ENABLED=0 go build -installsuffix cgo -o build/$(CONSUMER) $(PKG)/cmd/$(CONSUMER) run: build download_jars command -v java >/dev/null 2>&1 || { echo >&2 "Java not installed. Install java!"; exit 1; } - java -cp $(JAVA_CLASS_PATH) \ + java -cp "$(JAR_DIR)/*" \ com.amazonaws.services.kinesis.multilang.MultiLangDaemon \ $(CONSUMER).properties diff --git a/batchconsumer/batcher.go b/batchconsumer/batcher.go new file mode 100644 index 0000000..2d8480b --- /dev/null +++ b/batchconsumer/batcher.go @@ -0,0 +1,53 @@ +package batchconsumer + +import ( + "errors" + "time" + + "github.com/Clever/amazon-kinesis-client-go/kcl" +) + +var ErrBatchFull = errors.New("The batch is full") + +type batcher struct { + flushCount int + flushSize int + + Batch [][]byte + LastUpdated time.Time + SmallestSeq kcl.SequencePair +} + +func (b *batcher) batchSize(batch [][]byte) int { + total := 0 + for _, msg := range batch { + total += len(msg) + } + + return total +} + +func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error { + if b.flushCount <= len(b.Batch) { + return ErrBatchFull + } + + size := b.batchSize(b.Batch) + if b.flushSize < size+len(msg) { + return ErrBatchFull + } + + b.Batch = append(b.Batch, msg) + if b.SmallestSeq.IsNil() || pair.IsLessThan(b.SmallestSeq) { + b.SmallestSeq = pair + } + b.LastUpdated = time.Now() + + return nil +} + +func (b *batcher) Clear() { + b.Batch = [][]byte{} + b.LastUpdated = time.Time{} + b.SmallestSeq = kcl.SequencePair{} +} diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go deleted file mode 100644 index 82245e8..0000000 --- a/batchconsumer/batcher/message_batcher.go +++ /dev/null @@ -1,194 +0,0 @@ -package batcher - -import ( - "fmt" - "math/big" - "sync" - "time" -) - -// SequencePair a convience way to pass around a Sequence / SubSequence pair -type SequencePair struct { - Sequence *big.Int - SubSequence int -} - -func (s SequencePair) IsEmpty() bool { - return s.Sequence == nil -} - -func (s SequencePair) IsLessThan(pair SequencePair) bool { - if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable - return false - } - - cmp := s.Sequence.Cmp(pair.Sequence) - if cmp == -1 { - return true - } - if cmp == 1 { - return false - } - - return s.SubSequence < pair.SubSequence -} - -// Sync is used to allow a writer to syncronize with the batcher. -// The writer declares how to write messages (via its `SendBatch` method), while the batcher -// keeps track of messages written -type Sync interface { - SendBatch(batch [][]byte) -} - -// Batcher interface -type Batcher interface { - // AddMesage to the batch - AddMessage(msg []byte, sequencePair SequencePair) error - // Flush all messages from the batch - Flush() - // SmallestSeqPair returns the smallest SequenceNumber and SubSequence number in - // the current batch - SmallestSequencePair() SequencePair -} - -type msgPack struct { - msg []byte - sequencePair SequencePair -} - -type batcher struct { - mux sync.Mutex - - flushInterval time.Duration - flushCount int - flushSize int - - // smallestSeq are used for checkpointing - smallestSeq SequencePair - - sync Sync - msgChan chan<- msgPack - flushChan chan<- struct{} -} - -// New creates a new Batcher -// - sync - synchronizes batcher with writer -// - flushInterval - how often accumulated messages should be flushed (default 1 second). -// - flushCount - number of messages that trigger a flush (default 10). -// - flushSize - size of batch that triggers a flush (default 1024 * 1024 = 1 mb) -func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) (Batcher, error) { - if flushSize == 0 { - return nil, fmt.Errorf("flush size must be non-zero") - } - if flushCount == 0 { - return nil, fmt.Errorf("flush count must be non-zero") - } - if flushInterval == 0 { - return nil, fmt.Errorf("flush interval must be non-zero") - } - - msgChan := make(chan msgPack) - flushChan := make(chan struct{}) - - b := &batcher{ - flushCount: flushCount, - flushInterval: flushInterval, - flushSize: flushSize, - sync: sync, - msgChan: msgChan, - flushChan: flushChan, - } - - go b.startBatcher(msgChan, flushChan) - - return b, nil -} - -func (b *batcher) SmallestSequencePair() SequencePair { - b.mux.Lock() - defer b.mux.Unlock() - - return b.smallestSeq -} - -func (b *batcher) SetFlushInterval(dur time.Duration) { - b.flushInterval = dur -} - -func (b *batcher) SetFlushCount(count int) { - b.flushCount = count -} - -func (b *batcher) SetFlushSize(size int) { - b.flushSize = size -} - -func (b *batcher) AddMessage(msg []byte, pair SequencePair) error { - if len(msg) <= 0 { - return fmt.Errorf("Empty messages can't be sent") - } - - b.msgChan <- msgPack{msg, pair} - return nil -} - -// updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch. -// When flush() is called, the batcher sends the sequence number to the writer. When the writer -// checkpoints, it does so up to the latest message that was flushed successfully. -func (b *batcher) updateSequenceNumbers(pair SequencePair) { - b.mux.Lock() - defer b.mux.Unlock() - - if b.smallestSeq.IsEmpty() || pair.IsLessThan(b.smallestSeq) { - b.smallestSeq = pair - } -} - -func (b *batcher) Flush() { - b.flushChan <- struct{}{} -} - -func (b *batcher) batchSize(batch [][]byte) int { - total := 0 - for _, msg := range batch { - total += len(msg) - } - - return total -} - -func (b *batcher) flush(batch [][]byte) [][]byte { - if len(batch) > 0 { - b.sync.SendBatch(batch) - - b.mux.Lock() - b.smallestSeq = SequencePair{nil, 0} - b.mux.Unlock() - } - return [][]byte{} -} - -func (b *batcher) startBatcher(msgChan <-chan msgPack, flushChan <-chan struct{}) { - batch := [][]byte{} - - for { - select { - case <-time.After(b.flushInterval): - batch = b.flush(batch) - case <-flushChan: - batch = b.flush(batch) - case pack := <-msgChan: - size := b.batchSize(batch) - if b.flushSize < size+len(pack.msg) { - batch = b.flush(batch) - } - - batch = append(batch, pack.msg) - b.updateSequenceNumbers(pack.sequencePair) - - if b.flushCount <= len(batch) || b.flushSize <= b.batchSize(batch) { - batch = b.flush(batch) - } - } - } -} diff --git a/batchconsumer/batcher/message_batcher_test.go b/batchconsumer/batcher/message_batcher_test.go deleted file mode 100644 index 8ab9039..0000000 --- a/batchconsumer/batcher/message_batcher_test.go +++ /dev/null @@ -1,264 +0,0 @@ -package batcher - -import ( - "fmt" - "math/big" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -type batch [][]byte - -type MockSync struct { - flushChan chan struct{} - batches []batch -} - -func NewMockSync() *MockSync { - return &MockSync{ - flushChan: make(chan struct{}, 1), - batches: []batch{}, - } -} - -func (m *MockSync) SendBatch(b [][]byte) { - m.batches = append(m.batches, batch(b)) - m.flushChan <- struct{}{} -} - -func (m *MockSync) waitForFlush(timeout time.Duration) error { - select { - case <-m.flushChan: - return nil - case <-time.After(timeout): - return fmt.Errorf("timed out before flush (waited %s)", timeout.String()) - } -} - -var mockSequence = SequencePair{big.NewInt(99999), 12345} - -func TestBatchingByCount(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Hour, 2, 1024*1024) - assert.NoError(err) - - t.Log("Batcher respect count limit") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(2, len(sync.batches[0])) - assert.Equal("hihi", string(sync.batches[0][0])) - assert.Equal("heyhey", string(sync.batches[0][1])) - - t.Log("Batcher doesn't send partial batches") - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) -} - -func TestBatchingByTime(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Millisecond, 2000000, 1024*1024) - assert.NoError(err) - - t.Log("Batcher sends partial batches when time expires") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(1, len(sync.batches[0])) - assert.Equal("hihi", string(sync.batches[0][0])) - - t.Log("Batcher sends all messsages in partial batches when time expires") - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(2, len(sync.batches)) - assert.Equal(2, len(sync.batches[1])) - assert.Equal("heyhey", string(sync.batches[1][0])) - assert.Equal("yoyo", string(sync.batches[1][1])) - - t.Log("Batcher doesn't send empty batches") - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) -} - -func TestBatchingBySize(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Hour, 2000000, 8) - assert.NoError(err) - - t.Log("Large messages are sent immediately") - assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(1, len(sync.batches[0])) - assert.Equal("hellohello", string(sync.batches[0][0])) - - t.Log("Batcher tries not to exceed size limit") - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(2, len(sync.batches)) - assert.Equal(1, len(sync.batches[1])) - assert.Equal("heyhey", string(sync.batches[1][0])) - - t.Log("Batcher sends messages that didn't fit in previous batch") - assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) // At this point "hihi" is in the batch - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(3, len(sync.batches)) - assert.Equal(2, len(sync.batches[2])) - assert.Equal("hihi", string(sync.batches[2][0])) - assert.Equal("yoyo", string(sync.batches[2][1])) - - t.Log("Batcher doesn't send partial batches") - assert.NoError(batcher.AddMessage([]byte("okok"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) -} - -func TestFlushing(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Hour, 2000000, 1024*1024) - assert.NoError(err) - - t.Log("Calling flush sends pending messages") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) - - batcher.Flush() - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(1, len(sync.batches[0])) - assert.Equal("hihi", string(sync.batches[0][0])) -} - -func TestSendingEmpty(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Second, 10, 1024*1024) - assert.NoError(err) - - t.Log("An error is returned when an empty message is sent") - err = batcher.AddMessage([]byte{}, mockSequence) - assert.Error(err) - assert.Equal(err.Error(), "Empty messages can't be sent") -} - -func TestUpdatingSequence(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - b, err := New(sync, time.Second, 10, 1024*1024) - assert.NoError(err) - - batcher := b.(*batcher) - - t.Log("Initally, smallestSeq is undefined") - assert.Nil(batcher.SmallestSequencePair().Sequence) - - expected := new(big.Int) - - t.Log("After AddMessage (seq=1), smallestSeq = 1") - batcher.updateSequenceNumbers(SequencePair{big.NewInt(1), 1234}) - expected.SetInt64(1) - seq := batcher.SmallestSequencePair() - assert.True(expected.Cmp(seq.Sequence) == 0) - - t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher") - batcher.updateSequenceNumbers(SequencePair{big.NewInt(2), 1234}) - seq = batcher.SmallestSequencePair() - assert.True(expected.Cmp(seq.Sequence) == 0) - - t.Log("After AddMessage (seq=1), smallestSeq = 0") - batcher.updateSequenceNumbers(SequencePair{big.NewInt(0), 1234}) - expected.SetInt64(0) - seq = batcher.SmallestSequencePair() - assert.True(expected.Cmp(seq.Sequence) == 0) - - t.Log("Flushing batch clears smallest sequence pair") - assert.NoError(batcher.AddMessage([]byte("cdcd"), SequencePair{big.NewInt(2), 1234})) - sync.waitForFlush(time.Minute) - assert.Nil(batcher.SmallestSequencePair().Sequence) -} - -func TestSequencePairIsLessThan(t *testing.T) { - assert := assert.New(t) - - big10 := big.NewInt(10) - big5 := big.NewInt(5) - - tests := []struct { - left SequencePair - right SequencePair - isLess bool - }{ - {left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false}, - {left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false}, - {left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false}, - - {left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true}, - {left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true}, - - {left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false}, - {left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false}, - } - - for _, test := range tests { - left := test.left - right := test.right - t.Logf( - "Is <%s, %d> less than <%s, %d>? %t", - left.Sequence.String(), left.SubSequence, - right.Sequence.String(), right.SubSequence, - test.isLess, - ) - - assert.Equal(test.isLess, left.IsLessThan(right)) - } -} - -func TestSequencePairEmpty(t *testing.T) { - assert := assert.New(t) - - assert.True(SequencePair{nil, 0}.IsEmpty()) - assert.True(SequencePair{nil, 10000}.IsEmpty()) - - assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty()) - assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty()) -} diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go new file mode 100644 index 0000000..bb1a3fe --- /dev/null +++ b/batchconsumer/batchermanager.go @@ -0,0 +1,220 @@ +package batchconsumer + +import ( + "os" + "time" + + kv "gopkg.in/Clever/kayvee-go.v6/logger" + + "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" + "github.com/Clever/amazon-kinesis-client-go/kcl" +) + +type tagMsgPair struct { + tag string + msg []byte + pair kcl.SequencePair +} + +type batcherManagerConfig struct { + BatchCount int + BatchSize int + BatchInterval time.Duration +} + +type batcherManager struct { + log kv.KayveeLogger + sender Sender + chkpntManager *checkpointManager + + batchCount int + batchSize int + batchInterval time.Duration + + batchMsg chan tagMsgPair + lastIgnored chan kcl.SequencePair + lastProcessed chan kcl.SequencePair + shutdown chan chan<- struct{} +} + +func newBatcherManager( + sender Sender, chkpntManager *checkpointManager, cfg batcherManagerConfig, log kv.KayveeLogger, +) *batcherManager { + bm := &batcherManager{ + log: log, + sender: sender, + chkpntManager: chkpntManager, + + batchCount: cfg.BatchCount, + batchSize: cfg.BatchSize, + batchInterval: cfg.BatchInterval, + + batchMsg: make(chan tagMsgPair), + lastIgnored: make(chan kcl.SequencePair), + lastProcessed: make(chan kcl.SequencePair), + // shutdown chan takes "done" channel to signal when batchermanager is done shutting down + shutdown: make(chan chan<- struct{}), + } + + bm.startMessageHandler(bm.batchMsg, bm.lastIgnored, bm.lastProcessed, bm.shutdown) + + return bm +} + +func (b *batcherManager) BatchMessage(tag string, msg []byte, pair kcl.SequencePair) { + b.batchMsg <- tagMsgPair{tag, msg, pair} +} + +func (b *batcherManager) LatestIgnored(pair kcl.SequencePair) { + b.lastIgnored <- pair +} + +func (b *batcherManager) LatestProcessed(pair kcl.SequencePair) { + b.lastProcessed <- pair +} + +func (b *batcherManager) Shutdown() <-chan struct{} { + done := make(chan struct{}) + b.shutdown <- done + + return done +} + +func (b *batcherManager) createBatcher() *batcher { + return &batcher{ + flushCount: b.batchCount, + flushSize: b.batchSize, + } +} + +func (b *batcherManager) sendBatch(batcher *batcher, tag string) { + if len(batcher.Batch) <= 0 { + return + } + + err := b.sender.SendBatch(batcher.Batch, tag) + switch e := err.(type) { + case nil: // Do nothing + case PartialSendBatchError: + b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) + for _, line := range e.FailedMessages { + b.log.ErrorD("failed-log", kv.M{"log": line}) + } + stats.Counter("batch-log-failures", len(e.FailedMessages)) + case CatastrophicSendBatchError: + b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + os.Exit(1) + default: + b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + os.Exit(1) + } + + batcher.Clear() + stats.Counter("batches-sent", 1) +} + +func (b *batcherManager) sendCheckpoint( + tag string, lastIgnoredPair kcl.SequencePair, batchers map[string]*batcher, +) { + smallest := lastIgnoredPair + + for name, batcher := range batchers { + if tag == name { + continue + } + + if len(batcher.Batch) <= 0 { + continue + } + + // Check for empty because it's possible that no messages have been ignored + if smallest.IsNil() || batcher.SmallestSeq.IsLessThan(smallest) { + smallest = batcher.SmallestSeq + } + } + + if !smallest.IsNil() { + b.chkpntManager.Checkpoint(smallest) + } +} + +// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a +// go routine to avoid racey conditions. +func (b *batcherManager) startMessageHandler( + batchMsg <-chan tagMsgPair, lastIgnored, lastProcessed <-chan kcl.SequencePair, + shutdown <-chan chan<- struct{}, +) { + flushStaleBatches := make(chan struct{}) + + go func() { + for { // Flush batches that haven't been updated recently + <-time.NewTimer(time.Second).C + flushStaleBatches <- struct{}{} + } + }() + + go func() { + var lastProcessedPair kcl.SequencePair + var lastIgnoredPair kcl.SequencePair + batchers := map[string]*batcher{} + + for { + select { + case <-flushStaleBatches: + for tag, batcher := range batchers { + if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) { + b.sendBatch(batcher, tag) + b.sendCheckpoint(tag, lastIgnoredPair, batchers) + batcher.Clear() + } + } + case tmp := <-batchMsg: + batcher, ok := batchers[tmp.tag] + if !ok { + batcher = b.createBatcher() + batchers[tmp.tag] = batcher + stats.Gauge("tag-count", len(batchers)) + } + + err := batcher.AddMessage(tmp.msg, tmp.pair) + if err == ErrBatchFull { + b.sendBatch(batcher, tmp.tag) + b.sendCheckpoint(tmp.tag, lastIgnoredPair, batchers) + + batcher.AddMessage(tmp.msg, tmp.pair) + } else if err != nil { + b.log.ErrorD("add-message", kv.M{ + "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, + }) + } + stats.Counter("msg-batched", 1) + case pair := <-lastIgnored: + lastIgnoredPair = pair + + isPendingMessages := false + for _, batcher := range batchers { + if len(batcher.Batch) > 0 { + isPendingMessages = true + break + } + } + + if !isPendingMessages { + b.chkpntManager.Checkpoint(lastIgnoredPair) + } + case pair := <-lastProcessed: + lastProcessedPair = pair + case done := <-shutdown: + for tag, batcher := range batchers { + b.sendBatch(batcher, tag) + } + b.chkpntManager.Checkpoint(lastProcessedPair) + chkDone := b.chkpntManager.Shutdown() + <-chkDone + + done <- struct{}{} + return + } + } + }() +} diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go new file mode 100644 index 0000000..6f312e0 --- /dev/null +++ b/batchconsumer/checkpointmanager.go @@ -0,0 +1,89 @@ +package batchconsumer + +import ( + "time" + + kv "gopkg.in/Clever/kayvee-go.v6/logger" + + "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" + "github.com/Clever/amazon-kinesis-client-go/kcl" +) + +type checkpointManager struct { + log kv.KayveeLogger + + checkpointFreq time.Duration + + checkpoint chan kcl.SequencePair + // shutdown chan takes "done" channel to signal when checkpointManager is done shutting down + shutdown chan chan<- struct{} +} + +func newCheckpointManager( + checkpointer kcl.Checkpointer, checkpointFreq time.Duration, log kv.KayveeLogger, +) *checkpointManager { + cm := &checkpointManager{ + log: log, + + checkpointFreq: checkpointFreq, + + checkpoint: make(chan kcl.SequencePair), + shutdown: make(chan chan<- struct{}), + } + + cm.startCheckpointHandler(checkpointer, cm.checkpoint, cm.shutdown) + + return cm +} + +func (cm *checkpointManager) Checkpoint(pair kcl.SequencePair) { + cm.checkpoint <- pair +} + +func (cm *checkpointManager) Shutdown() <-chan struct{} { + done := make(chan struct{}) + cm.shutdown <- done + + return done +} + +func (cm *checkpointManager) startCheckpointHandler( + checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair, + shutdown <-chan chan<- struct{}, +) { + go func() { + lastCheckpoint := time.Now() + + for { + var doneShutdown chan<- struct{} + pair := kcl.SequencePair{} + + select { + case pair = <-checkpoint: + case doneShutdown = <-shutdown: + } + + // This is a write throttle to ensure we don't checkpoint faster than cm.checkpointFreq. + // The latest pair number is always used. + for doneShutdown == nil && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq { + select { + case pair = <-checkpoint: // Keep updating checkpoint pair while waiting + case doneShutdown = <-shutdown: + case <-time.NewTimer(cm.checkpointFreq - time.Now().Sub(lastCheckpoint)).C: + } + } + + if !pair.IsNil() { + checkpointer.Checkpoint(pair) + lastCheckpoint = time.Now() + stats.Counter("checkpoints-sent", 1) + } + + if doneShutdown != nil { + checkpointer.Shutdown() + doneShutdown <- struct{}{} + return + } + } + }() +} diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index f43ef05..9b16e0b 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -18,6 +18,7 @@ type Config struct { // LogFile where consumer errors and failed log lines are saved LogFile string + // BatchInterval the upper bound on how often SendBatch is called with accumulated messages BatchInterval time.Duration // BatchCount is the number of messages that triggers a SendBatch call @@ -32,10 +33,6 @@ type Config struct { // CheckpointFreq the frequency in which a checkpoint is saved CheckpointFreq time.Duration - // CheckpointRetries the number of times the consumer will try to save a checkpoint - CheckpointRetries int - // CheckpointRetrySleep the amount of time between checkpoint save attempts - CheckpointRetrySleep time.Duration } // BatchConsumer is responsible for marshalling @@ -63,8 +60,9 @@ func withDefaults(config Config) Config { config.DeployEnv = "unknown-env" } + // Not totally clear we need this rate limit. The KCL may do rate limiting for us. if config.ReadRateLimit == 0 { - config.ReadRateLimit = 300 + config.ReadRateLimit = 1000 } if config.ReadBurstLimit == 0 { config.ReadBurstLimit = int(float64(config.ReadRateLimit)*1.2 + 0.5) @@ -73,12 +71,6 @@ func withDefaults(config Config) Config { if config.CheckpointFreq == 0 { config.CheckpointFreq = 60 * time.Second } - if config.CheckpointRetries == 0 { - config.CheckpointRetries = 5 - } - if config.CheckpointRetrySleep == 0 { - config.CheckpointRetrySleep = 5 * time.Second - } return config } diff --git a/batchconsumer/stats/stats.go b/batchconsumer/stats/stats.go new file mode 100644 index 0000000..75eaaf9 --- /dev/null +++ b/batchconsumer/stats/stats.go @@ -0,0 +1,50 @@ +package stats + +import ( + "time" + + "gopkg.in/Clever/kayvee-go.v6/logger" +) + +var log = logger.New("amazon-kinesis-client-go") + +type datum struct { + name string + value int + category string +} + +var queue = make(chan datum, 1000) + +func init() { + data := map[string]int{} + tick := time.Tick(time.Minute) + go func() { + for { + select { + case d := <-queue: + if d.category == "counter" { + data[d.name] = data[d.name] + d.value + } else if d.category == "gauge" { + data[d.name] = d.value + } else { + log.ErrorD("unknow-stat-category", logger.M{"category": d.category}) + } + case <-tick: + tmp := logger.M{} + for k, v := range data { + tmp[k] = v + } + log.InfoD("stats", tmp) + } + } + }() +} + +func Counter(name string, val int) { + queue <- datum{name, val, "counter"} +} + +func Gauge(name string, val int) { + queue <- datum{name, val, "gauge"} +} diff --git a/batchconsumer/sync.go b/batchconsumer/sync.go deleted file mode 100644 index f50a703..0000000 --- a/batchconsumer/sync.go +++ /dev/null @@ -1,10 +0,0 @@ -package batchconsumer - -type batcherSync struct { - tag string - writer *batchedWriter -} - -func (b *batcherSync) SendBatch(batch [][]byte) { - b.writer.SendBatch(batch, b.tag) -} diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index f641f62..af102d1 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -5,23 +5,15 @@ import ( "encoding/base64" "fmt" "math/big" - "os" - "time" "golang.org/x/time/rate" kv "gopkg.in/Clever/kayvee-go.v6/logger" - "github.com/Clever/amazon-kinesis-client-go/batchconsumer/batcher" + "github.com/Clever/amazon-kinesis-client-go/batchconsumer/stats" "github.com/Clever/amazon-kinesis-client-go/kcl" "github.com/Clever/amazon-kinesis-client-go/splitter" ) -type tagMsgPair struct { - tag string - msg []byte - pair batcher.SequencePair -} - type batchedWriter struct { config Config sender Sender @@ -29,16 +21,13 @@ type batchedWriter struct { shardID string - checkpointMsg chan batcher.SequencePair - checkpointTag chan string - lastProcessedPair chan batcher.SequencePair - batchMsg chan tagMsgPair - flushBatches chan struct{} + chkpntManager *checkpointManager + batcherManager *batcherManager // Limits the number of records read from the stream rateLimiter *rate.Limiter - lastProcessedSeq batcher.SequencePair + lastProcessedSeq kcl.SequencePair } func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batchedWriter { @@ -53,170 +42,19 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error { b.shardID = shardID - b.checkpointMsg = make(chan batcher.SequencePair) - b.startCheckpointListener(checkpointer, b.checkpointMsg) - b.checkpointTag = make(chan string) - b.batchMsg = make(chan tagMsgPair) - b.flushBatches = make(chan struct{}) - b.lastProcessedPair = make(chan batcher.SequencePair) - b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastProcessedPair, b.flushBatches) + bmConfig := batcherManagerConfig{ + BatchCount: b.config.BatchCount, + BatchSize: b.config.BatchSize, + BatchInterval: b.config.BatchInterval, + } + + b.chkpntManager = newCheckpointManager(checkpointer, b.config.CheckpointFreq, b.log) + b.batcherManager = newBatcherManager(b.sender, b.chkpntManager, bmConfig, b.log) return nil } -// handleCheckpointError returns true if checkout should be tried again. Returns false otherwise. -func (b *batchedWriter) handleCheckpointError(err error) bool { - if err == nil { - return false - } - - cperr, ok := err.(kcl.CheckpointError) - if !ok { - b.log.ErrorD("unknown-checkpoint-error", kv.M{"msg": err.Error(), "shard-id": b.shardID}) - return true - } - - switch cperr.Error() { - case "ShutdownException": // Skips checkpointing - b.log.ErrorD("shutdown-checkpoint-exception", kv.M{ - "msg": err.Error(), "shard-id": b.shardID, - }) - return false - case "ThrottlingException": - b.log.ErrorD("checkpoint-throttle", kv.M{"shard-id": b.shardID}) - case "InvalidStateException": - b.log.ErrorD("invalid-checkpoint-state", kv.M{"shard-id": b.shardID}) - default: - b.log.ErrorD("checkpoint-error", kv.M{"shard-id": b.shardID, "msg": err}) - } - - return true -} - -func (b *batchedWriter) startCheckpointListener( - checkpointer kcl.Checkpointer, checkpointMsg <-chan batcher.SequencePair, -) { - go func() { - lastCheckpoint := time.Now() - - for { - seq := <-checkpointMsg - - // This is a write throttle to ensure we don't checkpoint faster than - // b.config.CheckpointFreq. The latest seq number is always used. - for time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq { - select { - case seq = <-checkpointMsg: // Keep updating checkpoint seq while waiting - case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C: - } - } - - retry := true - for n := 0; retry && n < b.config.CheckpointRetries+1; n++ { - str := seq.Sequence.String() - err := checkpointer.Checkpoint(&str, &seq.SubSequence) - if err == nil { // Successfully checkpointed! - lastCheckpoint = time.Now() - break - } - - retry = b.handleCheckpointError(err) - - if n == b.config.CheckpointRetries { - b.log.ErrorD("checkpoint-retries", kv.M{"attempts": b.config.CheckpointRetries}) - retry = false - } - - if retry { - time.Sleep(b.config.CheckpointRetrySleep) - } - } - } - }() -} - -func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { - sync := &batcherSync{ - tag: tag, - writer: b, - } - batch, err := batcher.New(sync, b.config.BatchInterval, b.config.BatchCount, b.config.BatchSize) - if err != nil { - b.log.ErrorD("create-batcher", kv.M{"msg": err.Error(), "tag": tag}) - } - - return batch -} - -// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a -// go routine to avoid racey conditions. -func (b *batchedWriter) startMessageHandler( - batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastPair <-chan batcher.SequencePair, - flushBatches <-chan struct{}, -) { - go func() { - var lastProcessedPair batcher.SequencePair - batchers := map[string]batcher.Batcher{} - areBatchersEmpty := true - - for { - select { - case tmp := <-batchMsg: - batcher, ok := batchers[tmp.tag] - if !ok { - batcher = b.createBatcher(tmp.tag) - batchers[tmp.tag] = batcher - } - - err := batcher.AddMessage(tmp.msg, tmp.pair) - if err != nil { - b.log.ErrorD("add-message", kv.M{ - "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, - }) - } - areBatchersEmpty = false - case tag := <-checkpointTag: - smallest := lastProcessedPair - isAllEmpty := true - - for name, batch := range batchers { - if tag == name { - continue - } - - pair := batch.SmallestSequencePair() - if pair.IsEmpty() { // Occurs when batch has no items - continue - } - - if pair.IsLessThan(smallest) { - smallest = pair - } - - isAllEmpty = false - } - - if !smallest.IsEmpty() { - b.checkpointMsg <- smallest - } - areBatchersEmpty = isAllEmpty - case pair := <-lastPair: - if areBatchersEmpty { - b.checkpointMsg <- pair - } - lastProcessedPair = pair - case <-flushBatches: - for _, batch := range batchers { - batch.Flush() - } - b.checkpointMsg <- lastProcessedPair - areBatchersEmpty = true - } - } - }() -} - func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { // We handle two types of records: // - records emitted from CWLogs Subscription @@ -231,7 +69,7 @@ func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) } func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { - var pair batcher.SequencePair + var pair kcl.SequencePair prevPair := b.lastProcessedSeq for _, record := range records { @@ -243,8 +81,8 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber) } - pair = batcher.SequencePair{seq, record.SubSequenceNumber} - if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessSeq is empty + pair = kcl.SequencePair{seq, record.SubSequenceNumber} + if prevPair.IsNil() { // Handles on-start edge case where b.lastProcessSeq is empty prevPair = pair } @@ -257,23 +95,27 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { if err != nil { return err } + wasPairIgnored := true for _, rawmsg := range messages { msg, tags, err := b.sender.ProcessMessage(rawmsg) if err == ErrMessageIgnored { continue // Skip message } else if err != nil { + stats.Counter("unknown-error", 1) b.log.ErrorD("process-message", kv.M{"msg": err.Error(), "rawmsg": string(rawmsg)}) continue // Don't stop processing messages because of one bad message } if len(tags) == 0 { + stats.Counter("no-tags", 1) b.log.ErrorD("no-tags", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("No tags provided by consumer for log: %s", string(rawmsg)) } for _, tag := range tags { if tag == "" { + stats.Counter("blank-tag", 1) b.log.ErrorD("blank-tag", kv.M{"rawmsg": string(rawmsg)}) return fmt.Errorf("Blank tag provided by consumer for log: %s", string(rawmsg)) } @@ -282,44 +124,33 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { // it's been sent. When batches are sent, conceptually we first find the smallest // sequence number amount all the batch (let's call it A). We then checkpoint at // the A-1 sequence number. - b.batchMsg <- tagMsgPair{tag, msg, prevPair} + b.batcherManager.BatchMessage(tag, msg, prevPair) + wasPairIgnored = false } } prevPair = pair - b.lastProcessedPair <- pair + if wasPairIgnored { + b.batcherManager.LatestIgnored(pair) + } + b.batcherManager.LatestProcessed(pair) + + stats.Counter("processed-messages", len(messages)) } b.lastProcessedSeq = pair return nil } -func (b *batchedWriter) SendBatch(batch [][]byte, tag string) { - err := b.sender.SendBatch(batch, tag) - switch e := err.(type) { - case nil: // Do nothing - case PartialSendBatchError: - b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) - for _, line := range e.FailedMessages { - b.log.ErrorD("failed-log", kv.M{"log": line}) - } - case CatastrophicSendBatchError: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) - os.Exit(1) - default: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) - os.Exit(1) - } - - b.checkpointTag <- tag -} - func (b *batchedWriter) Shutdown(reason string) error { if reason == "TERMINATE" { b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) - b.flushBatches <- struct{}{} } else { b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) } + + done := b.batcherManager.Shutdown() + <-done + return nil } diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index 6622937..d62c467 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -86,30 +86,28 @@ type mockCheckpointer struct { shutdown chan struct{} } -func NewMockCheckpointer(maxSeq string, timeout time.Duration) *mockCheckpointer { +func NewMockCheckpointer(timeout time.Duration) *mockCheckpointer { mcp := &mockCheckpointer{ checkpoint: make(chan string), done: make(chan struct{}, 1), timeout: make(chan struct{}, 1), shutdown: make(chan struct{}), } - mcp.startWaiter(maxSeq, timeout) + mcp.startWaiter(timeout) return mcp } -func (m *mockCheckpointer) startWaiter(maxSeq string, timeout time.Duration) { +func (m *mockCheckpointer) startWaiter(timeout time.Duration) { go func() { for { select { case seq := <-m.checkpoint: m.recievedSequences = append(m.recievedSequences, seq) - if seq == maxSeq { - m.done <- struct{}{} - } case <-time.NewTimer(timeout).C: m.timeout <- struct{}{} case <-m.shutdown: + m.done <- struct{}{} return } } @@ -126,14 +124,8 @@ func (m *mockCheckpointer) wait() error { func (m *mockCheckpointer) Shutdown() { m.shutdown <- struct{}{} } -func (m *mockCheckpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { - m.checkpoint <- *sequenceNumber - return nil -} -func (m *mockCheckpointer) CheckpointWithRetry( - sequenceNumber *string, subSequenceNumber *int, retryCount int, -) error { - return m.Checkpoint(sequenceNumber, subSequenceNumber) +func (m *mockCheckpointer) Checkpoint(pair kcl.SequencePair) { + m.checkpoint <- pair.Sequence.String() } func encode(str string) string { @@ -148,7 +140,7 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) { BatchInterval: 10 * time.Millisecond, CheckpointFreq: 20 * time.Millisecond, }) - mockcheckpointer := NewMockCheckpointer("4", 5*time.Second) + mockcheckpointer := NewMockCheckpointer(5 * time.Second) wrt := NewBatchedWriter(mockconfig, ignoringSender{}, mocklog) wrt.Initialize("test-shard", mockcheckpointer) @@ -161,8 +153,13 @@ func TestProcessRecordsIgnoredMessages(t *testing.T) { }) assert.NoError(err) + err = wrt.Shutdown("TERMINATE") + assert.NoError(err) + err = mockcheckpointer.wait() assert.NoError(err) + + assert.Contains(mockcheckpointer.recievedSequences, "4") } func TestProcessRecordsMutliBatchBasic(t *testing.T) { @@ -173,7 +170,7 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond, }) - mockcheckpointer := NewMockCheckpointer("8", 5*time.Second) + mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) @@ -194,8 +191,6 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { }) assert.NoError(err) - time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once - err = wrt.Shutdown("TERMINATE") assert.NoError(err) @@ -233,7 +228,7 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Millisecond, }) - mockcheckpointer := NewMockCheckpointer("26", 5*time.Second) + mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) @@ -272,8 +267,6 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { }) assert.NoError(err) - time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once - err = wrt.Shutdown("TERMINATE") assert.NoError(err) @@ -312,7 +305,7 @@ func TestStaggeredCheckpionting(t *testing.T) { BatchInterval: 100 * time.Millisecond, CheckpointFreq: 200 * time.Nanosecond, }) - mockcheckpointer := NewMockCheckpointer("9", 5*time.Second) + mockcheckpointer := NewMockCheckpointer(5 * time.Second) mocksender := NewMsgAsTagSender() wrt := NewBatchedWriter(mockconfig, mocksender, mocklog) @@ -343,7 +336,6 @@ func TestStaggeredCheckpionting(t *testing.T) { assert.NoError(err) mocksender.Shutdown() - mockcheckpointer.Shutdown() // Test to make sure writer doesn't prematurely checkpoint messages // Checkpoints 5,6,7,8 will never be submitted because the 3rd "tag1" is in a batch @@ -352,6 +344,7 @@ func TestStaggeredCheckpionting(t *testing.T) { assert.NotContains(mockcheckpointer.recievedSequences, "6") assert.NotContains(mockcheckpointer.recievedSequences, "7") assert.NotContains(mockcheckpointer.recievedSequences, "8") + assert.Contains(mockcheckpointer.recievedSequences, "9") assert.Contains(mocksender.batches, "tag1") assert.Equal(2, len(mocksender.batches["tag1"])) // One batch @@ -365,8 +358,10 @@ func TestStaggeredCheckpionting(t *testing.T) { assert.Equal(2, len(mocksender.batches["tag3"][0])) // with three items assert.Equal("tag3", string(mocksender.batches["tag3"][0][0])) assert.Equal("tag3", string(mocksender.batches["tag3"][0][1])) + assert.Equal(2, len(mocksender.batches["tag3"][1])) assert.Equal("tag3", string(mocksender.batches["tag3"][1][0])) assert.Equal("tag3", string(mocksender.batches["tag3"][1][1])) + assert.Equal(2, len(mocksender.batches["tag3"][2])) assert.Equal("tag3", string(mocksender.batches["tag3"][2][0])) assert.Equal("tag3", string(mocksender.batches["tag3"][2][1])) } diff --git a/circle.yml b/circle.yml index 440a5a7..52271b9 100644 --- a/circle.yml +++ b/circle.yml @@ -11,6 +11,7 @@ compile: override: - make install_deps - make build + - make bench test: override: - make test diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 02281a9..6f2f51a 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -10,18 +10,15 @@ import ( ) type sampleRecordProcessor struct { - checkpointer kcl.Checkpointer - checkpointRetries int - checkpointFreq time.Duration - largestSeq *big.Int - largestSubSeq int - lastCheckpoint time.Time + checkpointer kcl.Checkpointer + checkpointFreq time.Duration + largestPair kcl.SequencePair + lastCheckpoint time.Time } func newSampleRecordProcessor() *sampleRecordProcessor { return &sampleRecordProcessor{ - checkpointRetries: 5, - checkpointFreq: 60 * time.Second, + checkpointFreq: 60 * time.Second, } } @@ -31,9 +28,8 @@ func (srp *sampleRecordProcessor) Initialize(shardID string, checkpointer kcl.Ch return nil } -func (srp *sampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool { - return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 || - (sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq) +func (srp *sampleRecordProcessor) shouldUpdateSequence(pair kcl.SequencePair) bool { + return srp.largestPair.IsLessThan(pair) } func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { @@ -43,14 +39,13 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { fmt.Fprintf(os.Stderr, "could not parse sequence number '%s'\n", record.SequenceNumber) continue } - if srp.shouldUpdateSequence(seqNumber, record.SubSequenceNumber) { - srp.largestSeq = seqNumber - srp.largestSubSeq = record.SubSequenceNumber + pair := kcl.SequencePair{seqNumber, record.SubSequenceNumber} + if srp.shouldUpdateSequence(pair) { + srp.largestPair = pair } } if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq { - largestSeq := srp.largestSeq.String() - srp.checkpointer.CheckpointWithRetry(&largestSeq, &srp.largestSubSeq, srp.checkpointRetries) + srp.checkpointer.Checkpoint(srp.largestPair) srp.lastCheckpoint = time.Now() } return nil diff --git a/decode/decode.go b/decode/decode.go index cfacd9d..47f08cc 100644 --- a/decode/decode.go +++ b/decode/decode.go @@ -83,8 +83,14 @@ func FieldsFromKayvee(line string) (map[string]interface{}, error) { possibleJSON := line[firstIdx : lastIdx+1] var fields map[string]interface{} if err := json.Unmarshal([]byte(possibleJSON), &fields); err != nil { - return map[string]interface{}{}, err + return map[string]interface{}{}, &NonKayveeError{} } + + if len(fields) == 0 { // Some logs superfluous "{}" in them. They're not kayvee. + return map[string]interface{}{}, &NonKayveeError{} + } + // TODO: consider also filter if they have source and title + for k, v := range fields { if !stringInSlice(k, reservedFields) { m[k] = v @@ -319,9 +325,14 @@ func ExtractKVMeta(kvlog map[string]interface{}) KVMeta { tmp, ok = kvmeta["routes"] if ok { - routes, ok := tmp.([]map[string]interface{}) + routes, ok := tmp.([]interface{}) if ok { - kvRoutes = routes + for _, route := range routes { + rule, ok := route.(map[string]interface{}) + if ok { // TODO: log error + kvRoutes = append(kvRoutes, rule) + } + } } } diff --git a/decode/decode_test.go b/decode/decode_test.go index 9f7a123..22877a5 100644 --- a/decode/decode_test.go +++ b/decode/decode_test.go @@ -1,7 +1,6 @@ package decode import ( - "encoding/json" "fmt" "testing" "time" @@ -75,7 +74,13 @@ func TestKayveeDecoding(t *testing.T) { Title: "errors on invalid JSON (missing a quote)", Input: `prefix {"a:"b"} postfix`, ExpectedOutput: map[string]interface{}{}, - ExpectedError: &json.SyntaxError{}, + ExpectedError: &NonKayveeError{}, + }, + Spec{ + Title: "errors on empty JSON: {}", + Input: `prefix {} postfix`, + ExpectedOutput: map[string]interface{}{}, + ExpectedError: &NonKayveeError{}, }, } @@ -551,7 +556,7 @@ func TestExtractKVMeta(t *testing.T) { "team": "green", "kv_version": "three", "kv_language": "tree", - "routes": []map[string]interface{}{ + "routes": []interface{}{ map[string]interface{}{ "type": "metrics", "rule": "cool", @@ -590,7 +595,7 @@ func TestExtractKVMeta(t *testing.T) { "team": "green", "kv_version": "christmas", "kv_language": "tree", - "routes": []map[string]interface{}{ + "routes": []interface{}{ map[string]interface{}{ "type": "analytics", "rule": "what's-this?", @@ -632,7 +637,7 @@ func TestExtractKVMeta(t *testing.T) { "team": "slack", "kv_version": "evergreen", "kv_language": "markdown-ish", - "routes": []map[string]interface{}{ + "routes": []interface{}{ map[string]interface{}{ "type": "notifications", "rule": "did-you-know", @@ -678,7 +683,7 @@ func TestExtractKVMeta(t *testing.T) { "team": "a-team", "kv_version": "old", "kv_language": "jive", - "routes": []map[string]interface{}{ + "routes": []interface{}{ map[string]interface{}{ "type": "alerts", "rule": "last-call", @@ -740,7 +745,7 @@ func TestExtractKVMeta(t *testing.T) { "team": "diversity", "kv_version": "kv-routes", "kv_language": "understanding", - "routes": []map[string]interface{}{ + "routes": []interface{}{ map[string]interface{}{ "type": "metrics", "rule": "all-combos", diff --git a/kcl/kcl.go b/kcl/kcl.go index 950a92c..3270be1 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -2,7 +2,6 @@ package kcl import ( "bufio" - "bytes" "encoding/json" "fmt" "io" @@ -14,118 +13,21 @@ import ( type RecordProcessor interface { Initialize(shardID string, checkpointer Checkpointer) error ProcessRecords(records []Record) error + // Shutdown this call should block until it's safe to shutdown the process Shutdown(reason string) error } type Checkpointer interface { - Checkpoint(sequenceNumber *string, subSequenceNumber *int) error - CheckpointWithRetry(sequenceNumber *string, subSequenceNumber *int, retryCount int) error + Checkpoint(pair SequencePair) Shutdown() } -type CheckpointError struct { - e string -} - -func (ce CheckpointError) Error() string { - return ce.e -} - -type checkpointer struct { - mux sync.Mutex - - ioHandler ioHandler -} - -func (c *checkpointer) getAction() (interface{}, error) { - line, err := c.ioHandler.readLine() - if err != nil { - return nil, err - } - action, err := c.ioHandler.loadAction(line.String()) - if err != nil { - return nil, err - } - return action, nil -} - -func (c *checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { - c.mux.Lock() - defer c.mux.Unlock() - - c.ioHandler.writeAction(ActionCheckpoint{ - Action: "checkpoint", - SequenceNumber: sequenceNumber, - SubSequenceNumber: subSequenceNumber, - }) - line, err := c.ioHandler.readLine() - if err != nil { - return err - } - actionI, err := c.ioHandler.loadAction(line.String()) - if err != nil { - return err - } - action, ok := actionI.(ActionCheckpoint) - if !ok { - return fmt.Errorf("expected checkpoint response, got '%s'", line.String()) - } - if action.Error != nil && *action.Error != "" { - return CheckpointError{ - e: *action.Error, - } - } - return nil -} - -// CheckpointWithRetry tries to save a checkPoint up to `retryCount` + 1 times. -// `retryCount` should be >= 0 -func (c *checkpointer) CheckpointWithRetry( - sequenceNumber *string, subSequenceNumber *int, retryCount int, -) error { - sleepDuration := 5 * time.Second - - for n := 0; n <= retryCount; n++ { - err := c.Checkpoint(sequenceNumber, subSequenceNumber) - if err == nil { - return nil - } - - if cperr, ok := err.(CheckpointError); ok { - switch cperr.Error() { - case "ShutdownException": - return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") - case "ThrottlingException": - fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s\n", sleepDuration) - case "InvalidStateException": - fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n") - default: - fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) - } - } - - if n == retryCount { - return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount) - } - - time.Sleep(sleepDuration) - } - - return nil -} - -func (c *checkpointer) Shutdown() { - c.CheckpointWithRetry(nil, nil, 5) -} - type ioHandler struct { inputFile io.Reader outputFile io.Writer errorFile io.Writer } -//func newIOHandler(inputFile io.Reader, outputFile io.Writer, errorFile io.) - func (i ioHandler) writeLine(line string) { fmt.Fprintf(i.outputFile, "\n%s\n", line) } @@ -134,13 +36,13 @@ func (i ioHandler) writeError(message string) { fmt.Fprintf(i.errorFile, "%s\n", message) } -func (i ioHandler) readLine() (*bytes.Buffer, error) { +func (i ioHandler) readLine() (string, error) { bio := bufio.NewReader(i.inputFile) line, err := bio.ReadString('\n') if err != nil { - return nil, err + return "", err } - return bytes.NewBufferString(line), nil + return line, nil } type ActionInitialize struct { @@ -197,6 +99,8 @@ func (i ioHandler) loadAction(line string) (interface{}, error) { return nil, err } return actionProcessRecords, nil + case "shutdownRequested": + fallthrough case "shutdown": var actionShutdown ActionShutdown if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil { @@ -223,25 +127,88 @@ func (i ioHandler) writeAction(action interface{}) error { return nil } -func New(inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor RecordProcessor) *KCLProcess { +func New( + inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor RecordProcessor, +) *KCLProcess { i := ioHandler{ inputFile: inputFile, outputFile: outputFile, errorFile: errorFile, } return &KCLProcess{ - ioHandler: i, - checkpointer: &checkpointer{ - ioHandler: i, - }, + ioHandler: i, recordProcessor: recordProcessor, + + nextCheckpointPair: SequencePair{}, } } type KCLProcess struct { + ckpmux sync.Mutex + ioHandler ioHandler - checkpointer Checkpointer recordProcessor RecordProcessor + + nextCheckpointPair SequencePair +} + +func (kclp *KCLProcess) Checkpoint(pair SequencePair) { + kclp.ckpmux.Lock() + defer kclp.ckpmux.Unlock() + + if kclp.nextCheckpointPair.IsNil() || kclp.nextCheckpointPair.IsLessThan(pair) { + kclp.nextCheckpointPair = pair + } +} + +func (kclp *KCLProcess) Shutdown() { + kclp.ioHandler.writeError("Checkpoint shutdown") + kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown +} + +func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { + if action.Error == nil { // Successful checkpoint + return nil + } + + msg := *action.Error + switch msg { + case "ShutdownException": + return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") + case "ThrottlingException": + sleep := 5 * time.Second + fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s", sleep) + time.Sleep(sleep) + case "InvalidStateException": + fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing") + default: + fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", msg) + } + + seq := action.SequenceNumber + subSeq := action.SubSequenceNumber + + kclp.ckpmux.Lock() + if !kclp.nextCheckpointPair.IsNil() { + tmp := kclp.nextCheckpointPair.Sequence.String() + seq = &tmp + subSeq = &kclp.nextCheckpointPair.SubSequence + } + kclp.ckpmux.Unlock() + + if seq != nil && subSeq != nil { + return kclp.sendCheckpoint(seq, subSeq) + } + + return nil +} + +func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error { + return kclp.ioHandler.writeAction(ActionCheckpoint{ + Action: "checkpoint", + SequenceNumber: seq, + SubSequenceNumber: subSeq, + }) } func (kclp *KCLProcess) reportDone(responseFor string) error { @@ -254,47 +221,75 @@ func (kclp *KCLProcess) reportDone(responseFor string) error { }) } -func (kclp *KCLProcess) performAction(a interface{}) (string, error) { - switch action := a.(type) { - case ActionInitialize: - return action.Action, kclp.recordProcessor.Initialize(action.ShardID, kclp.checkpointer) - case ActionProcessRecords: - return action.Action, kclp.recordProcessor.ProcessRecords(action.Records) - case ActionShutdown: - return action.Action, kclp.recordProcessor.Shutdown(action.Reason) - default: - return "", fmt.Errorf("unknown action to dispatch: %s", action) - } -} - func (kclp *KCLProcess) handleLine(line string) error { action, err := kclp.ioHandler.loadAction(line) if err != nil { return err } - responseFor, err := kclp.performAction(action) - if err != nil { - return err + switch action := action.(type) { + case ActionCheckpoint: + err = kclp.handleCheckpointAction(action) + case ActionShutdown: + kclp.ioHandler.writeError("Received shutdown action...") + + // Shutdown should block until it's safe to shutdown the process + err = kclp.recordProcessor.Shutdown(action.Reason) + if err != nil { // Log error and continue shutting down + kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err)) + } + + kclp.ioHandler.writeError("Reporting shutdown done") + return kclp.reportDone("shutdown") + case ActionInitialize: + err = kclp.recordProcessor.Initialize(action.ShardID, kclp) + if err == nil { + err = kclp.reportDone(action.Action) + } + case ActionProcessRecords: + err = kclp.recordProcessor.ProcessRecords(action.Records) + if err == nil { + err = kclp.reportDone(action.Action) + } + default: + err = fmt.Errorf("unknown action to dispatch: %+#v", action) } - return kclp.reportDone(responseFor) + + return err } func (kclp *KCLProcess) Run() { for { line, err := kclp.ioHandler.readLine() - if err != nil { - kclp.ioHandler.writeError("Read line error: " + err.Error()) + if err == io.EOF { + kclp.ioHandler.writeError("IO stream closed") return - } else if line == nil { + } else if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR Read line: %+#v", err)) + return + } else if line == "" { kclp.ioHandler.writeError("Empty read line recieved") + continue + } + + err = kclp.handleLine(line) + if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) return } - err = kclp.handleLine(line.String()) - if err != nil { - kclp.ioHandler.writeError("Handle line error: " + err.Error()) - return + kclp.ckpmux.Lock() + if !kclp.nextCheckpointPair.IsNil() { + seq := kclp.nextCheckpointPair.Sequence.String() + subSeq := kclp.nextCheckpointPair.SubSequence + + err := kclp.sendCheckpoint(&seq, &subSeq) + if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR checkpoint: %+#v", err)) + } else { + kclp.nextCheckpointPair = SequencePair{} + } } + kclp.ckpmux.Unlock() } } diff --git a/kcl/sequencepair.go b/kcl/sequencepair.go new file mode 100644 index 0000000..685d099 --- /dev/null +++ b/kcl/sequencepair.go @@ -0,0 +1,31 @@ +package kcl + +import ( + "math/big" +) + +// SequencePair a convience way to pass around a Sequence / SubSequence pair +type SequencePair struct { + Sequence *big.Int + SubSequence int +} + +func (s SequencePair) IsNil() bool { + return s.Sequence == nil +} + +func (s SequencePair) IsLessThan(pair SequencePair) bool { + if s.IsNil() || pair.IsNil() { // empty pairs are incomparable + return false + } + + cmp := s.Sequence.Cmp(pair.Sequence) + if cmp == -1 { + return true + } + if cmp == 1 { + return false + } + + return s.SubSequence < pair.SubSequence +} diff --git a/kcl/sequencepair_test.go b/kcl/sequencepair_test.go new file mode 100644 index 0000000..51e0744 --- /dev/null +++ b/kcl/sequencepair_test.go @@ -0,0 +1,54 @@ +package kcl + +import ( + "math/big" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSequencePairIsLessThan(t *testing.T) { + assert := assert.New(t) + + big10 := big.NewInt(10) + big5 := big.NewInt(5) + + tests := []struct { + left SequencePair + right SequencePair + isLess bool + }{ + {left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false}, + {left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false}, + {left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false}, + + {left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true}, + {left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true}, + + {left: SequencePair{big10, 0}, right: SequencePair{big5, 0}, isLess: false}, + {left: SequencePair{big5, 10}, right: SequencePair{big5, 0}, isLess: false}, + } + + for _, test := range tests { + left := test.left + right := test.right + t.Logf( + "Is <%s, %d> less than <%s, %d>? %t", + left.Sequence.String(), left.SubSequence, + right.Sequence.String(), right.SubSequence, + test.isLess, + ) + + assert.Equal(test.isLess, left.IsLessThan(right)) + } +} + +func TestSequencePairEmpty(t *testing.T) { + assert := assert.New(t) + + assert.True(SequencePair{nil, 0}.IsNil()) + assert.True(SequencePair{nil, 10000}.IsNil()) + + assert.False(SequencePair{big.NewInt(10), 0}.IsNil()) + assert.False(SequencePair{big.NewInt(0), 1000}.IsNil()) +}