diff --git a/kcl/kcl.go b/kcl/kcl.go index a6f4776..86a07e4 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "sync" "time" ) @@ -147,23 +148,18 @@ func New( ioHandler: i, recordProcessor: recordProcessor, - next: make(chan struct{}), - out: make(chan string), - outErr: make(chan error), - checkpoint: make(chan SequencePair), checkpointErr: make(chan error), } } type KCLProcess struct { + ckpmux sync.Mutex + readmux sync.Mutex + ioHandler ioHandler recordProcessor RecordProcessor - next chan struct{} - out chan string - outErr chan error - checkpoint chan SequencePair checkpointErr chan error } @@ -192,6 +188,9 @@ func (kclp *KCLProcess) performAction(a interface{}) (string, error) { } func (kclp *KCLProcess) handleLine(line string) error { + kclp.readmux.Lock() + defer kclp.readmux.Unlock() + action, err := kclp.ioHandler.loadAction(line) if err != nil { return err @@ -208,8 +207,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error { sleepDuration := 5 * time.Second for n := 0; n <= retryCount; n++ { - kclp.checkpoint <- pair - err := <-kclp.checkpointErr + err := kclp.processCheckpoint(pair) if err == nil { return nil } @@ -242,6 +240,9 @@ func (kclp *KCLProcess) Shutdown() { } func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { + kclp.ckpmux.Lock() + defer kclp.ckpmux.Unlock() + var seq *string var subSeq *int if !pair.IsEmpty() { // an empty pair is a signal to shutdown @@ -272,44 +273,22 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { return nil } -func (kclp *KCLProcess) startLineProcessor( - next chan struct{}, out chan string, outErr chan error, - checkpoint chan SequencePair, checkpointErr chan error, -) { - go func() { - for { - select { - case <-next: - line, err := kclp.ioHandler.readLine() - if err == nil { - if line == "" { - err = fmt.Errorf("Empty read line recieved") - } - err = kclp.handleLine(line) - } - outErr <- err - case pair := <-checkpoint: - err := kclp.processCheckpoint(pair) - checkpointErr <- err - } - } - }() -} - -func (kclp *KCLProcess) processNextLine() error { - kclp.next <- struct{}{} // We're ready for a new line - - return <-kclp.outErr -} - func (kclp *KCLProcess) Run() { - kclp.startLineProcessor(kclp.next, kclp.out, kclp.outErr, kclp.checkpoint, kclp.checkpointErr) for { - err := kclp.processNextLine() + line, err := kclp.ioHandler.readLine() if err == io.EOF { kclp.ioHandler.writeError("IO stream closed") return } else if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR Read line: %+#v", err)) + return + } else if line == "" { + kclp.ioHandler.writeError("Empty read line recieved") + return + } + + err = kclp.handleLine(line) + if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) return }