Another refactor to simply and likely speed up code. New code allows checkpoint and readline messages to be interleaved.

This commit is contained in:
Xavi Ramirez 2017-08-06 10:58:46 +00:00
parent 1bff01ff4f
commit 4809cdb4e6

View file

@ -21,22 +21,12 @@ type Checkpointer interface {
Shutdown() Shutdown()
} }
type CheckpointError struct {
e string
}
func (ce CheckpointError) Error() string {
return ce.e
}
type ioHandler struct { type ioHandler struct {
inputFile io.Reader inputFile io.Reader
outputFile io.Writer outputFile io.Writer
errorFile io.Writer errorFile io.Writer
} }
//func newIOHandler(inputFile io.Reader, outputFile io.Writer, errorFile io.)
func (i ioHandler) writeLine(line string) { func (i ioHandler) writeLine(line string) {
fmt.Fprintf(i.outputFile, "\n%s\n", line) fmt.Fprintf(i.outputFile, "\n%s\n", line)
} }
@ -163,39 +153,6 @@ type KCLProcess struct {
nextCheckpointPair SequencePair nextCheckpointPair SequencePair
} }
func (kclp *KCLProcess) reportDone(responseFor string) error {
fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing report done line")
return kclp.ioHandler.writeAction(struct {
Action string `json:"action"`
ResponseFor string `json:"responseFor"`
}{
Action: "status",
ResponseFor: responseFor,
})
}
func (kclp *KCLProcess) performAction(a interface{}) (string, error) {
switch action := a.(type) {
case ActionInitialize:
return action.Action, kclp.recordProcessor.Initialize(action.ShardID, kclp)
case ActionProcessRecords:
return action.Action, kclp.recordProcessor.ProcessRecords(action.Records)
case ActionShutdown:
return action.Action, kclp.recordProcessor.Shutdown(action.Reason)
default:
return "", fmt.Errorf("unknown action to dispatch: %+#v", action)
}
}
func (kclp *KCLProcess) handleLine(line string) (string, error) {
action, err := kclp.ioHandler.loadAction(line)
if err != nil {
return "", err
}
return kclp.performAction(action)
}
func (kclp *KCLProcess) Checkpoint(pair SequencePair) { func (kclp *KCLProcess) Checkpoint(pair SequencePair) {
kclp.ckpmux.Lock() kclp.ckpmux.Lock()
defer kclp.ckpmux.Unlock() defer kclp.ckpmux.Unlock()
@ -209,78 +166,94 @@ func (kclp *KCLProcess) Shutdown() {
kclp.isShuttingDown = true kclp.isShuttingDown = true
} }
func (kclp *KCLProcess) sendCheckpoint(pair SequencePair, retryCount int) error { func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {
sleepDuration := 5 * time.Second if action.Error == nil { // Successful checkpoint
return nil
}
for n := 0; n <= retryCount; n++ { msg := *action.Error
fmt.Printf("Trying to checkpoint %d\n", n) switch msg {
err := kclp.processCheckpoint(pair) case "ShutdownException":
if err == nil { return fmt.Errorf("Encountered shutdown exception, skipping checkpoint")
return nil case "ThrottlingException":
} sleep := 5 * time.Second
fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s", sleep)
time.Sleep(sleep)
case "InvalidStateException":
fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing")
default:
fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", msg)
}
if cperr, ok := err.(CheckpointError); ok { seq := action.SequenceNumber
switch cperr.Error() { subSeq := action.SubSequenceNumber
case "ShutdownException":
return fmt.Errorf("Encountered shutdown exception, skipping checkpoint")
case "ThrottlingException":
fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s\n", sleepDuration)
case "InvalidStateException":
fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing\n")
default:
fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err)
}
} else {
return err
}
if n == retryCount { kclp.ckpmux.Lock()
return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount) if !kclp.nextCheckpointPair.IsEmpty() {
} tmp := kclp.nextCheckpointPair.Sequence.String()
seq = &tmp
subSeq = &kclp.nextCheckpointPair.SubSequence
}
kclp.ckpmux.Unlock()
time.Sleep(sleepDuration) if seq != nil && subSeq != nil {
return kclp.sendCheckpoint(seq, subSeq)
} }
return nil return nil
} }
func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error {
var seq *string return kclp.ioHandler.writeAction(ActionCheckpoint{
var subSeq *int
if !pair.IsEmpty() { // an empty pair is a signal to shutdown
tmp := pair.Sequence.String()
seq = &tmp
subSeq = &pair.SubSequence
}
fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing checkpoint")
kclp.ioHandler.writeAction(ActionCheckpoint{
Action: "checkpoint", Action: "checkpoint",
SequenceNumber: seq, SequenceNumber: seq,
SubSequenceNumber: subSeq, SubSequenceNumber: subSeq,
}) })
fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading checkpoint line") }
line, err := kclp.ioHandler.readLine()
func (kclp *KCLProcess) reportDone(responseFor string) error {
return kclp.ioHandler.writeAction(struct {
Action string `json:"action"`
ResponseFor string `json:"responseFor"`
}{
Action: "status",
ResponseFor: responseFor,
})
}
func (kclp *KCLProcess) handleLine(line string) error {
action, err := kclp.ioHandler.loadAction(line)
if err != nil { if err != nil {
return err return err
} }
actionI, err := kclp.ioHandler.loadAction(line)
if err != nil { switch action := action.(type) {
return err case ActionCheckpoint:
err = kclp.handleCheckpointAction(action)
case ActionInitialize:
err = kclp.recordProcessor.Initialize(action.ShardID, kclp)
if err == nil {
err = kclp.reportDone(action.Action)
}
case ActionProcessRecords:
err = kclp.recordProcessor.ProcessRecords(action.Records)
if err == nil {
err = kclp.reportDone(action.Action)
}
case ActionShutdown:
err = kclp.recordProcessor.Shutdown(action.Reason)
if err == nil {
err = kclp.reportDone(action.Action)
}
default:
err = fmt.Errorf("unknown action to dispatch: %+#v", action)
} }
action, ok := actionI.(ActionCheckpoint)
if !ok { return err
return fmt.Errorf("expected checkpoint response, got '%s'", line)
}
if action.Error != nil && *action.Error != "" {
return CheckpointError{e: *action.Error}
}
fmt.Println("Successful checkpoint")
return nil
} }
func (kclp *KCLProcess) Run() { func (kclp *KCLProcess) Run() {
for { for {
fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading process line")
line, err := kclp.ioHandler.readLine() line, err := kclp.ioHandler.readLine()
if err == io.EOF { if err == io.EOF {
kclp.ioHandler.writeError("IO stream closed") kclp.ioHandler.writeError("IO stream closed")
@ -290,31 +263,31 @@ func (kclp *KCLProcess) Run() {
return return
} else if line == "" { } else if line == "" {
kclp.ioHandler.writeError("Empty read line recieved") kclp.ioHandler.writeError("Empty read line recieved")
return continue
} }
action, err := kclp.handleLine(line) err = kclp.handleLine(line)
if err != nil { if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
return return
} }
kclp.ckpmux.Lock()
if !kclp.nextCheckpointPair.IsEmpty() { if !kclp.nextCheckpointPair.IsEmpty() {
err := kclp.sendCheckpoint(kclp.nextCheckpointPair, 5) seq := kclp.nextCheckpointPair.Sequence.String()
subSeq := kclp.nextCheckpointPair.SubSequence
err := kclp.sendCheckpoint(&seq, &subSeq)
if err != nil { if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR checkpoint: %+#v", err)) kclp.ioHandler.writeError(fmt.Sprintf("ERR checkpoint: %+#v", err))
} else {
kclp.nextCheckpointPair = SequencePair{}
} }
kclp.nextCheckpointPair = SequencePair{}
}
err = kclp.reportDone(action)
if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR report done: %s, %+#v", action, err))
return
} }
kclp.ckpmux.Unlock()
if kclp.isShuttingDown { if kclp.isShuttingDown {
kclp.sendCheckpoint(SequencePair{}, 5) // Empty SequencePair is signal to shutdown kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown
} }
} }
} }