diff --git a/kcl/kcl.go b/kcl/kcl.go index 5dbc3a7..399a7c2 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -21,22 +21,12 @@ type Checkpointer interface { Shutdown() } -type CheckpointError struct { - e string -} - -func (ce CheckpointError) Error() string { - return ce.e -} - type ioHandler struct { inputFile io.Reader outputFile io.Writer errorFile io.Writer } -//func newIOHandler(inputFile io.Reader, outputFile io.Writer, errorFile io.) - func (i ioHandler) writeLine(line string) { fmt.Fprintf(i.outputFile, "\n%s\n", line) } @@ -163,39 +153,6 @@ type KCLProcess struct { nextCheckpointPair SequencePair } -func (kclp *KCLProcess) reportDone(responseFor string) error { - fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing report done line") - return kclp.ioHandler.writeAction(struct { - Action string `json:"action"` - ResponseFor string `json:"responseFor"` - }{ - Action: "status", - ResponseFor: responseFor, - }) -} - -func (kclp *KCLProcess) performAction(a interface{}) (string, error) { - switch action := a.(type) { - case ActionInitialize: - return action.Action, kclp.recordProcessor.Initialize(action.ShardID, kclp) - case ActionProcessRecords: - return action.Action, kclp.recordProcessor.ProcessRecords(action.Records) - case ActionShutdown: - return action.Action, kclp.recordProcessor.Shutdown(action.Reason) - default: - return "", fmt.Errorf("unknown action to dispatch: %+#v", action) - } -} - -func (kclp *KCLProcess) handleLine(line string) (string, error) { - action, err := kclp.ioHandler.loadAction(line) - if err != nil { - return "", err - } - - return kclp.performAction(action) -} - func (kclp *KCLProcess) Checkpoint(pair SequencePair) { kclp.ckpmux.Lock() defer kclp.ckpmux.Unlock() @@ -209,78 +166,94 @@ func (kclp *KCLProcess) Shutdown() { kclp.isShuttingDown = true } -func (kclp *KCLProcess) sendCheckpoint(pair SequencePair, retryCount int) error { - sleepDuration := 5 * time.Second +func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { + if action.Error == nil { // Successful checkpoint + return nil + } - for n := 0; n <= retryCount; n++ { - fmt.Printf("Trying to checkpoint %d\n", n) - err := kclp.processCheckpoint(pair) - if err == nil { - return nil - } + msg := *action.Error + switch msg { + case "ShutdownException": + return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") + case "ThrottlingException": + sleep := 5 * time.Second + fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s", sleep) + time.Sleep(sleep) + case "InvalidStateException": + fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing") + default: + fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", msg) + } - if cperr, ok := err.(CheckpointError); ok { - switch cperr.Error() { - case "ShutdownException": - return fmt.Errorf("Encountered shutdown exception, skipping checkpoint") - case "ThrottlingException": - fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s\n", sleepDuration) - case "InvalidStateException": - fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing\n") - default: - fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) - } - } else { - return err - } + seq := action.SequenceNumber + subSeq := action.SubSequenceNumber - if n == retryCount { - return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount) - } + kclp.ckpmux.Lock() + if !kclp.nextCheckpointPair.IsEmpty() { + tmp := kclp.nextCheckpointPair.Sequence.String() + seq = &tmp + subSeq = &kclp.nextCheckpointPair.SubSequence + } + kclp.ckpmux.Unlock() - time.Sleep(sleepDuration) + if seq != nil && subSeq != nil { + return kclp.sendCheckpoint(seq, subSeq) } return nil } -func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error { - var seq *string - var subSeq *int - if !pair.IsEmpty() { // an empty pair is a signal to shutdown - tmp := pair.Sequence.String() - seq = &tmp - subSeq = &pair.SubSequence - } - fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Writing checkpoint") - kclp.ioHandler.writeAction(ActionCheckpoint{ +func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error { + return kclp.ioHandler.writeAction(ActionCheckpoint{ Action: "checkpoint", SequenceNumber: seq, SubSequenceNumber: subSeq, }) - fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading checkpoint line") - line, err := kclp.ioHandler.readLine() +} + +func (kclp *KCLProcess) reportDone(responseFor string) error { + return kclp.ioHandler.writeAction(struct { + Action string `json:"action"` + ResponseFor string `json:"responseFor"` + }{ + Action: "status", + ResponseFor: responseFor, + }) +} + +func (kclp *KCLProcess) handleLine(line string) error { + action, err := kclp.ioHandler.loadAction(line) if err != nil { return err } - actionI, err := kclp.ioHandler.loadAction(line) - if err != nil { - return err + + switch action := action.(type) { + case ActionCheckpoint: + err = kclp.handleCheckpointAction(action) + case ActionInitialize: + err = kclp.recordProcessor.Initialize(action.ShardID, kclp) + if err == nil { + err = kclp.reportDone(action.Action) + } + case ActionProcessRecords: + err = kclp.recordProcessor.ProcessRecords(action.Records) + if err == nil { + err = kclp.reportDone(action.Action) + } + case ActionShutdown: + err = kclp.recordProcessor.Shutdown(action.Reason) + if err == nil { + err = kclp.reportDone(action.Action) + } + default: + err = fmt.Errorf("unknown action to dispatch: %+#v", action) } - action, ok := actionI.(ActionCheckpoint) - if !ok { - return fmt.Errorf("expected checkpoint response, got '%s'", line) - } - if action.Error != nil && *action.Error != "" { - return CheckpointError{e: *action.Error} - } - fmt.Println("Successful checkpoint") - return nil + + return err } func (kclp *KCLProcess) Run() { for { - fmt.Println("%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% Reading process line") line, err := kclp.ioHandler.readLine() if err == io.EOF { kclp.ioHandler.writeError("IO stream closed") @@ -290,31 +263,31 @@ func (kclp *KCLProcess) Run() { return } else if line == "" { kclp.ioHandler.writeError("Empty read line recieved") - return + continue } - action, err := kclp.handleLine(line) + err = kclp.handleLine(line) if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) return } + kclp.ckpmux.Lock() if !kclp.nextCheckpointPair.IsEmpty() { - err := kclp.sendCheckpoint(kclp.nextCheckpointPair, 5) + seq := kclp.nextCheckpointPair.Sequence.String() + subSeq := kclp.nextCheckpointPair.SubSequence + + err := kclp.sendCheckpoint(&seq, &subSeq) if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR checkpoint: %+#v", err)) + } else { + kclp.nextCheckpointPair = SequencePair{} } - kclp.nextCheckpointPair = SequencePair{} - } - - err = kclp.reportDone(action) - if err != nil { - kclp.ioHandler.writeError(fmt.Sprintf("ERR report done: %s, %+#v", action, err)) - return } + kclp.ckpmux.Unlock() if kclp.isShuttingDown { - kclp.sendCheckpoint(SequencePair{}, 5) // Empty SequencePair is signal to shutdown + kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown } } }