diff --git a/batchconsumer/batcher.go b/batchconsumer/batcher.go index 06e8dca..2d8480b 100644 --- a/batchconsumer/batcher.go +++ b/batchconsumer/batcher.go @@ -38,7 +38,7 @@ func (b *batcher) AddMessage(msg []byte, pair kcl.SequencePair) error { } b.Batch = append(b.Batch, msg) - if b.SmallestSeq.IsEmpty() || pair.IsLessThan(b.SmallestSeq) { + if b.SmallestSeq.IsNil() || pair.IsLessThan(b.SmallestSeq) { b.SmallestSeq = pair } b.LastUpdated = time.Now() diff --git a/batchconsumer/batchermanager.go b/batchconsumer/batchermanager.go index 7c0d3ad..bb1a3fe 100644 --- a/batchconsumer/batchermanager.go +++ b/batchconsumer/batchermanager.go @@ -128,12 +128,12 @@ func (b *batcherManager) sendCheckpoint( } // Check for empty because it's possible that no messages have been ignored - if smallest.IsEmpty() || batcher.SmallestSeq.IsLessThan(smallest) { + if smallest.IsNil() || batcher.SmallestSeq.IsLessThan(smallest) { smallest = batcher.SmallestSeq } } - if !smallest.IsEmpty() { + if !smallest.IsNil() { b.chkpntManager.Checkpoint(smallest) } } diff --git a/batchconsumer/checkpointmanager.go b/batchconsumer/checkpointmanager.go index f526e27..6f312e0 100644 --- a/batchconsumer/checkpointmanager.go +++ b/batchconsumer/checkpointmanager.go @@ -73,7 +73,7 @@ func (cm *checkpointManager) startCheckpointHandler( } } - if !pair.IsEmpty() { + if !pair.IsNil() { checkpointer.Checkpoint(pair) lastCheckpoint = time.Now() stats.Counter("checkpoints-sent", 1) diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 095dd73..af102d1 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -82,7 +82,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { } pair = kcl.SequencePair{seq, record.SubSequenceNumber} - if prevPair.IsEmpty() { // Handles on-start edge case where b.lastProcessSeq is empty + if prevPair.IsNil() { // Handles on-start edge case where b.lastProcessSeq is empty prevPair = pair } diff --git a/kcl/kcl.go b/kcl/kcl.go index 93aaf0d..3270be1 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -156,7 +156,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair) { kclp.ckpmux.Lock() defer kclp.ckpmux.Unlock() - if kclp.nextCheckpointPair.IsEmpty() || kclp.nextCheckpointPair.IsLessThan(pair) { + if kclp.nextCheckpointPair.IsNil() || kclp.nextCheckpointPair.IsLessThan(pair) { kclp.nextCheckpointPair = pair } } @@ -189,7 +189,7 @@ func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { subSeq := action.SubSequenceNumber kclp.ckpmux.Lock() - if !kclp.nextCheckpointPair.IsEmpty() { + if !kclp.nextCheckpointPair.IsNil() { tmp := kclp.nextCheckpointPair.Sequence.String() seq = &tmp subSeq = &kclp.nextCheckpointPair.SubSequence @@ -279,7 +279,7 @@ func (kclp *KCLProcess) Run() { } kclp.ckpmux.Lock() - if !kclp.nextCheckpointPair.IsEmpty() { + if !kclp.nextCheckpointPair.IsNil() { seq := kclp.nextCheckpointPair.Sequence.String() subSeq := kclp.nextCheckpointPair.SubSequence diff --git a/kcl/sequencepair.go b/kcl/sequencepair.go index df0fbfe..685d099 100644 --- a/kcl/sequencepair.go +++ b/kcl/sequencepair.go @@ -10,12 +10,12 @@ type SequencePair struct { SubSequence int } -func (s SequencePair) IsEmpty() bool { +func (s SequencePair) IsNil() bool { return s.Sequence == nil } func (s SequencePair) IsLessThan(pair SequencePair) bool { - if s.IsEmpty() || pair.IsEmpty() { // empty pairs are incomparable + if s.IsNil() || pair.IsNil() { // empty pairs are incomparable return false } diff --git a/kcl/sequencepair_test.go b/kcl/sequencepair_test.go index 58d253f..51e0744 100644 --- a/kcl/sequencepair_test.go +++ b/kcl/sequencepair_test.go @@ -46,9 +46,9 @@ func TestSequencePairIsLessThan(t *testing.T) { func TestSequencePairEmpty(t *testing.T) { assert := assert.New(t) - assert.True(SequencePair{nil, 0}.IsEmpty()) - assert.True(SequencePair{nil, 10000}.IsEmpty()) + assert.True(SequencePair{nil, 0}.IsNil()) + assert.True(SequencePair{nil, 10000}.IsNil()) - assert.False(SequencePair{big.NewInt(10), 0}.IsEmpty()) - assert.False(SequencePair{big.NewInt(0), 1000}.IsEmpty()) + assert.False(SequencePair{big.NewInt(10), 0}.IsNil()) + assert.False(SequencePair{big.NewInt(0), 1000}.IsNil()) }