diff --git a/kcl/kcl.go b/kcl/kcl.go index 1933a68..8d25f24 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "sync" ) type RecordProcessor interface { @@ -23,6 +24,8 @@ func (ce CheckpointError) Error() string { } type Checkpointer struct { + mux sync.Mutex + ioHandler ioHandler } @@ -39,6 +42,9 @@ func (c *Checkpointer) getAction() (interface{}, error) { } func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int) error { + c.mux.Lock() + defer c.mux.Unlock() + c.ioHandler.writeAction(ActionCheckpoint{ Action: "checkpoint", SequenceNumber: sequenceNumber, @@ -62,7 +68,6 @@ func (c *Checkpointer) Checkpoint(sequenceNumber *string, subSequenceNumber *int } } return nil - } type ioHandler struct {