Use TJ's Kinesis producer library

Harlow Ward 2016-05-01 16:54:47 -07:00
parent 8d71bbd0ec
commit f4c8d8090d
3 changed files with 50 additions and 70 deletions

examples/producer/main.go Normal file

@@ -0,0 +1,50 @@
package main

import (
	"bufio"
	"flag"
	"os"

	"github.com/apex/log"
	"github.com/apex/log/handlers/text"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"

	prdcr "github.com/tj/go-kinesis"
)

// Note: download file with test data
// curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt

var stream = flag.String("s", "", "Stream name")

func main() {
	flag.Parse()
	log.SetHandler(text.New(os.Stderr))

	// set up producer
	svc := kinesis.New(session.New())
	producer := prdcr.New(prdcr.Config{
		StreamName:  *stream,
		BacklogSize: 500,
		Client:      svc,
	})
	producer.Start()

	// open data file
	f, err := os.Open("/tmp/users.txt")
	if err != nil {
		log.Fatal("Cannot open users.txt file")
	}
	defer f.Close()

	// loop over file data
	b := bufio.NewScanner(f)
	for b.Scan() {
		err := producer.Put(b.Bytes(), "site")
		if err != nil {
			log.WithError(err).Fatal("error producing")
		}
	}

	producer.Stop()
}
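A usage sketch, assuming an existing Kinesis stream and AWS credentials in the environment: the example reads the target stream from the -s flag defined above, so it would be started roughly as

go run examples/producer/main.go -s your-stream-name

where your-stream-name is a hypothetical stream name.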


@@ -1,70 +0,0 @@
package main

import (
	"bufio"
	"flag"
	"fmt"
	"log"
	"os"
	"sync"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

// Note: download file with test data
// curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt

var stream = flag.String("s", "", "Stream name")

func putToKinesis(svc *kinesis.Kinesis, data string, partitionKey string) {
	params := &kinesis.PutRecordInput{
		Data:         []byte(data),
		PartitionKey: aws.String(partitionKey),
		StreamName:   aws.String(*stream),
	}

	_, err := svc.PutRecord(params)
	if err != nil {
		fmt.Println(err.Error())
		return
	} else {
		fmt.Print(".")
	}
}

func main() {
	flag.Parse()
	jobCh := make(chan string)
	svc := kinesis.New(session.New())
	wg := &sync.WaitGroup{}

	// boot the workers for processing data
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			for data := range jobCh {
				putToKinesis(svc, data, string(i))
			}
			wg.Done()
		}()
	}

	// open data file
	f, err := os.Open("/tmp/users.txt")
	if err != nil {
		log.Fatal("Cannot open users.txt file")
	}
	defer f.Close()

	// put sample data on channel
	b := bufio.NewScanner(f)
	for b.Scan() {
		data := b.Text()
		jobCh <- data
	}

	fmt.Println(".")
	log.Println("Finished populating stream")
}
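For reference, the removed example fans records out to four goroutines over an unbuffered channel, but as written jobCh is never closed and wg.Wait() is never called, and the partition key string(i) converts the shared loop variable to a rune rather than a digit. A minimal sketch of the same worker-pool pattern with those pieces in place, using a hypothetical sendRecord stand-in for putToKinesis above:

package main

import (
	"bufio"
	"log"
	"os"
	"strconv"
	"sync"
)

// sendRecord is a hypothetical stand-in for the putToKinesis call above.
func sendRecord(data, partitionKey string) {
	// the real version would call svc.PutRecord as in the removed file
}

func main() {
	jobCh := make(chan string)
	wg := &sync.WaitGroup{}

	// boot the workers for processing data
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) { // pass the loop variable in so each worker keeps its own copy
			defer wg.Done()
			for data := range jobCh {
				// strconv.Itoa(id) yields "0".."3"; string(id) would yield a control rune
				sendRecord(data, strconv.Itoa(id))
			}
		}(i)
	}

	// put sample data on channel
	f, err := os.Open("/tmp/users.txt")
	if err != nil {
		log.Fatal("Cannot open users.txt file")
	}
	defer f.Close()

	b := bufio.NewScanner(f)
	for b.Scan() {
		jobCh <- b.Text()
	}

	close(jobCh) // ends the range loops in the workers
	wg.Wait()    // wait for in-flight sends before exiting
	log.Println("Finished populating stream")
}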