diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 45dc5fe..b690730 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -9,6 +9,8 @@ import ( "net" "net/http" "os" + "os/signal" + "syscall" consumer "github.com/harlow/kinesis-consumer" checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" @@ -33,17 +35,34 @@ func main() { ) flag.Parse() - var ( - counter = expvar.NewMap("counters") - logger = log.New(os.Stdout, "", log.LstdFlags) + // trap SIGINT, wait to trigger shutdown + signals := make(chan os.Signal, 1) + signal.Notify(signals, + os.Interrupt, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT, ) + // use cancel func to signal shutdown + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-signals + cancel() + }() + // redis checkpoint ck, err := checkpoint.New(*app) if err != nil { log.Fatalf("checkpoint error: %v", err) } + var ( + counter = expvar.NewMap("counters") + logger = log.New(os.Stdout, "", log.LstdFlags) + ) + // consumer c, err := consumer.New( *stream, @@ -56,7 +75,7 @@ func main() { } // start scan - err = c.Scan(context.TODO(), func(r *consumer.Record) bool { + err = c.Scan(ctx, func(r *consumer.Record) bool { fmt.Println(string(r.Data)) return true // continue scanning })