Added mutex to Checkpoint function to avoid racey conditions
This commit is contained in:
parent
17cfe98efa
commit
3c56b57e6b
1 changed files with 6 additions and 1 deletions
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RecordProcessor interface {
|
type RecordProcessor interface {
|
||||||
|
|
@ -23,6 +24,8 @@ func (ce CheckpointError) Error() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Checkpointer struct {
|
type Checkpointer struct {
|
||||||
|
mux sync.Mutex
|
||||||
|
|
||||||
ioHandler ioHandler
|
ioHandler ioHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -39,6 +42,9 @@ func (c *Checkpointer) getAction() (interface{}, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error {
|
func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error {
|
||||||
|
c.mux.Lock()
|
||||||
|
defer c.mux.Unlock()
|
||||||
|
|
||||||
c.ioHandler.writeAction(ActionCheckpoint{
|
c.ioHandler.writeAction(ActionCheckpoint{
|
||||||
Action: "checkpoint",
|
Action: "checkpoint",
|
||||||
SequenceNumber: sequenceNumber,
|
SequenceNumber: sequenceNumber,
|
||||||
|
|
@ -62,7 +68,6 @@ func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ioHandler struct {
|
type ioHandler struct {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue