diff --git a/batchconsumer/batcher/message_batcher.go b/batchconsumer/batcher/message_batcher.go index b57a6e5..1cf93e5 100644 --- a/batchconsumer/batcher/message_batcher.go +++ b/batchconsumer/batcher/message_batcher.go @@ -13,13 +13,14 @@ type SequencePair struct { SubSequence int } +func (s SequencePair) IsEmpty() bool { + return s.Sequence == nil +} + func (s SequencePair) IsLessThan(pair SequencePair) bool { - if s.Sequence == nil { + if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable return false } - if pair.Sequence == nil { - return true - } cmp := s.Sequence.Cmp(pair.Sequence) if cmp == -1 { @@ -62,8 +63,7 @@ type batcher struct { flushCount int flushSize int - // smallestSeq and smallestSubSeq are used to track the highest sequence number - // of any record in the batch. This is used for checkpointing. + // smallestSeq are used for checkpointing smallestSeq SequencePair sync Sync @@ -129,8 +129,8 @@ func (b *batcher) updateSequenceNumbers(pair SequencePair) { b.mux.Lock() defer b.mux.Unlock() - if pair.IsLessThan(b.smallestSeq) { - b.smallestSeq = SequencePair{pair.Sequence, pair.SubSequence} + if b.smallestSeq.IsEmpty() || pair.IsLessThan(b.smallestSeq) { + b.smallestSeq = pair } } diff --git a/batchconsumer/batcher/message_batcher_test.go b/batchconsumer/batcher/message_batcher_test.go index 9215da7..23f24f9 100644 --- a/batchconsumer/batcher/message_batcher_test.go +++ b/batchconsumer/batcher/message_batcher_test.go @@ -226,9 +226,8 @@ func TestSequencePairIsLessThan(t *testing.T) { isLess bool }{ {left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false}, - {left: SequencePair{nil, 0}, right: SequencePair{big10, 10}, isLess: false}, - - {left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: true}, + {left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false}, + {left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false}, {left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true}, {left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true}, @@ -250,3 +249,13 @@ func TestSequencePairIsLessThan(t *testing.T) { assert.Equal(test.isLess, left.IsLessThan(right)) } } + +func TestSequencePairEmpty(t *testing.T) { + assert := assert.New(t) + + assert.True(SequencePair{nil, 0}.IsEmpty()) + assert.True(SequencePair{nil, 10000}.IsEmpty()) + + assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty()) + assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty()) +}