diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 2d78381..22f8ccc 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -59,7 +59,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error { func (srp *SampleRecordProcessor) Shutdown(reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") - srp.checkpoint(nil, nil) + srp.checkpointer.Shutdown() } else { fmt.Fprintf(os.Stderr, "Shutting down due to failover. Will not checkpoint.\n") } diff --git a/kcl/kcl.go b/kcl/kcl.go index 92abb3f..1ccd333 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -108,6 +108,10 @@ func (c *Checkpointer) CheckpointWithRetry( return nil } +func (c *Checkpointer) Shutdown() { + c.CheckpointWithRetry(nil, nil, 5) +} + type ioHandler struct { inputFile io.Reader outputFile io.Writer