Added an external go routine to ensure stale batches get flushed
This commit is contained in:
parent
27cce44140
commit
68a16cfe6a
1 changed files with 17 additions and 12 deletions
|
|
@ -137,25 +137,30 @@ func (b *batcherManager) startMessageHandler(
|
|||
batchMsg <-chan tagMsgPair, lastIgnored, lastProcessed <-chan kcl.SequencePair,
|
||||
shutdown <-chan chan<- struct{},
|
||||
) {
|
||||
flushStaleBatches := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
for { // Flush batches that haven't been updated recently
|
||||
<-time.NewTimer(time.Second).C
|
||||
flushStaleBatches <- struct{}{}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
var lastProcessedPair kcl.SequencePair
|
||||
var lastIgnoredPair kcl.SequencePair
|
||||
batchers := map[string]*batcher{}
|
||||
|
||||
for {
|
||||
for tag, batcher := range batchers { // Flush batcher that hasn't been updated recently
|
||||
if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) {
|
||||
b.sendBatch(batcher, tag)
|
||||
b.sendCheckpoint(tag, lastIgnoredPair, batchers)
|
||||
batcher.Clear()
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.NewTimer(time.Second).C:
|
||||
// Timer is janky way to ensure the code above is ran at regular intervals
|
||||
// Can't put above code under this case because it's very possible that this
|
||||
// timer is always pre-empted by other channels
|
||||
case <-flushStaleBatches:
|
||||
for tag, batcher := range batchers {
|
||||
if b.batchInterval <= time.Now().Sub(batcher.LastUpdated) {
|
||||
b.sendBatch(batcher, tag)
|
||||
b.sendCheckpoint(tag, lastIgnoredPair, batchers)
|
||||
batcher.Clear()
|
||||
}
|
||||
}
|
||||
case tmp := <-batchMsg:
|
||||
batcher, ok := batchers[tmp.tag]
|
||||
if !ok {
|
||||
|
|
|
|||
Loading…
Reference in a new issue