From b07ad4c1c22e5cee9b4b18c8f767b7431cb01e2a Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Wed, 20 Sep 2017 21:28:43 +0000 Subject: [PATCH] Cleaned up shutdown code --- kcl/kcl.go | 54 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/kcl/kcl.go b/kcl/kcl.go index b26098c..b1f37cd 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -141,7 +141,7 @@ func New( recordProcessor: recordProcessor, nextCheckpointPair: SequencePair{}, - wasAskedToShutdown: false, + doGracefulShutdown: false, } } @@ -152,7 +152,7 @@ type KCLProcess struct { recordProcessor RecordProcessor nextCheckpointPair SequencePair - wasAskedToShutdown bool + doGracefulShutdown bool } func (kclp *KCLProcess) Checkpoint(pair SequencePair) { @@ -166,7 +166,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair) { func (kclp *KCLProcess) Shutdown() { kclp.ioHandler.writeError("Checkpoint shutdown") - kclp.wasAskedToShutdown = true + kclp.doGracefulShutdown = true } func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { @@ -224,15 +224,15 @@ func (kclp *KCLProcess) reportDone(responseFor string) error { }) } -func (kclp *KCLProcess) handleLine(line string) error { +func (kclp *KCLProcess) handleLine(line string) (string, error) { action, err := kclp.ioHandler.loadAction(line) if err != nil { - return err + return "", err } switch action := action.(type) { case ActionCheckpoint: - err = kclp.handleCheckpointAction(action) + return "checkpoint", kclp.handleCheckpointAction(action) case ActionShutdown: kclp.ioHandler.writeError("Received shutdown action...") @@ -244,27 +244,31 @@ func (kclp *KCLProcess) handleLine(line string) error { kclp.ioHandler.writeError("Reporting shutdown done") - err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown - if err != nil { - kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err)) + if kclp.doGracefulShutdown { + err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown + if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err)) + } } - return kclp.reportDone("shutdown") + return "shutdown", kclp.reportDone("shutdown") case ActionInitialize: - err = kclp.recordProcessor.Initialize(action.ShardID, kclp) - if err == nil { - err = kclp.reportDone(action.Action) + err := kclp.recordProcessor.Initialize(action.ShardID, kclp) + if err != nil { + return "", err } - case ActionProcessRecords: - err = kclp.recordProcessor.ProcessRecords(action.Records) - if err == nil { - err = kclp.reportDone(action.Action) - } - default: - err = fmt.Errorf("unknown action to dispatch: %+#v", action) - } - return err + return "initialize", kclp.reportDone(action.Action) + case ActionProcessRecords: + err := kclp.recordProcessor.ProcessRecords(action.Records) + if err != nil { + return "", err + } + + return "process-record", kclp.reportDone(action.Action) + default: + return "", fmt.Errorf("unknown action to dispatch: %+#v", action) + } } func (kclp *KCLProcess) Run() { @@ -281,9 +285,11 @@ func (kclp *KCLProcess) Run() { continue } - err = kclp.handleLine(line) + action, err := kclp.handleLine(line) if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) + } else if action == "shutdown" { + return } kclp.ckpmux.Lock() @@ -297,7 +303,7 @@ func (kclp *KCLProcess) Run() { } else { kclp.nextCheckpointPair = SequencePair{} } - } else if kclp.wasAskedToShutdown { + } else if kclp.doGracefulShutdown { err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err))