diff --git a/kcl/kcl.go b/kcl/kcl.go index b1d2d2e..b26098c 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -243,6 +243,12 @@ func (kclp *KCLProcess) handleLine(line string) error { } kclp.ioHandler.writeError("Reporting shutdown done") + + err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown + if err != nil { + kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err)) + } + return kclp.reportDone("shutdown") case ActionInitialize: err = kclp.recordProcessor.Initialize(action.ShardID, kclp)