From b385278e04d308f715a36ea389e4edde1fd3556c Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Fri, 18 Aug 2017 01:42:36 +0000 Subject: [PATCH] More properly handle errors from KCL --- kcl/kcl.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/kcl/kcl.go b/kcl/kcl.go index 3270be1..01d0147 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -140,6 +140,7 @@ func New( recordProcessor: recordProcessor, nextCheckpointPair: SequencePair{}, + wasAskedToShutdown: false, } } @@ -150,6 +151,7 @@ type KCLProcess struct { recordProcessor RecordProcessor nextCheckpointPair SequencePair + wasAskedToShutdown bool } func (kclp *KCLProcess) Checkpoint(pair SequencePair) { @@ -163,7 +165,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair) { func (kclp *KCLProcess) Shutdown() { kclp.ioHandler.writeError("Checkpoint shutdown") - kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown + kclp.wasAskedToShutdown = true } func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { @@ -275,7 +277,6 @@ func (kclp *KCLProcess) Run() { err = kclp.handleLine(line) if err != nil { kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) - return } kclp.ckpmux.Lock() @@ -289,6 +290,11 @@ func (kclp *KCLProcess) Run() { } else { kclp.nextCheckpointPair = SequencePair{} } + } else if kclp.wasAskedToShutdown { + err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown + if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err)) + } } kclp.ckpmux.Unlock() }