From 06838386dbd644b267663ec33e6a8c1e33d81e27 Mon Sep 17 00:00:00 2001 From: Tony Wang Date: Thu, 18 Oct 2018 15:22:57 +0800 Subject: [PATCH] fix review comments --- cmd/consumer/main.go | 9 +++------ kcl/kcl.go | 7 +++---- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index d999e42..329ef55 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -51,16 +51,13 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { return nil } -func (srp *sampleRecordProcessor) ShutdownRequested() error { - fmt.Fprintf(os.Stderr, "Got shutdown requested, attempt to checkpoint.\n") - srp.checkpointer.Shutdown() - return nil -} - func (srp *sampleRecordProcessor) Shutdown(reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") srp.checkpointer.Shutdown() + } else if reason == "SHUTDOWN_REQUESTED" { + fmt.Fprintf(os.Stderr, "Got shutdown requested, attempt to checkpoint.\n") + srp.checkpointer.Shutdown() } else { fmt.Fprintf(os.Stderr, "Shutting down due to failover. Will not checkpoint.\n") } diff --git a/kcl/kcl.go b/kcl/kcl.go index 92b3a26..d82c990 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -12,7 +12,6 @@ import ( type RecordProcessor interface { Initialize(shardID string, checkpointer Checkpointer) error ProcessRecords(records []Record) error - ShutdownRequested() error // Shutdown this call should block until it's safe to shutdown the process Shutdown(reason string) error } @@ -237,12 +236,12 @@ func (kclp *KCLProcess) handleLine(line string) (string, error) { case ActionShutdown: kclp.ioHandler.writeError("Received shutdown action...") + reason := action.Reason // Shutdown should block until it's safe to shutdown the process if action.Action == "shutdownRequested" { - err = kclp.recordProcessor.ShutdownRequested() - } else { - err = kclp.recordProcessor.Shutdown(action.Reason) + reason = "SHUTDOWN_REQUESTED" } + err = kclp.recordProcessor.Shutdown(reason) if err != nil { // Log error and continue shutting down kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err))