diff --git a/consumer.go b/consumer.go
index 53b9cdf..8137f1f 100644
--- a/consumer.go
+++ b/consumer.go
@@ -11,6 +11,9 @@ import (
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/kinesis"
 	"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
+
+	"github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
 )
 
 // Record is an alias of record returned from kinesis library
@@ -104,13 +107,20 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
+	span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.scan")
+	defer span.Finish()
+
 	// get shard ids
-	shardIDs, err := c.getShardIDs(c.streamName)
+	shardIDs, err := c.getShardIDs(ctx, c.streamName)
 	if err != nil {
+		span.LogKV("get shardID error", err.Error(), "stream.name", c.streamName)
+		ext.Error.Set(span, true)
 		return fmt.Errorf("get shards error: %v", err)
 	}
 
 	if len(shardIDs) == 0 {
+		// err is nil on this path, so log the condition itself rather than calling err.Error()
+		span.LogKV("no shards available", "stream.name", c.streamName, "shards.count", len(shardIDs))
+		ext.Error.Set(span, true)
 		return fmt.Errorf("no shards available")
 	}
 
@@ -126,6 +136,8 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error
 			defer wg.Done()
 
 			if err := c.ScanShard(ctx, shardID, fn); err != nil {
+				span.LogKV("scan shard error", err.Error(), "shardID", shardID)
+				ext.Error.Set(span, true)
 				select {
 				case errc <- fmt.Errorf("shard %s error: %v", shardID, err):
 					// first error to occur
@@ -151,15 +163,21 @@ func (c *Consumer) ScanShard(
 	shardID string,
 	fn func(*Record) ScanStatus,
 ) error {
+	span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.scanshard")
+	defer span.Finish()
+
 	// get checkpoint
 	lastSeqNum, err := c.checkpoint.Get(c.streamName, shardID)
 	if err != nil {
+		span.LogKV("checkpoint error", err.Error(), "shardID", shardID)
+		ext.Error.Set(span, true)
 		return fmt.Errorf("get checkpoint error: %v", err)
 	}
 
 	// get shard iterator
-	shardIterator, err := c.getShardIterator(c.streamName, shardID, lastSeqNum)
+	shardIterator, err := c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
 	if err != nil {
+		span.LogKV("get shard iterator error", err.Error(), "shardID", shardID, "lastSeqNumber", lastSeqNum)
+		ext.Error.Set(span, true)
 		return fmt.Errorf("get shard iterator error: %v", err)
 	}
 
@@ -169,18 +187,22 @@ func (c *Consumer) ScanShard(
 }
 
 func (c *Consumer) scanPagesOfShard(ctx context.Context, shardID, lastSeqNum string, shardIterator *string, fn func(*Record) ScanStatus) error {
+	span := opentracing.SpanFromContext(ctx)
 	for {
 		select {
 		case <-ctx.Done():
+			span.SetTag("scan", "done")
 			return nil
 		default:
+			span.SetTag("scan", "on")
 			resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{
 				ShardIterator: shardIterator,
 			})
 
 			if err != nil {
-				shardIterator, err = c.getShardIterator(c.streamName, shardID, lastSeqNum)
+				shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
 				if err != nil {
+					ext.Error.Set(span, true)
 					return fmt.Errorf("get shard iterator error: %v", err)
 				}
 				continue
@@ -188,17 +210,22 @@ func (c *Consumer) scanPagesOfShard(ctx context.Context, shardID, lastSeqNum str
 
 			// loop records of page
 			for _, r := range resp.Records {
-				isScanStopped, err := c.handleRecord(shardID, r, fn)
+				// ctx already carries the shard span; no need to re-wrap it
+				isScanStopped, err := c.handleRecord(ctx, shardID, r, fn)
 				if err != nil {
+					span.LogKV("handle record error", err.Error(), "shardID", shardID)
+					ext.Error.Set(span, true)
 					return err
 				}
 
 				if isScanStopped {
+					span.SetTag("scan", "stopped")
 					return nil
 				}
 
 				lastSeqNum = *r.SequenceNumber
 			}
 
 			if isShardClosed(resp.NextShardIterator, shardIterator) {
+				span.LogKV("is shard closed", "true")
 				return nil
 			}
 
 			shardIterator = resp.NextShardIterator
@@ -210,34 +237,46 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
 	return nextShardIterator == nil || currentShardIterator == nextShardIterator
 }
 
-func (c *Consumer) handleRecord(shardID string, r *Record, fn func(*Record) ScanStatus) (isScanStopped bool, err error) {
+func (c *Consumer) handleRecord(ctx context.Context, shardID string, r *Record, fn func(*Record) ScanStatus) (isScanStopped bool, err error) {
+	span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.handleRecord")
+	defer span.Finish()
+
 	status := fn(r)
+	span.LogKV("scan.state", status)
 
 	if !status.SkipCheckpoint {
 		if err := c.checkpoint.Set(c.streamName, shardID, *r.SequenceNumber); err != nil {
+			span.LogKV("checkpoint error", err.Error(), "stream.name", c.streamName, "shardID", shardID, "sequenceNumber", *r.SequenceNumber)
+			ext.Error.Set(span, true)
 			return false, err
 		}
 	}
 
 	if err := status.Error; err != nil {
+		span.LogKV("scan error", err.Error())
+		ext.Error.Set(span, true)
 		return false, err
 	}
 
 	c.counter.Add("records", 1)
 
 	if status.StopScan {
+		span.LogKV("scan.state", "stopped")
 		return true, nil
 	}
 
 	return false, nil
 }
 
-func (c *Consumer) getShardIDs(streamName string) ([]string, error) {
+func (c *Consumer) getShardIDs(ctx context.Context, streamName string) ([]string, error) {
+	span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.getShardIDs")
+	defer span.Finish()
+
 	resp, err := c.client.DescribeStream(
 		&kinesis.DescribeStreamInput{
 			StreamName: aws.String(streamName),
 		},
 	)
 	if err != nil {
+		span.LogKV("describe stream error", err.Error())
+		ext.Error.Set(span, true)
 		return nil, fmt.Errorf("describe stream error: %v", err)
 	}
 
@@ -248,7 +287,11 @@ func (c *Consumer) getShardIDs(streamName string) ([]string, error) {
 	return ss, nil
 }
 
-func (c *Consumer) getShardIterator(streamName, shardID, lastSeqNum string) (*string, error) {
+func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, lastSeqNum string) (*string, error) {
+	span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.getShardIterator",
+		opentracing.Tag{Key: "lastSeqNum", Value: lastSeqNum})
+	defer span.Finish()
+
 	params := &kinesis.GetShardIteratorInput{
 		ShardId:    aws.String(shardID),
 		StreamName: aws.String(streamName),
@@ -263,6 +306,8 @@ func (c *Consumer) getShardIterator(streamName, shardID, lastSeqNum string) (*st
 
 	resp, err := c.client.GetShardIterator(params)
 	if err != nil {
+		span.LogKV("get shard iterator error", err.Error())
+		ext.Error.Set(span, true)
 		return nil, err
 	}
 	return resp.ShardIterator, nil
diff --git a/examples/distributed-tracing/README.md b/examples/distributed-tracing/README.md
new file mode 100644
index 0000000..896b21b
--- /dev/null
+++ b/examples/distributed-tracing/README.md
@@ -0,0 +1,68 @@
+# Examples with OpenTracing
+
+These examples are roughly the same as those without tracing, but demonstrate what the code looks like with distributed tracing integrated. The tracing API spec used here is OpenTracing, which currently has wider and more stable support.
+
+Please refer to the READMEs under examples/consumer and examples/producer.
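+
+## Instrumentation pattern
+
+Every traced call in this change follows the same pattern: start a child span from the incoming `context.Context`, log failure details with `LogKV`, and mark the span failed via `ext.Error`. Here is a minimal, self-contained sketch of that pattern; `doWork` and its downstream call are hypothetical placeholders, not part of this library:
+
+```go
+package main
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
+)
+
+// doWork is a hypothetical traced operation: it opens a child span from ctx,
+// logs error details on failure, and flags the span via ext.Error.
+func doWork(ctx context.Context, callDownstream func(context.Context) error) error {
+	span, ctx := opentracing.StartSpanFromContext(ctx, "example.doWork")
+	defer span.Finish()
+
+	if err := callDownstream(ctx); err != nil {
+		span.LogKV("downstream error", err.Error())
+		ext.Error.Set(span, true)
+		return fmt.Errorf("do work: %v", err)
+	}
+	return nil
+}
+
+func main() {
+	// With no tracer registered, the global no-op tracer is used.
+	_ = doWork(context.Background(), func(ctx context.Context) error { return nil })
+}
+```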
+
+## Installation
+
+### Setup data for the producer to upload
+
+    $ curl https://s3.amazonaws.com/kinesis.test/users.txt > /tmp/users.txt
+    $ go run main.go --stream streamName
+
+### Setup AWS
+
+For both consumer and producer:
+
+* Export the environment variables required for connecting to the AWS resources:
+
+```
+export AWS_ACCESS_KEY=
+export AWS_REGION=
+export AWS_SECRET_KEY=
+```
+
+* Export the Jaeger environment variables to connect to the Jaeger agent.
+See https://www.jaegertracing.io for the various variable settings.
+
+```
+export JAEGER_SAMPLER_TYPE=const
+export JAEGER_SAMPLER_PARAM=1
+export JAEGER_AGENT_HOST=localhost
+export JAEGER_AGENT_PORT=6831
+```
+
+### Setup Backend
+
+For demo purposes, we use Jaeger as the tracing backend and Prometheus for stats.
+
+### Tracing Backend
+
+Please refer to the Jaeger docs in the References section.
+Set up the Jaeger agent using the all-in-one Docker image:
+
+```
+$ docker run -d --name jaeger \
+  -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
+  -p 5775:5775/udp \
+  -p 6831:6831/udp \
+  -p 6832:6832/udp \
+  -p 5778:5778 \
+  -p 16686:16686 \
+  -p 14268:14268 \
+  -p 9411:9411 \
+  jaegertracing/all-in-one:1.6
+```
+
+You should be able to access the UI via http://localhost:16686.
+
+## Development
+
+You need opentracing-go as a development dependency. If you want to see the results in a UI, you also need to choose an appropriate vendor (https://opentracing.io/).
+
+```
+go get -u github.com/opentracing/opentracing-go
+```
+
+References:
+> OpenTracing (https://github.com/opentracing/opentracing-go)  ·
+> Jaeger (https://www.jaegertracing.io/docs/1.6/getting-started/)  ·
+> Prometheus (https://prometheus.io/docs/prometheus/latest/installation/)
diff --git a/examples/distributed-tracing/consumer/consumer.go b/examples/distributed-tracing/consumer/consumer.go
new file mode 100644
index 0000000..03c7cbe
--- /dev/null
+++ b/examples/distributed-tracing/consumer/consumer.go
@@ -0,0 +1,129 @@
+package main
+
+import (
+	"context"
+	"expvar"
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"net/http"
+	"os"
+	"os/signal"
+
+	alog "github.com/apex/log"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/dynamodb"
+	"github.com/aws/aws-sdk-go/service/kinesis"
+	consumer "github.com/harlow/kinesis-consumer"
+	"github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
+
+	checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
+	"github.com/harlow/kinesis-consumer/examples/distributed-tracing/utility"
+)
+
+const serviceName = "consumer"
+
+// kick off a server for exposing scan metrics
+func init() {
+	sock, err := net.Listen("tcp", "localhost:8080")
+	if err != nil {
+		log.Printf("net listen error: %v", err)
+		return
+	}
+	go func() {
+		fmt.Println("Metrics available at http://localhost:8080/debug/vars")
+		http.Serve(sock, nil)
+	}()
+}
+
+func main() {
+	log := utility.NewLogger(serviceName, alog.DebugLevel)
+	tracer, closer := utility.NewTracer(serviceName)
+	defer closer.Close()
+	opentracing.InitGlobalTracer(tracer)
+
+	span := tracer.StartSpan("consumer.main")
+	defer span.Finish()
+
+	var (
+		app    = flag.String("app", "", "App name")
+		stream = flag.String("stream", "", "Stream name")
+		table  = flag.String("table", "", "Checkpoint table name")
+	)
+	flag.Parse()
+
+	span.SetTag("app.name", *app)
+	span.SetTag("stream.name", *stream)
+	span.SetTag("table.name", *table)
+
+	// The following overrides the default DynamoDB client.
+	// Older versions of the AWS SDK do not pick up the AWS config properly;
+	// you may need to update the SDK version. The following was tested with 1.13.59.
+	cfg := aws.NewConfig().WithRegion("us-west-2")
+	sess := session.New(cfg)
+	sess = utility.WrapSession(sess)
+	myDynamoDbClient := dynamodb.New(sess)
+
+	// ddb checkpoint
+	retryer := utility.NewRetryer()
+	ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(retryer))
+	if err != nil {
+		span.LogKV("checkpoint error", err.Error())
+		span.SetTag("consumer.retry.count", retryer.Count())
+		ext.Error.Set(span, true)
+		// End the span and close the tracer here: Fatalf calls os.Exit,
+		// which skips the deferred span.Finish and closer.Close.
+		span.Finish()
+		closer.Close()
+		log.Fatalf("checkpoint error:", err.Error())
+	}
+
+	var counter = expvar.NewMap("counters")
+
+	// The following overrides the default kinesis client
+	ksis := kinesis.New(sess)
+
+	// consumer
+	c, err := consumer.New(
+		*stream,
+		consumer.WithCheckpoint(ck),
+		consumer.WithLogger(log),
+		consumer.WithCounter(counter),
+		consumer.WithClient(ksis),
+	)
+	if err != nil {
+		span.LogKV("consumer initialization error", err.Error())
+		ext.Error.Set(span, true)
+		// End the span and close the tracer before exiting, as above.
+		span.Finish()
+		closer.Close()
+		log.Fatalf("consumer initialization error:", err.Error())
+	}
+
+	// use cancel func to signal shutdown
+	ctx, cancel := context.WithCancel(context.Background())
+
+	// trap SIGINT, wait to trigger shutdown
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+
+	go func() {
+		<-signals
+		cancel()
+	}()
+
+	// scan stream
+	err = c.Scan(ctx, func(r *consumer.Record) consumer.ScanStatus {
+		fmt.Println(string(r.Data))
+
+		// continue scanning
+		return consumer.ScanStatus{}
+	})
+	if err != nil {
+		span.LogKV("consumer scan error", err.Error())
+		ext.Error.Set(span, true)
+		log.Log("consumer scan error", "error", err.Error())
+	}
+
+	if err := ck.Shutdown(); err != nil {
+		span.LogKV("checkpoint shutdown error", err.Error())
+		ext.Error.Set(span, true)
+		log.Log("checkpoint shutdown error", "error", err.Error())
+	}
+}
diff --git a/examples/distributed-tracing/producer/producer.go b/examples/distributed-tracing/producer/producer.go
new file mode 100644
index 0000000..4346b7d
--- /dev/null
+++ b/examples/distributed-tracing/producer/producer.go
@@ -0,0 +1,107 @@
+package main
+
+import (
+	"bufio"
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/kinesis"
+	"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
+
+	"github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
+
+	"github.com/harlow/kinesis-consumer/examples/distributed-tracing/utility"
+)
+
+const serviceName = "producer"
+const dataFile = "./users.txt"
+
+var svc kinesisiface.KinesisAPI
+
+func main() {
+	tracer, closer := utility.NewTracer(serviceName)
+	// The Jaeger tracer returns an io.Closer; opentracing.Tracer itself has no Close method.
+	defer closer.Close()
+	opentracing.InitGlobalTracer(tracer)
+
+	cfg := aws.NewConfig().WithRegion("us-west-2")
+	sess := session.New(cfg)
+	sess = utility.WrapSession(sess)
+	svc = kinesis.New(sess)
+
+	ctx := context.Background()
+
+	span := tracer.StartSpan("producer.main")
+	defer span.Finish()
+
+	var streamName = flag.String("stream", "", "Stream name")
+	flag.Parse()
+
+	span.SetTag("producer.stream.name", *streamName)
+
+	// download file with test data
+	// curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt
+	f, err := os.Open(dataFile)
+	if err != nil {
+		span.LogKV("file open error", err.Error())
+		ext.Error.Set(span, true)
+		// End the span and close the tracer here: Fatalf calls os.Exit,
+		// which skips the deferred span.Finish and closer.Close.
+		span.Finish()
+		closer.Close()
+		log.Fatalf("cannot open file %s: %v", dataFile, err)
+	}
+	defer f.Close()
+	span.SetTag("producer.file.name", f.Name())
+
+	// Wrap the span with its metadata into the context and flow that
+	// to the downstream component.
+	ctx = opentracing.ContextWithSpan(ctx, span)
+
+	var records []*kinesis.PutRecordsRequestEntry
+
+	// loop over file data
+	b := bufio.NewScanner(f)
+	for b.Scan() {
+		records = append(records, &kinesis.PutRecordsRequestEntry{
+			// copy the bytes: the scanner reuses its buffer on the next Scan
+			Data:         []byte(b.Text()),
+			PartitionKey: aws.String(time.Now().Format(time.RFC3339Nano)),
+		})
+
+		// flush in batches, staying well under the PutRecords limit of 500 records
+		if len(records) > 250 {
+			putRecords(ctx, streamName, records)
+			records = nil
+		}
+	}
+
+	if len(records) > 0 {
+		putRecords(ctx, streamName, records)
+	}
+}
+
+func putRecords(ctx context.Context, streamName *string, records []*kinesis.PutRecordsRequestEntry) {
+	// Each PutRecords call gets its own span, parented on the span in ctx.
+	span, _ := opentracing.StartSpanFromContext(ctx, "producer.putRecords")
+	defer span.Finish()
+	span.SetTag("producer.records.count", len(records))
+	_, err := svc.PutRecords(&kinesis.PutRecordsInput{
+		StreamName: streamName,
+		Records:    records,
+	})
+	if err != nil {
+		// Log the error details and mark the span as failed
+		span.LogKV("put records error", err.Error())
+		ext.Error.Set(span, true)
+		// Need to end span here, since Fatalf calls os.Exit
+		span.Finish()
+		log.Fatalf("error putting records: %v", err)
+	}
+	fmt.Print(".")
+}
diff --git a/examples/distributed-tracing/utility/aws.go b/examples/distributed-tracing/utility/aws.go
new file mode 100644
index 0000000..9c74141
--- /dev/null
+++ b/examples/distributed-tracing/utility/aws.go
@@ -0,0 +1,95 @@
+package utility
+
+import (
+	"github.com/aws/aws-sdk-go/aws/request"
+	"github.com/aws/aws-sdk-go/aws/session"
+
+	"github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
+)
+
+// When you initialize any resource client and pass in an AWS session, a few things happen:
+//  * the session carries the configuration to make and sign the request header
+//  * the session embodies a set of default request handlers to be executed in order
+//  * the AWS client calls the list of request handlers before sending out a raw HTTP request
+//
+// For the set of request handlers, see: https://github.com/aws/aws-sdk-go/blob/master/aws/request/handlers.go
+// To start and end a span, we insert one handler at the front and one at the end.
+// Span annotation is done as we see fit inside each handler.
+type handlers struct{}
+
+// WrapSession wraps a session.Session, causing requests and responses to be traced.
+func WrapSession(s *session.Session) *session.Session {
+	// clone the session to avoid any sharing issues.
+	s = s.Copy()
+	h := &handlers{}
+	// set our handlers for starting and ending a span.
+	s.Handlers.Send.PushFrontNamed(request.NamedHandler{
+		Name: "tracing.Send",
+		Fn:   h.Send,
+	})
+	s.Handlers.Complete.PushBackNamed(request.NamedHandler{
+		Name: "tracing.Complete",
+		Fn:   h.Complete,
+	})
+	return s
+}
+
+// Send starts a new span: a child span if the context carries a parent span,
+// otherwise a new root span. It annotates the span with request metadata, then
+// wraps the span into the request context before the request goes downstream.
+func (h *handlers) Send(req *request.Request) {
+	// Set the operation name and mark the span as initiated by an RPC client.
+	span, ctx := opentracing.StartSpanFromContext(req.Context(), h.operationName(req))
+	ext.SpanKindRPCClient.Set(span)
+	span.SetTag("aws.serviceName", h.serviceName(req))
+	span.SetTag("aws.resource", h.resourceName(req))
+	span.SetTag("aws.agent", h.awsAgent(req))
+	span.SetTag("aws.operation", req.Operation.Name)
+	span.SetTag("aws.region", req.ClientInfo.SigningRegion)
+	span.SetTag("aws.requestID", req.RequestID)
+	ext.HTTPMethod.Set(span, req.Operation.HTTPMethod)
+	ext.HTTPUrl.Set(span, req.HTTPRequest.URL.String())
+
+	req.SetContext(ctx)
+}
+
+func (h *handlers) Complete(req *request.Request) {
+	span := opentracing.SpanFromContext(req.Context())
+	if span == nil {
+		// the Send handler did not run for this request
+		return
+	}
+	defer span.Finish()
+	defer FailIfError(span, req.Error)
+	if req.HTTPResponse != nil {
+		ext.HTTPStatusCode.Set(span, uint16(req.HTTPResponse.StatusCode))
+	}
+}
+
+func (h *handlers) operationName(req *request.Request) string {
+	return h.awsService(req) + ".command"
+}
+
+func (h *handlers) resourceName(req *request.Request) string {
+	return h.awsService(req) + "." + req.Operation.Name
+}
+
+func (h *handlers) serviceName(req *request.Request) string {
+	return "aws." + h.awsService(req)
+}
+
+func (h *handlers) awsAgent(req *request.Request) string {
+	agent := req.HTTPRequest.Header.Get("User-Agent")
+	if agent != "" {
+		return agent
+	}
+	return "aws-sdk-go"
+}
+
+func (h *handlers) awsService(req *request.Request) string {
+	return req.ClientInfo.ServiceName
+}
+
+// FailIfError marks the span as failed and logs the error, if there is one.
+func FailIfError(span opentracing.Span, err error) {
+	if err != nil {
+		ext.Error.Set(span, true)
+		span.LogKV("aws request error", err.Error())
+	}
+}
diff --git a/examples/distributed-tracing/utility/aws_retryer.go b/examples/distributed-tracing/utility/aws_retryer.go
new file mode 100644
index 0000000..fc69358
--- /dev/null
+++ b/examples/distributed-tracing/utility/aws_retryer.go
@@ -0,0 +1,53 @@
+package utility
+
+import (
+	"math/rand"
+	"sync/atomic"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/service/dynamodb"
+
+	checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"
+)
+
+const defaultSleepInterval = 30 * time.Second
+
+// Retryer used for checkpointing
+type Retryer struct {
+	checkpoint.Retryer
+	count uint64
+}
+
+func NewRetryer() *Retryer {
+	return &Retryer{count: 0}
+}
+
+// ShouldRetry implements custom logic for when a checkpoint should retry
+func (r *Retryer) ShouldRetry(err error) bool {
+	if awsErr, ok := err.(awserr.Error); ok {
+		switch awsErr.Code() {
+		case dynamodb.ErrCodeProvisionedThroughputExceededException, dynamodb.ErrCodeLimitExceededException:
+			jitter := rand.New(rand.NewSource(time.Now().UnixNano()))
+			atomic.AddUint64(&r.count, 1)
+			// You can use a more sophisticated backoff mechanism here.
+			time.Sleep(randomSleep(defaultSleepInterval, jitter))
+			return true
+		default:
+			return false
+		}
+	}
+	return false
+}
+
+// Count reports how many times the retryer has retried.
+func (r *Retryer) Count() uint {
+	return uint(atomic.LoadUint64(&r.count))
+}
+
+// randomSleep returns a jittered duration between d and 3d.
+func randomSleep(d time.Duration, r *rand.Rand) time.Duration {
+	if d == 0 {
+		return 0
+	}
+	return d + time.Duration(r.Int63n(2*int64(d)))
+}
diff --git a/examples/distributed-tracing/utility/logger.go b/examples/distributed-tracing/utility/logger.go
new file mode 100644
index 0000000..4f373ba
--- /dev/null
+++ b/examples/distributed-tracing/utility/logger.go
@@ -0,0 +1,33 @@
+package utility
+
+import (
+	"fmt"
+	"os"
+	"strings"
+
+	alog "github.com/apex/log"
+	"github.com/apex/log/handlers/text"
+)
+
+// A logger provides a minimalistic logger satisfying the Logger interface.
+type logger struct {
+	serviceName string
+	logger      alog.Logger
+}
+
+func NewLogger(serviceName string, level alog.Level) *logger {
+	return &logger{
+		serviceName: serviceName,
+		logger: alog.Logger{
+			Handler: text.New(os.Stdout),
+			Level:   level,
+		},
+	}
+}
+
+// Log logs the arguments, prefixed with the service name. See log.Println.
+func (l *logger) Log(args ...interface{}) {
+	l.logger.Info(l.serviceName + ": " + strings.TrimSpace(fmt.Sprintln(args...)))
+}
+
+// Fatalf logs the arguments, then exits via the underlying logger.
+func (l *logger) Fatalf(args ...interface{}) {
+	l.logger.Fatal(l.serviceName + ": " + strings.TrimSpace(fmt.Sprintln(args...)))
+}
diff --git a/examples/distributed-tracing/utility/tracer.go b/examples/distributed-tracing/utility/tracer.go
new file mode 100644
index 0000000..28714f6
--- /dev/null
+++ b/examples/distributed-tracing/utility/tracer.go
@@ -0,0 +1,33 @@
+package utility
+
+import (
+	"io"
+	"log"
+
+	"github.com/opentracing/opentracing-go"
+	jaegerConfig "github.com/uber/jaeger-client-go/config"
+)
+
+// A no-op opentracing tracer, used when the tracer config cannot be loaded
+var globalTracer = &opentracing.NoopTracer{}
+
+// A no-op io.Closer
+type nullCloser struct{}
+
+func (*nullCloser) Close() error { return nil }
+
+// NewTracer builds a Jaeger tracer from the JAEGER_* environment variables,
+// falling back to a no-op tracer if the configuration cannot be loaded.
+func NewTracer(serviceName string) (opentracing.Tracer, io.Closer) {
+	config, err := jaegerConfig.FromEnv()
+	if err != nil {
+		log.Printf("error loading tracer config: %s", err.Error())
+		return globalTracer, &nullCloser{}
+	}
+	if len(serviceName) > 0 {
+		config.ServiceName = serviceName
+	}
+	tracer, closer, err := config.New(serviceName)
+	if err != nil {
+		log.Fatalf("cannot initialize jaeger tracer: %v", err)
+	}
+	return tracer, closer
+}
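
Taken together, the utilities above are wired into an application in three steps: build the tracer, register it globally, and wrap the AWS session so every SDK call emits a client span. Below is a minimal sketch of that wiring, not part of this diff; the service name, region, and the `ListStreams` call are illustrative placeholders.

```go
package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/opentracing/opentracing-go"

	"github.com/harlow/kinesis-consumer/examples/distributed-tracing/utility"
)

func main() {
	// Build the Jaeger-backed tracer from the JAEGER_* env vars and register it globally.
	tracer, closer := utility.NewTracer("my-service") // placeholder service name
	defer closer.Close()
	opentracing.InitGlobalTracer(tracer)

	// Wrap the session so the Send/Complete handlers trace every AWS call.
	sess := utility.WrapSession(session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2"))))
	svc := kinesis.New(sess)

	// Any call made through svc is now traced.
	out, err := svc.ListStreams(&kinesis.ListStreamsInput{})
	if err != nil {
		log.Fatalf("list streams: %v", err)
	}
	log.Printf("streams: %v", out.StreamNames)
}
```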