diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 4fd7a7e..72ac324 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -55,6 +55,9 @@ func (srp *sampleRecordProcessor) Shutdown(reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") srp.checkpointer.Shutdown() + } else if reason == "SHUTDOWN_REQUESTED" { + fmt.Fprintf(os.Stderr, "Got shutdown requested, attempt to checkpoint.\n") + srp.checkpointer.Shutdown() } else { fmt.Fprintf(os.Stderr, "Shutting down due to failover. Will not checkpoint.\n") } @@ -62,11 +65,6 @@ func (srp *sampleRecordProcessor) Shutdown(reason string) error { } func main() { - f, err := os.Create("/tmp/kcl_stderr") - if err != nil { - panic(err) - } - defer f.Close() kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, newSampleRecordProcessor()) kclProcess.Run() } diff --git a/kcl/kcl.go b/kcl/kcl.go index 5949caf..8e847b0 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -238,8 +238,13 @@ func (kclp *KCLProcess) handleLine(line string) (string, error) { case ActionShutdown: kclp.ioHandler.writeError("Received shutdown action...") + reason := action.Reason // Shutdown should block until it's safe to shutdown the process - err = kclp.recordProcessor.Shutdown(action.Reason) + if action.Action == "shutdownRequested" { + reason = "SHUTDOWN_REQUESTED" + } + err = kclp.recordProcessor.Shutdown(reason) + if err != nil { // Log error and continue shutting down kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err)) }