kinesis-consumer/examples/consumer/main.go
Prometheus 9e0a97916d Use AWS resource iface, overwrite default dynamodb, more explicit in example about overwrite default AWS resrouce client (#49)
* use custom kinesis client

* use aws sdk interface, add missing api for ddb

* add overwrite default dynamodbclien usage
2018-05-31 17:41:14 -07:00

97 lines
2.2 KiB
Go

package main
import (
"context"
"expvar"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/kinesis"
consumer "github.com/harlow/kinesis-consumer"
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
)
// kick off a server for exposing scan metrics
func init() {
sock, err := net.Listen("tcp", "localhost:8080")
if err != nil {
log.Fatalf("net listen error: %v", err)
}
go func() {
fmt.Println("Metrics available at http://localhost:8080/debug/vars")
http.Serve(sock, nil)
}()
}
func main() {
var (
app = flag.String("app", "", "App name")
stream = flag.String("stream", "", "Stream name")
table = flag.String("table", "", "Checkpoint table name")
)
flag.Parse()
// Following will overwrite the default dynamodb client
myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()))
// ddb checkpoint
ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient))
if err != nil {
log.Fatalf("checkpoint error: %v", err)
}
var (
counter = expvar.NewMap("counters")
logger = log.New(os.Stdout, "", log.LstdFlags)
)
// The following 2 lines will overwrite the default kinesis client
myKinesisClient := kinesis.New(session.New(aws.NewConfig()))
newKclient := consumer.NewKinesisClient(consumer.WithKinesis(myKinesisClient))
// consumer
c, err := consumer.New(
*stream,
consumer.WithCheckpoint(ck),
consumer.WithLogger(logger),
consumer.WithCounter(counter),
consumer.WithClient(newKclient),
)
if err != nil {
log.Fatalf("consumer error: %v", err)
}
// use cancel func to signal shutdown
ctx, cancel := context.WithCancel(context.Background())
// trap SIGINT, wait to trigger shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go func() {
<-signals
cancel()
}()
// scan stream
err = c.Scan(ctx, func(r *consumer.Record) bool {
fmt.Println(string(r.Data))
return true // continue scanning
})
if err != nil {
log.Fatalf("scan error: %v", err)
}
if err := ck.Shutdown(); err != nil {
log.Fatalf("checkpoint shutdown error: %v", err)
}
}