kinesis-consumer/examples/distributed-tracing/consumer/consumer.go

130 lines
3.4 KiB
Go
Raw Normal View History

package main
import (
"context"
"expvar"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
alog "github.com/apex/log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/kinesis"
consumer "github.com/harlow/kinesis-consumer"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
"github.com/harlow/kinesis-consumer/examples/distributed-tracing/utility"
)
const serviceName = "consumer"
// kick off a server for exposing scan metrics
func init() {
sock, err := net.Listen("tcp", "localhost:8080")
if err != nil {
log.Printf("net listen error: %v", err)
}
go func() {
fmt.Println("Metrics available at http://localhost:8080/debug/vars")
http.Serve(sock, nil)
}()
}
func main() {
log := utility.NewLogger(serviceName, alog.DebugLevel)
tracer, closer := utility.NewTracer(serviceName)
defer closer.Close()
opentracing.InitGlobalTracer(tracer)
span := tracer.StartSpan("consumer.main")
defer span.Finish()
var (
app = flag.String("app", "", "App name")
stream = flag.String("stream", "", "Stream name")
table = flag.String("table", "", "Checkpoint table name")
)
flag.Parse()
span.SetTag("app.name", app)
span.SetTag("stream.name", stream)
span.SetTag("table.name", table)
// Following will overwrite the default dynamodb client
// Older versions of aws sdk does not picking up aws config properly.
// You probably need to update aws sdk verison. Tested the following with 1.13.59
cfg := aws.NewConfig().WithRegion("us-west-2")
sess := session.New(cfg)
sess = utility.WrapSession(sess)
myDynamoDbClient := dynamodb.New(sess)
// ddb checkpoint
retryer := utility.NewRetryer()
ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(retryer))
if err != nil {
span.LogKV("checkpoint error", err.Error())
span.SetTag("consumer.retry.count", retryer.Count())
ext.Error.Set(span, true)
// Need to end span here, since Fatalf calls os.Exit
log.Log("checkpoint error", "error", err.Error())
}
var counter = expvar.NewMap("counters")
// The following 2 lines will overwrite the default kinesis client
ksis := kinesis.New(sess)
// consumer
c, err := consumer.New(
*stream,
consumer.WithCheckpoint(ck),
consumer.WithLogger(log),
consumer.WithCounter(counter),
consumer.WithClient(ksis),
)
if err != nil {
span.LogKV("consumer initialization error", err.Error())
ext.Error.Set(span, true)
log.Log("consumer initialization error", "error", err.Error())
}
// use cancel func to signal shutdown
ctx, cancel := context.WithCancel(context.Background())
// trap SIGINT, wait to trigger shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go func() {
<-signals
cancel()
}()
// scan stream
err = c.Scan(ctx, func(r *consumer.Record) consumer.ScanStatus {
fmt.Println(string(r.Data))
// continue scanning
return consumer.ScanStatus{}
})
if err != nil {
span.LogKV("consumer scan error", err.Error())
ext.Error.Set(span, true)
log.Log("consumer scan error", "error", err.Error())
}
if err := ck.Shutdown(); err != nil {
span.LogKV("consumer shutdown error", err.Error())
ext.Error.Set(span, true)
log.Log("checkpoint shutdown error", "error", err.Error())
}
}