Added shutdown method to *Checkerpointer struct

This commit is contained in:
Xavi Ramirez 2017-05-22 23:06:56 +00:00
parent 5f507ab116
commit 458e66e321
2 changed files with 5 additions and 1 deletions

View file

@ -59,7 +59,7 @@ func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
func (srp *SampleRecordProcessor) Shutdown(reason string) error {
if reason == "TERMINATE" {
fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n")
srp.checkpoint(nil, nil)
srp.checkpointer.Shutdown()
} else {
fmt.Fprintf(os.Stderr, "Shutting down due to failover. Will not checkpoint.\n")
}

View file

@ -108,6 +108,10 @@ func (c *Checkpointer) CheckpointWithRetry(
return nil
}
func (c *Checkpointer) Shutdown() {
c.CheckpointWithRetry(nil, nil, 5)
}
type ioHandler struct {
inputFile io.Reader
outputFile io.Writer