diff --git a/checkpoint/ddb/ddb.go b/checkpoint/ddb/ddb.go index e4ff6a4..24ead66 100644 --- a/checkpoint/ddb/ddb.go +++ b/checkpoint/ddb/ddb.go @@ -93,6 +93,8 @@ type item struct { func (c *Checkpoint) Get(ctx context.Context, streamName, shardID string) (string, error) { namespace := fmt.Sprintf("%s-%s", c.appName, streamName) span, ctx := opentracing.StartSpanFromContext(ctx, "checkpoint.ddb.Get", + opentracing.Tag{Key: "appName", Value: c.appName}, + opentracing.Tag{Key: "tableName", Value: c.tableName}, opentracing.Tag{Key: "namespace", Value: namespace}, opentracing.Tag{Key: "shardID", Value: shardID}, ) @@ -131,6 +133,8 @@ func (c *Checkpoint) Set(ctx context.Context, streamName, shardID, sequenceNumbe c.mu.Lock() defer c.mu.Unlock() span, ctx := opentracing.StartSpanFromContext(ctx, "checkpoint.ddb.Set", + opentracing.Tag{Key: "appName", Value: c.appName}, + opentracing.Tag{Key: "tableName", Value: c.tableName}, opentracing.Tag{Key: "stream.name", Value: streamName}, opentracing.Tag{Key: "shardID", Value: shardID}, ) @@ -174,6 +178,8 @@ func (c *Checkpoint) save(ctx context.Context) error { defer c.mu.Unlock() span, ctx := opentracing.StartSpanFromContext(ctx, "checkpoint.ddb.save") defer span.Finish() + span = span.SetTag("appName", c.appName) + span = span.SetTag("tableName", c.tableName) for key, sequenceNumber := range c.checkpoints { item, err := dynamodbattribute.MarshalMap(item{ diff --git a/consumer.go b/consumer.go index 4ba3809..44e690d 100644 --- a/consumer.go +++ b/consumer.go @@ -103,7 +103,7 @@ type Consumer struct { // Scan scans each of the shards of the stream, calls the callback // func with each of the kinesis records. -func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error { +func (c *Consumer) Scan(ctx context.Context, fn func(context.Context, *Record) ScanStatus) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -164,7 +164,7 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error func (c *Consumer) ScanShard( ctx context.Context, shardID string, - fn func(*Record) ScanStatus, + fn func(context.Context, *Record) ScanStatus, ) error { span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.scanshard") defer span.Finish() @@ -184,12 +184,12 @@ func (c *Consumer) ScanShard( return fmt.Errorf("get shard iterator error: %v", err) } - c.logger.Log("scanning", shardID, lastSeqNum) + c.logger.Log(fmt.Sprintf("scanning shardID %s lastSeqNum %s", shardID, lastSeqNum)) return c.scanPagesOfShard(ctx, shardID, lastSeqNum, shardIterator, fn) } -func (c *Consumer) scanPagesOfShard(ctx context.Context, shardID, lastSeqNum string, shardIterator *string, fn func(*Record) ScanStatus) error { +func (c *Consumer) scanPagesOfShard(ctx context.Context, shardID, lastSeqNum string, shardIterator *string, fn func(context.Context, *Record) ScanStatus) error { span := opentracing.SpanFromContext(ctx) for { select { @@ -241,10 +241,10 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool { return nextShardIterator == nil || currentShardIterator == nextShardIterator } -func (c *Consumer) handleRecord(ctx context.Context, shardID string, r *Record, fn func(*Record) ScanStatus) (isScanStopped bool, err error) { +func (c *Consumer) handleRecord(ctx context.Context, shardID string, r *Record, fn func(context.Context, *Record) ScanStatus) (isScanStopped bool, err error) { span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.handleRecord") defer span.Finish() - status := fn(r) + status := fn(ctx, r) if !status.SkipCheckpoint { span.LogKV("scan.state", status) if err := c.checkpoint.Set(ctx, c.streamName, shardID, *r.SequenceNumber); err != nil { @@ -272,6 +272,7 @@ func (c *Consumer) handleRecord(ctx context.Context, shardID string, r *Record, func (c *Consumer) getShardIDs(ctx context.Context, streamName string) ([]string, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.getShardIDs") defer span.Finish() + span = span.SetTag("streamName", streamName) resp, err := c.client.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{ @@ -293,14 +294,20 @@ func (c *Consumer) getShardIDs(ctx context.Context, streamName string) ([]string func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, lastSeqNum string) (*string, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.getShardIterator", - opentracing.Tag{Key: "lastSeqNum", Value: "lastSeqNum"}) + opentracing.Tag{Key: "streamName", Value: streamName}, + opentracing.Tag{Key: "shardID", Value: shardID}, + opentracing.Tag{Key: "lastSeqNum", Value: lastSeqNum}) defer span.Finish() - + shard := aws.String(shardID) + stream := aws.String(streamName) params := &kinesis.GetShardIteratorInput{ - ShardId: aws.String(shardID), - StreamName: aws.String(streamName), + ShardId: shard, + StreamName: stream, } + span = span.SetTag("shardID", shard) + span = span.SetTag("streamName", stream) + if lastSeqNum != "" { params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER") params.StartingSequenceNumber = aws.String(lastSeqNum) diff --git a/examples/distributed-tracing/consumer/consumer.go b/examples/distributed-tracing/consumer/consumer.go index f5ada80..d725433 100644 --- a/examples/distributed-tracing/consumer/consumer.go +++ b/examples/distributed-tracing/consumer/consumer.go @@ -39,6 +39,7 @@ func init() { } func main() { + ctx := context.Background() log := utility.NewLogger(serviceName, alog.DebugLevel) tracer, closer := utility.NewTracer(serviceName) defer closer.Close() @@ -52,9 +53,9 @@ func main() { table := flag.String("table", "", "Checkpoint table name") flag.Parse() - span.SetTag("app.name", app) - span.SetTag("stream.name", stream) - span.SetTag("table.name", table) + span.SetBaggageItem("app.name", *app) + span.SetBaggageItem("stream.name", *stream) + span.SetBaggageItem("table.name", *table) fmt.Println("set tag....") @@ -67,7 +68,7 @@ func main() { myDynamoDbClient := dynamodb.New(sess) // ddb checkpoint - ctx := opentracing.ContextWithSpan(context.Background(), span) + ctx = opentracing.ContextWithSpan(ctx, span) retryer := utility.NewRetryer() ck, err := checkpoint.New(ctx, *app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(retryer)) if err != nil { @@ -113,7 +114,9 @@ func main() { }() // scan stream - err = c.Scan(ctx, func(r *consumer.Record) consumer.ScanStatus { + err = c.Scan(ctx, func(ctx context.Context, r *consumer.Record) consumer.ScanStatus { + span, _ := opentracing.StartSpanFromContext(ctx, "consumer.processRecord") + defer span.Finish() fmt.Println(string(r.Data)) // continue scanning return consumer.ScanStatus{} diff --git a/examples/distributed-tracing/producer.png b/examples/distributed-tracing/producer.png new file mode 100644 index 0000000..e7bd8a8 Binary files /dev/null and b/examples/distributed-tracing/producer.png differ diff --git a/examples/distributed-tracing/producer/producer.go b/examples/distributed-tracing/producer/producer.go index 5bf7ff8..d66d0df 100644 --- a/examples/distributed-tracing/producer/producer.go +++ b/examples/distributed-tracing/producer/producer.go @@ -43,8 +43,7 @@ func main() { var streamName = flag.String("stream", "", "Stream name") flag.Parse() - - span.SetTag("producer.stream.name", streamName) + span.SetBaggageItem("producer.stream.name", *streamName) // download file with test data // curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt