Added IsEmpty to SequencePair

This commit is contained in:
Xavi Ramirez 2017-07-21 01:23:25 +00:00
parent b0f769bfa7
commit ae37b57e61
2 changed files with 20 additions and 11 deletions

View file

@ -13,13 +13,14 @@ type SequencePair struct {
SubSequence int SubSequence int
} }
func (s SequencePair) IsEmpty() bool {
return s.Sequence == nil
}
func (s SequencePair) IsLessThan(pair SequencePair) bool { func (s SequencePair) IsLessThan(pair SequencePair) bool {
if s.Sequence == nil { if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable
return false return false
} }
if pair.Sequence == nil {
return true
}
cmp := s.Sequence.Cmp(pair.Sequence) cmp := s.Sequence.Cmp(pair.Sequence)
if cmp == -1 { if cmp == -1 {
@ -62,8 +63,7 @@ type batcher struct {
flushCount int flushCount int
flushSize int flushSize int
// smallestSeq and smallestSubSeq are used to track the highest sequence number // smallestSeq are used for checkpointing
// of any record in the batch. This is used for checkpointing.
smallestSeq SequencePair smallestSeq SequencePair
sync Sync sync Sync
@ -129,8 +129,8 @@ func (b *batcher) updateSequenceNumbers(pair SequencePair) {
b.mux.Lock() b.mux.Lock()
defer b.mux.Unlock() defer b.mux.Unlock()
if pair.IsLessThan(b.smallestSeq) { if b.smallestSeq.IsEmpty() || pair.IsLessThan(b.smallestSeq) {
b.smallestSeq = SequencePair{pair.Sequence, pair.SubSequence} b.smallestSeq = pair
} }
} }

View file

@ -226,9 +226,8 @@ func TestSequencePairIsLessThan(t *testing.T) {
isLess bool isLess bool
}{ }{
{left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false}, {left: SequencePair{nil, 0}, right: SequencePair{nil, 0}, isLess: false},
{left: SequencePair{nil, 0}, right: SequencePair{big10, 10}, isLess: false}, {left: SequencePair{nil, 0}, right: SequencePair{big10, 0}, isLess: false},
{left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: false},
{left: SequencePair{big10, 0}, right: SequencePair{nil, 0}, isLess: true},
{left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true}, {left: SequencePair{big5, 0}, right: SequencePair{big10, 0}, isLess: true},
{left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true}, {left: SequencePair{big5, 0}, right: SequencePair{big5, 10}, isLess: true},
@ -250,3 +249,13 @@ func TestSequencePairIsLessThan(t *testing.T) {
assert.Equal(test.isLess, left.IsLessThan(right)) assert.Equal(test.isLess, left.IsLessThan(right))
} }
} }
func TestSequencePairEmpty(t *testing.T) {
assert := assert.New(t)
assert.True(SequencePair{nil, 0}.IsEmpty())
assert.True(SequencePair{nil, 10000}.IsEmpty())
assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty())
assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty())
}