kinesis-consumer/examples/distributed-tracing/producer/producer.go
2018-11-14 11:51:37 -08:00

107 lines
2.8 KiB
Go

package main
import (
"bufio"
"context"
"flag"
"fmt"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/harlow/kinesis-consumer/examples/distributed-tracing/utility"
)
const serviceName = "producer"
const dataFile = "./users.txt"
var svc kinesisiface.KinesisAPI
func main() {
tracer, closer := utility.NewTracer(serviceName)
// Jaeger tracer implements Close not opentracing
defer closer.Close()
opentracing.InitGlobalTracer(tracer)
cfg := aws.NewConfig().WithRegion("us-west-2")
sess := session.New(cfg)
sess = utility.WrapSession(sess)
svc = kinesis.New(sess)
ctx, _ := context.WithCancel(context.Background())
span := tracer.StartSpan("producer.main")
defer span.Finish()
var streamName = flag.String("stream", "", "Stream name")
flag.Parse()
span.SetBaggageItem("producer.stream.name", *streamName)
// download file with test data
// curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt
f, err := os.Open(dataFile)
if err != nil {
span.LogKV("file open error", err.Error())
ext.Error.Set(span, true)
// Need to end span here, since Fatalf calls os.Exit
span.Finish()
closer.Close()
log.Fatal(fmt.Sprintf("Cannot open %s file", dataFile))
}
defer f.Close()
span.SetTag("producer.file.name", f.Name())
// Wrap the span with meta into context and flow that
// to another component.
ctx = opentracing.ContextWithSpan(ctx, span)
var records []*kinesis.PutRecordsRequestEntry
// loop over file data
b := bufio.NewScanner(f)
for b.Scan() {
records = append(records, &kinesis.PutRecordsRequestEntry{
Data: b.Bytes(),
PartitionKey: aws.String(time.Now().Format(time.RFC3339Nano)),
})
if len(records) > 250 {
putRecords(ctx, streamName, records)
records = nil
}
}
if len(records) > 0 {
putRecords(ctx, streamName, records)
}
}
func putRecords(ctx context.Context, streamName *string, records []*kinesis.PutRecordsRequestEntry) {
// I am assuming each new AWS call is a new Span
span, ctx := opentracing.StartSpanFromContext(ctx, "producer.putRecords")
defer span.Finish()
span.SetTag("producer.records.count", len(records))
ctx = opentracing.ContextWithSpan(ctx, span)
_, err := svc.PutRecordsWithContext(ctx, &kinesis.PutRecordsInput{
StreamName: streamName,
Records: records,
})
if err != nil {
// Log the error details and set the Span as failee
span.LogKV("put records error", err.Error())
ext.Error.Set(span, true)
// Need to end span here, since Fatalf calls os.Exit
span.Finish()
log.Fatalf("error putting records: %v", err)
}
fmt.Print(".")
}