diff --git a/kcl/kcl.go b/kcl/kcl.go index 86a07e4..c470637 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -188,9 +188,6 @@ func (kclp *KCLProcess) performAction(a interface{}) (string, error) { } func (kclp *KCLProcess) handleLine(line string) error { - kclp.readmux.Lock() - defer kclp.readmux.Unlock() - action, err := kclp.ioHandler.loadAction(line) if err != nil { return err @@ -204,9 +201,12 @@ func (kclp *KCLProcess) handleLine(line string) error { } func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { + kclp.ckpmux.Lock() + defer kclp.ckpmux.Unlock() sleepDuration := 5 * time.Second for n := 0; n <= retryCount; n++ { + fmt.Printf("Trying to checkpoint %d\n", n) err := kclp.processCheckpoint(pair) if err == nil { return nil @@ -240,8 +240,8 @@ func (kclp *KCLProcess) Shutdown() { } func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { - kclp.ckpmux.Lock() - defer kclp.ckpmux.Unlock() + kclp.readmux.Lock() + defer kclp.readmux.Unlock() var seq *string var subSeq *int @@ -270,11 +270,13 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { if action.Error != nil && *action.Error != "" { return CheckpointError{e: *action.Error} } + fmt.Println("Successful checkpoint") return nil } func (kclp *KCLProcess) Run() { for { + kclp.readmux.Lock() line, err := kclp.ioHandler.readLine() if err == io.EOF { kclp.ioHandler.writeError("IO stream closed") @@ -292,5 +294,6 @@ func (kclp *KCLProcess) Run() { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) return } + kclp.readmux.Unlock() } }