Add cancellation of pipeline from signal interrupts

This commit is contained in:
Harlow Ward 2017-11-26 16:00:03 -08:00
parent 89570130f5
commit 058f383e30

View file

@ -9,6 +9,8 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"os/signal"
"syscall"
consumer "github.com/harlow/kinesis-consumer" consumer "github.com/harlow/kinesis-consumer"
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis" checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis"
@ -33,17 +35,34 @@ func main() {
) )
flag.Parse() flag.Parse()
var ( // trap SIGINT, wait to trigger shutdown
counter = expvar.NewMap("counters") signals := make(chan os.Signal, 1)
logger = log.New(os.Stdout, "", log.LstdFlags) signal.Notify(signals,
os.Interrupt,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT,
) )
// use cancel func to signal shutdown
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-signals
cancel()
}()
// redis checkpoint // redis checkpoint
ck, err := checkpoint.New(*app) ck, err := checkpoint.New(*app)
if err != nil { if err != nil {
log.Fatalf("checkpoint error: %v", err) log.Fatalf("checkpoint error: %v", err)
} }
var (
counter = expvar.NewMap("counters")
logger = log.New(os.Stdout, "", log.LstdFlags)
)
// consumer // consumer
c, err := consumer.New( c, err := consumer.New(
*stream, *stream,
@ -56,7 +75,7 @@ func main() {
} }
// start scan // start scan
err = c.Scan(context.TODO(), func(r *consumer.Record) bool { err = c.Scan(ctx, func(r *consumer.Record) bool {
fmt.Println(string(r.Data)) fmt.Println(string(r.Data))
return true // continue scanning return true // continue scanning
}) })