diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index e02658a..d999e42 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -51,6 +51,12 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { return nil } +func (srp *sampleRecordProcessor) ShutdownRequested() error { + fmt.Fprintf(os.Stderr, "Got shutdown requested, attempt to checkpoint.\n") + srp.checkpointer.Shutdown() + return nil +} + func (srp *sampleRecordProcessor) Shutdown(reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") diff --git a/kcl/kcl.go b/kcl/kcl.go index b1f37cd..92b3a26 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -12,6 +12,7 @@ import ( type RecordProcessor interface { Initialize(shardID string, checkpointer Checkpointer) error ProcessRecords(records []Record) error + ShutdownRequested() error // Shutdown this call should block until it's safe to shutdown the process Shutdown(reason string) error } @@ -237,7 +238,12 @@ func (kclp *KCLProcess) handleLine(line string) (string, error) { kclp.ioHandler.writeError("Received shutdown action...") // Shutdown should block until it's safe to shutdown the process - err = kclp.recordProcessor.Shutdown(action.Reason) + if action.Action == "shutdownRequested" { + err = kclp.recordProcessor.ShutdownRequested() + } else { + err = kclp.recordProcessor.Shutdown(action.Reason) + } + if err != nil { // Log error and continue shutting down kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err)) }