From 058f383e3062f1f29337814fc6ca1a239090d8df Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 26 Nov 2017 16:00:03 -0800 Subject: [PATCH] Add cancellation of pipeline from signal interrupts --- examples/consumer/main.go | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 45dc5fe..b690730 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -9,6 +9,8 @@ import ( "net" "net/http" "os" + "os/signal" + "syscall" consumer "github.com/harlow/kinesis-consumer" checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" @@ -33,17 +35,34 @@ func main() { ) flag.Parse() - var ( - counter = expvar.NewMap("counters") - logger = log.New(os.Stdout, "", log.LstdFlags) + // trap SIGINT, wait to trigger shutdown + signals := make(chan os.Signal, 1) + signal.Notify(signals, + os.Interrupt, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT, ) + // use cancel func to signal shutdown + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-signals + cancel() + }() + // redis checkpoint ck, err := checkpoint.New(*app) if err != nil { log.Fatalf("checkpoint error: %v", err) } + var ( + counter = expvar.NewMap("counters") + logger = log.New(os.Stdout, "", log.LstdFlags) + ) + // consumer c, err := consumer.New( *stream, @@ -56,7 +75,7 @@ func main() { } // start scan - err = c.Scan(context.TODO(), func(r *consumer.Record) bool { + err = c.Scan(ctx, func(r *consumer.Record) bool { fmt.Println(string(r.Data)) return true // continue scanning })