diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index 1dc0d09..4260025 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -104,20 +104,20 @@ func (b *batchedWriter) startCheckpointListener( lastCheckpoint := time.Now() for { - seq := kcl.SequencePair{} + pair := kcl.SequencePair{} isShuttingDown := false select { - case seq = <-checkpointMsg: + case pair = <-checkpointMsg: case <-shutdown: isShuttingDown = true } // This is a write throttle to ensure we don't checkpoint faster than - // b.config.CheckpointFreq. The latest seq number is always used. + // b.config.CheckpointFreq. The latest pair number is always used. for !isShuttingDown && time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq { select { - case seq = <-checkpointMsg: // Keep updating checkpoint seq while waiting + case pair = <-checkpointMsg: // Keep updating checkpoint pair while waiting case <-shutdown: isShuttingDown = true case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C: @@ -125,7 +125,7 @@ func (b *batchedWriter) startCheckpointListener( } retry := true - for n := 0; retry && !seq.IsEmpty() && n < b.config.CheckpointRetries+1; n++ { + for n := 0; retry && !pair.IsEmpty() && n < b.config.CheckpointRetries+1; n++ { err := checkpointer.Checkpoint(seq, 5) if err == nil { // Successfully checkpointed! lastCheckpoint = time.Now()