diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 1cbe54b..b8b26f6 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -137,25 +137,30 @@ func (b *batcherManager) startMessageHandler( batchMsg <-chan tagMsgPair, lastIgnored, lastProcessed <-chan kcl.SequencePair, shutdown <-chan chan<- struct{}, ) { + flushStaleBatches := make(chan struct{}) + + go func() { + for { // Flush batches that haven't been updated recently + <-time.NewTimer(time.Second).C + flushStaleBatches <- struct{}{} + } + }() + go func() { var lastProcessedPair kcl.SequencePair var lastIgnoredPair kcl.SequencePair batchers := map[string]*batcher{} for { - for tag, batcher := range batchers { // Flush batcher that hasn't been updated recently - if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) { - b.sendBatch(batcher, tag) - b.sendCheckpoint(tag, lastIgnoredPair, batchers) - batcher.Clear() - } - } - select { - case <-time.NewTimer(time.Second).C: - // Timer is janky way to ensure the code above is ran at regular intervals - // Can't put above code under this case because it's very possible that this - // timer is always pre-empted by other channels + case <-flushStaleBatches: + for tag, batcher := range batchers { + if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) { + b.sendBatch(batcher, tag) + b.sendCheckpoint(tag, lastIgnoredPair, batchers) + batcher.Clear() + } + } case tmp := <-batchMsg: batcher, ok := batchers[tmp.tag] if !ok {