Renamed variable from seq to pair
This commit is contained in:
parent
6e9457cbcf
commit
c6fe4cef37
1 changed files with 5 additions and 5 deletions
|
|
@ -104,20 +104,20 @@ func (b *batchedWriter) startCheckpointListener(
|
||||||
lastCheckpoint := time.Now()
|
lastCheckpoint := time.Now()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
seq := kcl.SequencePair{}
|
pair := kcl.SequencePair{}
|
||||||
isShuttingDown := false
|
isShuttingDown := false
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case seq = <-checkpointMsg:
|
case pair = <-checkpointMsg:
|
||||||
case <-shutdown:
|
case <-shutdown:
|
||||||
isShuttingDown = true
|
isShuttingDown = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is a write throttle to ensure we don't checkpoint faster than
|
// This is a write throttle to ensure we don't checkpoint faster than
|
||||||
// b.config.CheckpointFreq. The latest seq number is always used.
|
// b.config.CheckpointFreq. The latest pair number is always used.
|
||||||
for !isShuttingDown && time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq {
|
for !isShuttingDown && time.Now().Sub(lastCheckpoint) < b.config.CheckpointFreq {
|
||||||
select {
|
select {
|
||||||
case seq = <-checkpointMsg: // Keep updating checkpoint seq while waiting
|
case pair = <-checkpointMsg: // Keep updating checkpoint pair while waiting
|
||||||
case <-shutdown:
|
case <-shutdown:
|
||||||
isShuttingDown = true
|
isShuttingDown = true
|
||||||
case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C:
|
case <-time.NewTimer(b.config.CheckpointFreq - time.Now().Sub(lastCheckpoint)).C:
|
||||||
|
|
@ -125,7 +125,7 @@ func (b *batchedWriter) startCheckpointListener(
|
||||||
}
|
}
|
||||||
|
|
||||||
retry := true
|
retry := true
|
||||||
for n := 0; retry && !seq.IsEmpty() && n < b.config.CheckpointRetries+1; n++ {
|
for n := 0; retry && !pair.IsEmpty() && n < b.config.CheckpointRetries+1; n++ {
|
||||||
err := checkpointer.Checkpoint(seq, 5)
|
err := checkpointer.Checkpoint(seq, 5)
|
||||||
if err == nil { // Successfully checkpointed!
|
if err == nil { // Successfully checkpointed!
|
||||||
lastCheckpoint = time.Now()
|
lastCheckpoint = time.Now()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue