diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go index 6aa9a64..4ac1d54 100644 --- a/batchconsumer/batcher/message_batcher.go +++ b/batchconsumer/batcher/message_batcher.go @@ -133,8 +133,10 @@ func (b *batcher) batchSize(batch [][]byte) int { func (b *batcher) flush(batch [][]byte) [][]byte { if len(batch) > 0 { + b.mux.Lock() b.sync.SendBatch(batch) b.smallestSeq = SequencePair{nil, 0} + b.mux.Unlock() } return [][]byte{} }