kinesis-consumer/examples/consumer/cp-redis/main.go
Harlow Ward fb98fbe244
Remove the client wrapper (#58)
Having an additional Client has added some confusion (https://github.com/harlow/kinesis-consumer/issues/45) on how to provide a
custom kinesis client. Allowing `WithClient` to accept a Kinesis client
it cleans up the interface.

Major changes:

* Remove the Client wrapper; prefer using kinesis client directly
* Change `ScanError` to `ScanStatus` as the return value isn't necessarily an error

Note: these are breaking changes, if you need last stable release please see here: https://github.com/harlow/kinesis-consumer/releases/tag/v0.2.0
2018-07-28 22:53:33 -07:00

58 lines
1 KiB
Go

package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
consumer "github.com/harlow/kinesis-consumer"
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis"
)
func main() {
var (
app = flag.String("app", "", "App name")
stream = flag.String("stream", "", "Stream name")
)
flag.Parse()
// redis checkpoint
ck, err := checkpoint.New(*app)
if err != nil {
log.Fatalf("checkpoint error: %v", err)
}
// consumer
c, err := consumer.New(
*stream, consumer.WithCheckpoint(ck),
)
if err != nil {
log.Fatalf("consumer error: %v", err)
}
// use cancel func to signal shutdown
ctx, cancel := context.WithCancel(context.Background())
// trap SIGINT, wait to trigger shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go func() {
<-signals
cancel()
}()
// scan stream
err = c.Scan(ctx, func(r *consumer.Record) consumer.ScanStatus {
fmt.Println(string(r.Data))
// continue scanning
return consumer.ScanStatus{}
})
if err != nil {
log.Fatalf("scan error: %v", err)
}
}