From 458e66e3214ccfc27f0c21667fceef81da6a8af6 Mon Sep 17 00:00:00 2001 From: Xavi Ramirez Date: Mon, 22 May 2017 23:06:56 +0000 Subject: [PATCH] Added shutdown method to *Checkerpointer struct --- cmd/consumer/main.go | 2 +- kcl/kcl.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 2d78381..22f8ccc 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -59,7 +59,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error { func (srp *SampleRecordProcessor) Shutdown(reason string) error { if reason == "TERMINATE" { fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") - srp.checkpoint(nil, nil) + srp.checkpointer.Shutdown() } else { fmt.Fprintf(os.Stderr, "Shutting down due to failover. Will not checkpoint.\n") } diff --git a/kcl/kcl.go b/kcl/kcl.go index 92abb3f..1ccd333 100644 --- a/kcl/kcl.go +++ b/kcl/kcl.go @@ -108,6 +108,10 @@ func (c *Checkpointer) CheckpointWithRetry( return nil } +func (c *Checkpointer) Shutdown() { + c.CheckpointWithRetry(nil, nil, 5) +} + type ioHandler struct { inputFile io.Reader outputFile io.Writer