From c86a5da722c21220aa7f291dee7364978cf764cd Mon Sep 17 00:00:00 2001 From: Tony Wang Date: Sun, 14 Oct 2018 17:55:46 +0800 Subject: [PATCH 1/3] remove not in use statements --- cmd/consumer/main.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 647b472..e02658a 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -62,11 +62,6 @@ func (srp *sampleRecordProcessor) Shutdown(reason string) error { } func main() { - f, err := os.Create("/tmp/kcl_stderr") - if err != nil { - panic(err) - } - defer f.Close() kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, newSampleRecordProcessor()) kclProcess.Run() } From e7b75a20ac3d7ee1881a9fcf6e75718349e9f9c7 Mon Sep 17 00:00:00 2001 From: Tony Wang Date: Sun, 14 Oct 2018 18:28:58 +0800 Subject: [PATCH 2/3] handle shutdown requested message Message type shutdownRequested doesn't have reason, which causes checkpoint on termination not work. This commit is to ask KCL daemon to checkpoint on shutdownRequested. --- cmd/consumer/main.go | 6 ++++++ kcl/kcl.go | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index e02658a..d999e42 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -51,6 +51,12 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { return nil } +func (srp *sampleRecordProcessor) ShutdownRequested() error { + fmt.Fprintf(os.Stderr, "Got shutdown requested, attempt to checkpoint.\n") + srp.checkpointer.Shutdown() + return nil +} + func (srp *sampleRecordProcessor) Shutdown(reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") diff --git a/kcl/kcl.go b/kcl/kcl.go index b1f37cd..92b3a26 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -12,6 +12,7 @@ import ( type RecordProcessor interface { Initialize(shardID string, checkpointer Checkpointer) error ProcessRecords(records []Record) error + ShutdownRequested() error // Shutdown this call should block until it's safe to shutdown the process Shutdown(reason string) error } @@ -237,7 +238,12 @@ func (kclp *KCLProcess) handleLine(line string) (string, error) { kclp.ioHandler.writeError("Received shutdown action...") // Shutdown should block until it's safe to shutdown the process - err = kclp.recordProcessor.Shutdown(action.Reason) + if action.Action == "shutdownRequested" { + err = kclp.recordProcessor.ShutdownRequested() + } else { + err = kclp.recordProcessor.Shutdown(action.Reason) + } + if err != nil { // Log error and continue shutting down kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err)) } From 06838386dbd644b267663ec33e6a8c1e33d81e27 Mon Sep 17 00:00:00 2001 From: Tony Wang Date: Thu, 18 Oct 2018 15:22:57 +0800 Subject: [PATCH 3/3] fix review comments --- cmd/consumer/main.go | 9 +++------ kcl/kcl.go | 7 +++---- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index d999e42..329ef55 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -51,16 +51,13 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { return nil } -func (srp *sampleRecordProcessor) ShutdownRequested() error { - fmt.Fprintf(os.Stderr, "Got shutdown requested, attempt to checkpoint.\n") - srp.checkpointer.Shutdown() - return nil -} - func (srp *sampleRecordProcessor) Shutdown(reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") srp.checkpointer.Shutdown() + } else if reason == "SHUTDOWN_REQUESTED" { + fmt.Fprintf(os.Stderr, "Got shutdown requested, attempt to checkpoint.\n") + srp.checkpointer.Shutdown() } else { fmt.Fprintf(os.Stderr, "Shutting down due to failover. Will not checkpoint.\n") } diff --git a/kcl/kcl.go b/kcl/kcl.go index 92b3a26..d82c990 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -12,7 +12,6 @@ import ( type RecordProcessor interface { Initialize(shardID string, checkpointer Checkpointer) error ProcessRecords(records []Record) error - ShutdownRequested() error // Shutdown this call should block until it's safe to shutdown the process Shutdown(reason string) error } @@ -237,12 +236,12 @@ func (kclp *KCLProcess) handleLine(line string) (string, error) { case ActionShutdown: kclp.ioHandler.writeError("Received shutdown action...") + reason := action.Reason // Shutdown should block until it's safe to shutdown the process if action.Action == "shutdownRequested" { - err = kclp.recordProcessor.ShutdownRequested() - } else { - err = kclp.recordProcessor.Shutdown(action.Reason) + reason = "SHUTDOWN_REQUESTED" } + err = kclp.recordProcessor.Shutdown(reason) if err != nil { // Log error and continue shutting down kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err))