From d542fa996f4dcd62cc81289b4a3e8f7066c84b64 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sat, 9 Jan 2016 09:46:28 -0800 Subject: [PATCH] Use AWS SDK and concurrency --- examples/seed-stream/main.go | 66 +++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/examples/seed-stream/main.go b/examples/seed-stream/main.go index 03022c1..e22f533 100644 --- a/examples/seed-stream/main.go +++ b/examples/seed-stream/main.go @@ -4,44 +4,56 @@ import ( "bufio" "fmt" "os" + "sync" - "github.com/harlow/kinesis-connectors" - "github.com/joho/godotenv" - "github.com/sendgridlabs/go-kinesis" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" ) +// Note: download file with test data +// curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt + func main() { - godotenv.Load() + wg := &sync.WaitGroup{} + jobCh := make(chan string) - // Initialize Kinesis client - auth := kinesis.NewAuth() - ksis := kinesis.New(&auth, kinesis.Region{}) - - // Create stream - connector.CreateStream(ksis, "userStream", 2) - - // read file - // https://s3.amazonaws.com/kinesis.test/users.txt - file, _ := os.Open("tmp/users.txt") + // read sample data + file, _ := os.Open("/tmp/users.txt") defer file.Close() scanner := bufio.NewScanner(file) - args := kinesis.NewArgs() - args.Add("StreamName", "userStream") - ctr := 0 + // initialize kinesis client + svc := kinesis.New(session.New()) + + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + for data := range jobCh { + params := &kinesis.PutRecordInput{ + Data: []byte(data), + PartitionKey: aws.String("partitionKey"), + StreamName: aws.String("hw-test-stream"), + } + + _, err := svc.PutRecord(params) + + if err != nil { + fmt.Println(err.Error()) + return + } else { + fmt.Print(".") + } + } + wg.Done() + }() + } for scanner.Scan() { - l := scanner.Text() - ctr = ctr + 1 - key := fmt.Sprintf("partitionKey-%d", ctr) - - args := kinesis.NewArgs() - args.Add("StreamName", "userStream") - args.AddRecord([]byte(l), key) - ksis.PutRecords(args) - fmt.Print(".") + data := scanner.Text() + jobCh <- data } fmt.Println(".") - fmt.Println("Finished populating userStream") + fmt.Println("Finished populating stream") }