From 6a30e0eb8ac93e181dc3961426be4a9d9da733b6 Mon Sep 17 00:00:00 2001
From: Xavi Ramirez
Date: Fri, 4 Aug 2017 09:36:42 +0000
Subject: [PATCH] Huge refactor. Batchers no longer run in their own goroutines, which allowed us to remove a lot of concurrency and simplify the model.
---
 batchconsumer/batcher.go                      |  53 +++++
 batchconsumer/batcher/message_batcher.go      | 169 --------------
 batchconsumer/batcher/message_batcher_test.go | 220 ------------------
 batchconsumer/batchermanager.go               | 196 ++++++++++++++++
 batchconsumer/checkpointmanager.go            |  89 +++++++
 batchconsumer/consumer.go                     |   1 +
 batchconsumer/sync.go                         |  10 -
 batchconsumer/writer.go                       | 201 +---------------
 batchconsumer/writer_test.go                  |   5 -
 9 files changed, 349 insertions(+), 595 deletions(-)
 create mode 100644 batchconsumer/batcher.go
 delete mode 100644 batchconsumer/batcher/message_batcher.go
 delete mode 100644 batchconsumer/batcher/message_batcher_test.go
 create mode 100644 batchconsumer/batchermanager.go
 create mode 100644 batchconsumer/checkpointmanager.go
 delete mode 100644 batchconsumer/sync.go

diff --git a/batchconsumer/batcher.go b/batchconsumer/batcher.go
new file mode 100644
index 0000000..83d3a47
--- /dev/null
+++ b/batchconsumer/batcher.go
@@ -0,0 +1,53 @@
+package batchconsumer
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/Clever/amazon-kinesis-client-go/kcl"
+)
+
+var ErrBatchFull = fmt.Errorf("The batch is full")
+
+type batcher struct {
+	flushCount int
+	flushSize  int
+
+	Batch       [][]byte
+	LastUpdated time.Time
+	SmallestSeq kcl.SequencePair
+}
+
+func (b *batcher) batchSize(batch [][]byte) int {
+	total := 0
+	for _, msg := range batch {
+		total += len(msg)
+	}
+
+	return total
+}
+
+func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error {
+	if b.flushCount <= len(b.Batch) {
+		return ErrBatchFull
+	}
+
+	size := b.batchSize(b.Batch)
+	if b.flushSize < size+len(msg) {
+		return ErrBatchFull
+	}
+
+	b.Batch = append(b.Batch, msg)
+	if b.SmallestSeq.IsEmpty() || pair.IsLessThan(b.SmallestSeq) {
+		b.SmallestSeq = pair
+	}
+	b.LastUpdated = time.Now()
+
+	return nil
+}
+
+func (b *batcher) Clear() {
+	b.Batch = [][]byte{}
+	b.LastUpdated = time.Time{}
+	b.SmallestSeq = kcl.SequencePair{}
+}
diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go
deleted file mode 100644
index 0257fde..0000000
--- a/batchconsumer/batcher/message_batcher.go
+++ /dev/null
@@ -1,169 +0,0 @@
-package batcher
-
-import (
-	"fmt"
-	"sync"
-	"time"
-
-	"github.com/Clever/amazon-kinesis-client-go/kcl"
-)
-
-// Sync is used to allow a writer to syncronize with the batcher. 
-// The writer declares how to write messages (via its `SendBatch` method), while the batcher -// keeps track of messages written -type Sync interface { - SendBatch(batch [][]byte) -} - -// Batcher interface -type Batcher interface { - // AddMesage to the batch - AddMessage(msg []byte, sequencePair kcl.SequencePair) error - // Flush all messages from the batch - Flush() - // SmallestSeqPair returns the smallest SequenceNumber and SubSequence number in - // the current batch - SmallestSequencePair() kcl.SequencePair -} - -type msgPack struct { - msg []byte - sequencePair kcl.SequencePair -} - -type batcher struct { - mux sync.Mutex - - flushInterval time.Duration - flushCount int - flushSize int - - // smallestSeq are used for checkpointing - smallestSeq kcl.SequencePair - - sync Sync - msgChan chan<- msgPack - flushChan chan<- struct{} -} - -// New creates a new Batcher -// - sync - synchronizes batcher with writer -// - flushInterval - how often accumulated messages should be flushed (default 1 second). -// - flushCount - number of messages that trigger a flush (default 10). -// - flushSize - size of batch that triggers a flush (default 1024 * 1024 = 1 mb) -func New(sync Sync, flushInterval time.Duration, flushCount int, flushSize int) (Batcher, error) { - if flushSize == 0 { - return nil, fmt.Errorf("flush size must be non-zero") - } - if flushCount == 0 { - return nil, fmt.Errorf("flush count must be non-zero") - } - if flushInterval == 0 { - return nil, fmt.Errorf("flush interval must be non-zero") - } - - msgChan := make(chan msgPack) - flushChan := make(chan struct{}) - - b := &batcher{ - flushCount: flushCount, - flushInterval: flushInterval, - flushSize: flushSize, - sync: sync, - msgChan: msgChan, - flushChan: flushChan, - } - - go b.startBatcher(msgChan, flushChan) - - return b, nil -} - -func (b *batcher) SmallestSequencePair() kcl.SequencePair { - b.mux.Lock() - defer b.mux.Unlock() - - return b.smallestSeq -} - -func (b *batcher) SetFlushInterval(dur time.Duration) { - b.flushInterval = dur -} - -func (b *batcher) SetFlushCount(count int) { - b.flushCount = count -} - -func (b *batcher) SetFlushSize(size int) { - b.flushSize = size -} - -func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error { - if len(msg) <= 0 { - return fmt.Errorf("Empty messages can't be sent") - } - - b.msgChan <- msgPack{msg, pair} - return nil -} - -// updateSequenceNumbers is used to track the smallest sequenceNumber of any record in the batch. -// When flush() is called, the batcher sends the sequence number to the writer. When the writer -// checkpoints, it does so up to the latest message that was flushed successfully. 
-func (b *batcher) updateSequenceNumbers(pair kcl.SequencePair) { - b.mux.Lock() - defer b.mux.Unlock() - - if b.smallestSeq.IsEmpty() || pair.IsLessThan(b.smallestSeq) { - b.smallestSeq = pair - } -} - -func (b *batcher) Flush() { - b.flushChan <- struct{}{} -} - -func (b *batcher) batchSize(batch [][]byte) int { - total := 0 - for _, msg := range batch { - total += len(msg) - } - - return total -} - -func (b *batcher) flush(batch [][]byte) [][]byte { - if len(batch) > 0 { - b.sync.SendBatch(batch) - - b.mux.Lock() - b.smallestSeq = kcl.SequencePair{} - b.mux.Unlock() - } - return [][]byte{} -} - -func (b *batcher) startBatcher(msgChan <-chan msgPack, flushChan <-chan struct{}) { - batch := [][]byte{} - - for { - select { - case <-time.After(b.flushInterval): - batch = b.flush(batch) - case <-flushChan: - batch = b.flush(batch) - case pack := <-msgChan: - size := b.batchSize(batch) - if b.flushSize < size+len(pack.msg) { - batch = b.flush(batch) - } - - batch = append(batch, pack.msg) - b.updateSequenceNumbers(pack.sequencePair) - - if b.flushCount <= len(batch) || b.flushSize <= b.batchSize(batch) { - batch = b.flush(batch) - } - } - } -} diff --git a/batchconsumer/batcher/message_batcher_test.go b/batchconsumer/batcher/message_batcher_test.go deleted file mode 100644 index 73d55fa..0000000 --- a/batchconsumer/batcher/message_batcher_test.go +++ /dev/null @@ -1,220 +0,0 @@ -package batcher - -import ( - "fmt" - "math/big" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/Clever/amazon-kinesis-client-go/kcl" -) - -type batch [][]byte - -type MockSync struct { - flushChan chan struct{} - batches []batch -} - -func NewMockSync() *MockSync { - return &MockSync{ - flushChan: make(chan struct{}, 1), - batches: []batch{}, - } -} - -func (m *MockSync) SendBatch(b [][]byte) { - m.batches = append(m.batches, batch(b)) - m.flushChan <- struct{}{} -} - -func (m *MockSync) waitForFlush(timeout time.Duration) error { - select { - case <-m.flushChan: - return nil - case <-time.After(timeout): - return fmt.Errorf("timed out before flush (waited %s)", timeout.String()) - } -} - -var mockSequence = kcl.SequencePair{big.NewInt(99999), 12345} - -func TestBatchingByCount(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Hour, 2, 1024*1024) - assert.NoError(err) - - t.Log("Batcher respect count limit") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("hmmhmm"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(2, len(sync.batches[0])) - assert.Equal("hihi", string(sync.batches[0][0])) - assert.Equal("heyhey", string(sync.batches[0][1])) - - t.Log("Batcher doesn't send partial batches") - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) -} - -func TestBatchingByTime(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Millisecond, 2000000, 1024*1024) - assert.NoError(err) - - t.Log("Batcher sends partial batches when time expires") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(1, len(sync.batches[0])) - assert.Equal("hihi", string(sync.batches[0][0])) - - t.Log("Batcher sends all messsages in partial 
batches when time expires") - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(2, len(sync.batches)) - assert.Equal(2, len(sync.batches[1])) - assert.Equal("heyhey", string(sync.batches[1][0])) - assert.Equal("yoyo", string(sync.batches[1][1])) - - t.Log("Batcher doesn't send empty batches") - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) -} - -func TestBatchingBySize(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Hour, 2000000, 8) - assert.NoError(err) - - t.Log("Large messages are sent immediately") - assert.NoError(batcher.AddMessage([]byte("hellohello"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(1, len(sync.batches[0])) - assert.Equal("hellohello", string(sync.batches[0][0])) - - t.Log("Batcher tries not to exceed size limit") - assert.NoError(batcher.AddMessage([]byte("heyhey"), mockSequence)) - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(2, len(sync.batches)) - assert.Equal(1, len(sync.batches[1])) - assert.Equal("heyhey", string(sync.batches[1][0])) - - t.Log("Batcher sends messages that didn't fit in previous batch") - assert.NoError(batcher.AddMessage([]byte("yoyo"), mockSequence)) // At this point "hihi" is in the batch - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(3, len(sync.batches)) - assert.Equal(2, len(sync.batches[2])) - assert.Equal("hihi", string(sync.batches[2][0])) - assert.Equal("yoyo", string(sync.batches[2][1])) - - t.Log("Batcher doesn't send partial batches") - assert.NoError(batcher.AddMessage([]byte("okok"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) -} - -func TestFlushing(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Hour, 2000000, 1024*1024) - assert.NoError(err) - - t.Log("Calling flush sends pending messages") - assert.NoError(batcher.AddMessage([]byte("hihi"), mockSequence)) - - err = sync.waitForFlush(time.Millisecond * 10) - assert.Error(err) - - batcher.Flush() - - err = sync.waitForFlush(time.Millisecond * 10) - assert.NoError(err) - - assert.Equal(1, len(sync.batches)) - assert.Equal(1, len(sync.batches[0])) - assert.Equal("hihi", string(sync.batches[0][0])) -} - -func TestSendingEmpty(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - batcher, err := New(sync, time.Second, 10, 1024*1024) - assert.NoError(err) - - t.Log("An error is returned when an empty message is sent") - err = batcher.AddMessage([]byte{}, mockSequence) - assert.Error(err) - assert.Equal(err.Error(), "Empty messages can't be sent") -} - -func TestUpdatingSequence(t *testing.T) { - assert := assert.New(t) - - sync := NewMockSync() - b, err := New(sync, time.Second, 10, 1024*1024) - assert.NoError(err) - - batcher := b.(*batcher) - - t.Log("Initally, smallestSeq is undefined") - assert.Nil(batcher.SmallestSequencePair().Sequence) - - expected := new(big.Int) - - t.Log("After AddMessage (seq=1), smallestSeq = 1") - batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(1), 1234}) - expected.SetInt64(1) - seq := batcher.SmallestSequencePair() - 
assert.True(expected.Cmp(seq.Sequence) == 0) - - t.Log("After AddMessage (seq=2), smallestSeq = 1 -- not updated because higher") - batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(2), 1234}) - seq = batcher.SmallestSequencePair() - assert.True(expected.Cmp(seq.Sequence) == 0) - - t.Log("After AddMessage (seq=1), smallestSeq = 0") - batcher.updateSequenceNumbers(kcl.SequencePair{big.NewInt(0), 1234}) - expected.SetInt64(0) - seq = batcher.SmallestSequencePair() - assert.True(expected.Cmp(seq.Sequence) == 0) - - t.Log("Flushing batch clears smallest sequence pair") - assert.NoError(batcher.AddMessage([]byte("cdcd"), kcl.SequencePair{big.NewInt(2), 1234})) - sync.waitForFlush(time.Minute) - assert.Nil(batcher.SmallestSequencePair().Sequence) -} diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go new file mode 100644 index 0000000..bf2b2a2 --- /dev/null +++ b/batchconsumer/batchermanager.go @@ -0,0 +1,196 @@ +package batchconsumer + +import ( + "os" + "time" + + kv "gopkg.in/Clever/kayvee-go.v6/logger" + + "github.com/Clever/amazon-kinesis-client-go/kcl" +) + +type tagMsgPair struct { + tag string + msg []byte + pair kcl.SequencePair +} + +type batcherManager struct { + log kv.KayveeLogger + sender Sender + chkpntManager *checkpointManager + + batchCount int + batchSize int + batchInterval time.Duration + + batchMsg chan tagMsgPair + lastIgnored chan kcl.SequencePair + lastProcessed chan kcl.SequencePair + shutdown chan struct{} +} + +func NewBatcherManager( + sender Sender, chkpntManager *checkpointManager, config Config, log kv.KayveeLogger, +) *batcherManager { + bm := &batcherManager{ + log: log, + sender: sender, + chkpntManager: chkpntManager, + + batchCount: config.BatchCount, + batchSize: config.BatchSize, + batchInterval: config.BatchInterval, + + batchMsg: make(chan tagMsgPair), + lastIgnored: make(chan kcl.SequencePair), + lastProcessed: make(chan kcl.SequencePair), + shutdown: make(chan struct{}), + } + + bm.startMessageHandler(bm.batchMsg, bm.lastIgnored, bm.lastProcessed, bm.shutdown) + + return bm +} + +func (b *batcherManager) BatchMessage(tag string, msg []byte, pair kcl.SequencePair) { + b.batchMsg <- tagMsgPair{tag, msg, pair} +} + +func (b *batcherManager) LatestIgnored(pair kcl.SequencePair) { + b.lastIgnored <- pair +} + +func (b *batcherManager) LatestProcessed(pair kcl.SequencePair) { + b.lastProcessed <- pair +} + +func (b *batcherManager) Shutdown() { + b.shutdown <- struct{}{} +} + +func (b *batcherManager) createBatcher() *batcher { + return &batcher{ + flushCount: b.batchCount, + flushSize: b.batchSize, + } +} + +func (b *batcherManager) sendBatch(batcher *batcher, tag string) { + if len(batcher.Batch) <= 0 { + return + } + + err := b.sender.SendBatch(batcher.Batch, tag) + switch e := err.(type) { + case nil: // Do nothing + case PartialSendBatchError: + b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) + for _, line := range e.FailedMessages { + b.log.ErrorD("failed-log", kv.M{"log": line}) + } + case CatastrophicSendBatchError: + b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + os.Exit(1) + default: + b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) + os.Exit(1) + } + + batcher.Clear() +} + +func (b *batcherManager) sendCheckpoint( + tag string, lastIgnoredPair kcl.SequencePair, batchers map[string]*batcher, +) { + smallest := lastIgnoredPair + + for name, batcher := range batchers { + if tag == name { + continue + } + + if len(batcher.Batch) <= 0 { + continue + } + + // Check for empty because it's possible that 
no messages have been ignored
+		if smallest.IsEmpty() || batcher.SmallestSeq.IsLessThan(smallest) {
+			smallest = batcher.SmallestSeq
+		}
+	}
+
+	if !smallest.IsEmpty() {
+		b.chkpntManager.Checkpoint(smallest)
+	}
+}
+
+// startMessageHandler starts the goroutine that routes messages to batchers. Keeping all
+// batcher state inside a single goroutine avoids race conditions.
+func (b *batcherManager) startMessageHandler(
+	batchMsg <-chan tagMsgPair, lastIgnored, lastProcessed <-chan kcl.SequencePair,
+	shutdown <-chan struct{},
+) {
+	go func() {
+		var lastProcessedPair kcl.SequencePair
+		var lastIgnoredPair kcl.SequencePair
+		batchers := map[string]*batcher{}
+
+		for {
+			for tag, batcher := range batchers { // Flush batchers that haven't been updated recently
+				if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) {
+					b.sendBatch(batcher, tag)
+					b.sendCheckpoint(tag, lastIgnoredPair, batchers)
+					batcher.Clear()
+				}
+			}
+
+			select {
+			case <-time.NewTimer(time.Second).C:
+				// The timer is a janky way to ensure the code above is run at regular
+				// intervals. That code can't live under this case because it's very
+				// possible that this timer is always pre-empted by the other channels.
+			case tmp := <-batchMsg:
+				batcher, ok := batchers[tmp.tag]
+				if !ok {
+					batcher = b.createBatcher()
+					batchers[tmp.tag] = batcher
+				}
+
+				err := batcher.AddMessage(tmp.msg, tmp.pair)
+				if err == ErrBatchFull {
+					b.sendBatch(batcher, tmp.tag)
+					b.sendCheckpoint(tmp.tag, lastIgnoredPair, batchers)
+
+					batcher.AddMessage(tmp.msg, tmp.pair)
+				} else if err != nil {
+					b.log.ErrorD("add-message", kv.M{
+						"err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag,
+					})
+				}
+			case pair := <-lastIgnored:
+				lastIgnoredPair = pair
+
+				isPendingMessages := false
+				for _, batcher := range batchers {
+					if len(batcher.Batch) > 0 {
+						isPendingMessages = true
+						break
+					}
+				}
+
+				if !isPendingMessages {
+					b.chkpntManager.Checkpoint(lastIgnoredPair)
+				}
+			case pair := <-lastProcessed:
+				lastProcessedPair = pair
+			case <-shutdown:
+				for tag, batcher := range batchers {
+					b.sendBatch(batcher, tag)
+				}
+				b.chkpntManager.Checkpoint(lastProcessedPair)
+				b.chkpntManager.Shutdown()
+			}
+		}
+	}()
+}
diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go
new file mode 100644
index 0000000..9a877fd
--- /dev/null
+++ b/batchconsumer/checkpointmanager.go
@@ -0,0 +1,89 @@
+package batchconsumer
+
+import (
+	"time"
+
+	kv "gopkg.in/Clever/kayvee-go.v6/logger"
+
+	"github.com/Clever/amazon-kinesis-client-go/kcl"
+)
+
+type checkpointManager struct {
+	log kv.KayveeLogger
+
+	checkpointRetries int
+	checkpointFreq    time.Duration
+
+	checkpoint chan kcl.SequencePair
+	shutdown   chan struct{}
+}
+
+func NewCheckpointManager(
+	checkpointer kcl.Checkpointer, config Config, log kv.KayveeLogger,
+) *checkpointManager {
+	cm := &checkpointManager{
+		log: log,
+
+		checkpointRetries: config.CheckpointRetries,
+		checkpointFreq:    config.CheckpointFreq,
+
+		checkpoint: make(chan kcl.SequencePair),
+		shutdown:   make(chan struct{}),
+	}
+
+	cm.startCheckpointHandler(checkpointer, cm.checkpoint, cm.shutdown)
+
+	return cm
+}
+
+func (cm *checkpointManager) Checkpoint(pair kcl.SequencePair) {
+	cm.checkpoint <- pair
+}
+
+func (cm *checkpointManager) Shutdown() {
+	cm.shutdown <- struct{}{}
+}
+
+func (cm *checkpointManager) startCheckpointHandler(
+	checkpointer kcl.Checkpointer, checkpoint <-chan kcl.SequencePair, shutdown <-chan struct{},
+) {
+	go func() {
+		lastCheckpoint := time.Now()
+
+		for {
+			pair := kcl.SequencePair{}
+			isShuttingDown := false
+
+			
select {
+			case pair = <-checkpoint:
+			case <-shutdown:
+				isShuttingDown = true
+			}
+
+			// This is a write throttle to ensure we don't checkpoint faster than cm.checkpointFreq.
+			// The latest pair number is always used.
+			for !isShuttingDown && time.Now().Sub(lastCheckpoint) < cm.checkpointFreq {
+				select {
+				case pair = <-checkpoint: // Keep updating checkpoint pair while waiting
+				case <-shutdown:
+					isShuttingDown = true
+				case <-time.NewTimer(cm.checkpointFreq - time.Now().Sub(lastCheckpoint)).C:
+				}
+			}
+
+			if !pair.IsEmpty() {
+				err := checkpointer.Checkpoint(pair, cm.checkpointRetries)
+				if err != nil {
+					cm.log.ErrorD("checkpoint-err", kv.M{"msg": err.Error()})
+				} else {
+					lastCheckpoint = time.Now()
+				}
+			}
+
+			if isShuttingDown {
+				checkpointer.Shutdown()
+				return
+			}
+		}
+	}()
+}
diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go
index 35f4ddc..2b6f023 100644
--- a/batchconsumer/consumer.go
+++ b/batchconsumer/consumer.go
@@ -18,6 +18,7 @@ type Config struct {
 	// LogFile where consumer errors and failed log lines are saved
 	LogFile string
 
+	// BatchInterval is the upper bound on how often SendBatch is called with accumulated messages
 	BatchInterval time.Duration
 
 	// BatchCount is the number of messages that triggers a SendBatch call
diff --git a/batchconsumer/sync.go b/batchconsumer/sync.go
deleted file mode 100644
index f50a703..0000000
--- a/batchconsumer/sync.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package batchconsumer
-
-type batcherSync struct {
-	tag    string
-	writer *batchedWriter
-}
-
-func (b *batcherSync) SendBatch(batch [][]byte) {
-	b.writer.SendBatch(batch, b.tag)
-}
diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go
index bb8e2e6..519eff7 100644
--- a/batchconsumer/writer.go
+++ b/batchconsumer/writer.go
@@ -5,23 +5,14 @@ import (
 	"encoding/base64"
 	"fmt"
 	"math/big"
-	"os"
-	"time"
 
 	"golang.org/x/time/rate"
 	kv "gopkg.in/Clever/kayvee-go.v6/logger"
 
-	"github.com/Clever/amazon-kinesis-client-go/batchconsumer/batcher"
 	"github.com/Clever/amazon-kinesis-client-go/kcl"
 	"github.com/Clever/amazon-kinesis-client-go/splitter"
 )
 
-type tagMsgPair struct {
-	tag  string
-	msg  []byte
-	pair kcl.SequencePair
-}
-
 type batchedWriter struct {
 	config Config
 	sender Sender
@@ -29,12 +20,8 @@ type batchedWriter struct {
 
 	shardID string
 
-	checkpointMsg      chan kcl.SequencePair
-	checkpointShutdown chan struct{}
-	checkpointTag      chan string
-	lastIgnoredPair    chan kcl.SequencePair
-	batchMsg           chan tagMsgPair
-	shutdown           chan struct{}
+	chkpntManager  *checkpointManager
+	batcherManager *batcherManager
 
 	// Limits the number of records read from the stream
 	rateLimiter *rate.Limiter
@@ -54,164 +41,13 @@ func NewBatchedWriter(config Config, sender Sender, log kv.KayveeLogger) *batche
 
 func (b *batchedWriter) Initialize(shardID string, checkpointer kcl.Checkpointer) error {
 	b.shardID = shardID
 
-	b.checkpointMsg = make(chan kcl.SequencePair)
-	b.checkpointShutdown = make(chan struct{})
-	b.startCheckpointListener(checkpointer, b.checkpointMsg, b.checkpointShutdown)
 
-	b.checkpointTag = make(chan string, 100) // Buffered to workaround
-	b.batchMsg = make(chan tagMsgPair)
-	b.shutdown = make(chan struct{})
-	b.lastIgnoredPair = make(chan kcl.SequencePair)
-	b.startMessageHandler(b.batchMsg, b.checkpointTag, b.lastIgnoredPair, b.shutdown)
+	b.chkpntManager = NewCheckpointManager(checkpointer, b.config, b.log)
+	b.batcherManager = NewBatcherManager(b.sender, b.chkpntManager, b.config, b.log)
 
 	return nil
 }
 
-func (b *batchedWriter) startCheckpointListener(
-	checkpointer kcl.Checkpointer, 
checkpointMsg <-chan kcl.SequencePair, - shutdown <-chan struct{}, -) { - go func() { - lastCheckpoint := time.Now() - - for { - pair := kcl.SequencePair{} - isShuttingDown := false - - select { - case pair = <-checkpointMsg: - case <-shutdown: - isShuttingDown = true - } - - // This is a write throttle to ensure we don't checkpoint faster than - // b.config.CheckpointFreq. The latest pair number is always used. - for !isShuttingDown && time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq { - select { - case pair = <-checkpointMsg: // Keep updating checkpoint pair while waiting - case <-shutdown: - isShuttingDown = true - case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C: - } - } - - if !pair.IsEmpty() { - err := checkpointer.Checkpoint(pair, b.config.CheckpointRetries) - if err != nil { - b.log.ErrorD("checkpoint-err", kv.M{"msg": err.Error(), "shard-id": b.shardID}) - } else { - lastCheckpoint = time.Now() - } - } - - if isShuttingDown { - checkpointer.Shutdown() - return - } - } - }() -} - -func (b *batchedWriter) createBatcher(tag string) batcher.Batcher { - sync := &batcherSync{ - tag: tag, - writer: b, - } - batch, err := batcher.New(sync, b.config.BatchInterval, b.config.BatchCount, b.config.BatchSize) - if err != nil { - b.log.ErrorD("create-batcher", kv.M{"msg": err.Error(), "tag": tag}) - } - - return batch -} - -// startMessageDistributer starts a go-routine that routes messages to batches. It's in uses a -// go routine to avoid racey conditions. -func (b *batchedWriter) startMessageHandler( - batchMsg <-chan tagMsgPair, checkpointTag <-chan string, lastIgnored <-chan kcl.SequencePair, - shutdown <-chan struct{}, -) { - getBatcher := make(chan string) - rtnBatcher := make(chan batcher.Batcher) - shutdownAdder := make(chan struct{}) - - go func() { - for { - select { - case tmp := <-batchMsg: - getBatcher <- tmp.tag - batcher := <-rtnBatcher - err := batcher.AddMessage(tmp.msg, tmp.pair) - if err != nil { - b.log.ErrorD("add-message", kv.M{ - "err": err.Error(), "msg": string(tmp.msg), "tag": tmp.tag, - }) - } - case <-shutdownAdder: - } - } - }() - - go func() { - var lastIgnoredPair kcl.SequencePair - batchers := map[string]batcher.Batcher{} - areBatchersEmpty := true - - for { - select { - case tag := <-getBatcher: - batcher, ok := batchers[tag] - if !ok { - batcher = b.createBatcher(tag) - batchers[tag] = batcher - } - - areBatchersEmpty = false - rtnBatcher <- batcher - case tag := <-checkpointTag: - smallest := lastIgnoredPair - isAllEmpty := true - - for name, batch := range batchers { - if tag == name { - continue - } - - pair := batch.SmallestSequencePair() - if pair.IsEmpty() { // Occurs when batch has no items - continue - } - - // Check for empty because it's possible that no messages have been ignored - if smallest.IsEmpty() || pair.IsLessThan(smallest) { - smallest = pair - } - - isAllEmpty = false - } - - if !smallest.IsEmpty() { - b.checkpointMsg <- smallest - } - areBatchersEmpty = isAllEmpty - case pair := <-lastIgnored: - if areBatchersEmpty && !pair.IsEmpty() { - b.checkpointMsg <- pair - } - lastIgnoredPair = pair - case <-shutdown: - for _, batch := range batchers { - batch.Flush() - } - b.checkpointMsg <- b.lastProcessedSeq - b.checkpointShutdown <- struct{}{} - - areBatchersEmpty = true - } - } - }() -} - func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) { // We handle two types of records: // - records emitted from CWLogs Subscription @@ -278,47 +114,30 @@ func (b *batchedWriter) 
ProcessRecords(records []kcl.Record) error { // it's been sent. When batches are sent, conceptually we first find the smallest // sequence number amount all the batch (let's call it A). We then checkpoint at // the A-1 sequence number. - b.batchMsg <- tagMsgPair{tag, msg, prevPair} + b.batcherManager.BatchMessage(tag, msg, prevPair) wasPairIgnored = false } } prevPair = pair if wasPairIgnored { - b.lastIgnoredPair <- pair + b.batcherManager.LatestIgnored(pair) } + b.batcherManager.LatestProcessed(pair) } b.lastProcessedSeq = pair return nil } -func (b *batchedWriter) SendBatch(batch [][]byte, tag string) { - err := b.sender.SendBatch(batch, tag) - switch e := err.(type) { - case nil: // Do nothing - case PartialSendBatchError: - b.log.ErrorD("send-batch", kv.M{"msg": e.Error()}) - for _, line := range e.FailedMessages { - b.log.ErrorD("failed-log", kv.M{"log": line}) - } - case CatastrophicSendBatchError: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) - os.Exit(1) - default: - b.log.CriticalD("send-batch", kv.M{"msg": e.Error()}) - os.Exit(1) - } - - b.checkpointTag <- tag -} - func (b *batchedWriter) Shutdown(reason string) error { if reason == "TERMINATE" { b.log.InfoD("terminate-signal", kv.M{"shard-id": b.shardID}) } else { b.log.ErrorD("shutdown-failover", kv.M{"shard-id": b.shardID, "reason": reason}) } - b.shutdown <- struct{}{} + + b.batcherManager.Shutdown() + return nil } diff --git a/batchconsumer/writer_test.go b/batchconsumer/writer_test.go index d87d1ce..0f7e189 100644 --- a/batchconsumer/writer_test.go +++ b/batchconsumer/writer_test.go @@ -192,8 +192,6 @@ func TestProcessRecordsMutliBatchBasic(t *testing.T) { }) assert.NoError(err) - time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once - err = wrt.Shutdown("TERMINATE") assert.NoError(err) @@ -270,8 +268,6 @@ func TestProcessRecordsMutliBatchWithIgnores(t *testing.T) { }) assert.NoError(err) - time.Sleep(200 * time.Millisecond) // Sleep to ensure checkpoint get flushed at least once - err = wrt.Shutdown("TERMINATE") assert.NoError(err) @@ -341,7 +337,6 @@ func TestStaggeredCheckpionting(t *testing.T) { assert.NoError(err) mocksender.Shutdown() - mockcheckpointer.Shutdown() // Test to make sure writer doesn't prematurely checkpoint messages // Checkpoints 5,6,7,8 will never be submitted because the 3rd "tag1" is in a batch
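
Note on the new design (not part of the patch): the refactor replaces per-tag batcher
goroutines, and the mutexes that guarded them, with a single goroutine per manager that
owns all batcher state and is fed over channels. The sketch below is a minimal,
self-contained illustration of that ownership pattern; every name in it is invented for
the example and none of it is this repo's API.

package main

import (
	"fmt"
	"time"
)

type addMsg struct {
	tag string
	msg []byte
}

type manager struct {
	add      chan addMsg
	shutdown chan struct{}
	done     chan struct{}
}

func newManager() *manager {
	m := &manager{
		add:      make(chan addMsg),
		shutdown: make(chan struct{}),
		done:     make(chan struct{}),
	}
	go m.run() // the only goroutine that ever touches the batch map
	return m
}

func (m *manager) run() {
	batches := map[string][][]byte{} // owned exclusively by this goroutine; no locks needed
	for {
		select {
		case am := <-m.add:
			batches[am.tag] = append(batches[am.tag], am.msg)
		case <-time.After(time.Second):
			// A periodic sweep that flushes idle batches would go here, mirroring
			// the batchInterval loop in batcherManager.startMessageHandler.
		case <-m.shutdown:
			for tag, batch := range batches {
				fmt.Printf("flushing %d message(s) for tag %q\n", len(batch), tag)
			}
			close(m.done)
			return
		}
	}
}

// Add and Shutdown are safe to call from any goroutine: they serialize through
// the run loop's select instead of taking a lock.
func (m *manager) Add(tag string, msg []byte) { m.add <- addMsg{tag, msg} }

func (m *manager) Shutdown() {
	m.shutdown <- struct{}{}
	<-m.done
}

func main() {
	m := newManager()
	m.Add("tag1", []byte("hello"))
	m.Add("tag2", []byte("world"))
	m.Shutdown()
}

The trade-off is the one the commit message names: message routing, flushing, and
checkpoint bookkeeping all serialize through one select loop, which removes the old
Batcher/Sync locking and channel hand-offs at the cost of some channel plumbing.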