Another refactor

This commit is contained in:
Xavi Ramirez 2017-08-06 02:59:28 +00:00
parent c5f75d6554
commit c814742afa

View file

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"os"
"sync"
"time"
)
@ -147,23 +148,18 @@ func New(
ioHandler: i,
recordProcessor: recordProcessor,
next: make(chan struct{}),
out: make(chan string),
outErr: make(chan error),
checkpoint: make(chan SequencePair),
checkpointErr: make(chan error),
}
}
type KCLProcess struct {
ckpmux sync.Mutex
readmux sync.Mutex
ioHandler ioHandler
recordProcessor RecordProcessor
next chan struct{}
out chan string
outErr chan error
checkpoint chan SequencePair
checkpointErr chan error
}
@ -192,6 +188,9 @@ func (kclp *KCLProcess) performAction(a interface{}) (string, error) {
}
func (kclp *KCLProcess) handleLine(line string) error {
kclp.readmux.Lock()
defer kclp.readmux.Unlock()
action, err := kclp.ioHandler.loadAction(line)
if err != nil {
return err
@ -208,8 +207,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error {
sleepDuration := 5 * time.Second
for n := 0; n <= retryCount; n++ {
kclp.checkpoint <- pair
err := <-kclp.checkpointErr
err := kclp.processCheckpoint(pair)
if err == nil {
return nil
}
@ -242,6 +240,9 @@ func (kclp *KCLProcess) Shutdown() {
}
func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error {
kclp.ckpmux.Lock()
defer kclp.ckpmux.Unlock()
var seq *string
var subSeq *int
if !pair.IsEmpty() { // an empty pair is a signal to shutdown
@ -272,44 +273,22 @@ func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error {
return nil
}
func (kclp *KCLProcess) startLineProcessor(
next chan struct{}, out chan string, outErr chan error,
checkpoint chan SequencePair, checkpointErr chan error,
) {
go func() {
for {
select {
case <-next:
line, err := kclp.ioHandler.readLine()
if err == nil {
if line == "" {
err = fmt.Errorf("Empty read line recieved")
}
err = kclp.handleLine(line)
}
outErr <- err
case pair := <-checkpoint:
err := kclp.processCheckpoint(pair)
checkpointErr <- err
}
}
}()
}
func (kclp *KCLProcess) processNextLine() error {
kclp.next <- struct{}{} // We're ready for a new line
return <-kclp.outErr
}
func (kclp *KCLProcess) Run() {
kclp.startLineProcessor(kclp.next, kclp.out, kclp.outErr, kclp.checkpoint, kclp.checkpointErr)
for {
err := kclp.processNextLine()
line, err := kclp.ioHandler.readLine()
if err == io.EOF {
kclp.ioHandler.writeError("IO stream closed")
return
} else if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR Read line: %+#v", err))
return
} else if line == "" {
kclp.ioHandler.writeError("Empty read line recieved")
return
}
err = kclp.handleLine(line)
if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
return
}