diff --git a/examples/seed/README.md b/examples/producer/README.md similarity index 100% rename from examples/seed/README.md rename to examples/producer/README.md diff --git a/examples/producer/main.go b/examples/producer/main.go new file mode 100644 index 0000000..fd6989b --- /dev/null +++ b/examples/producer/main.go @@ -0,0 +1,50 @@ +package main + +import ( + "bufio" + "flag" + "os" + + "github.com/apex/log" + "github.com/apex/log/handlers/text" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + prdcr "github.com/tj/go-kinesis" +) + +// Note: download file with test data +// curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt +var stream = flag.String("s", "", "Stream name") + +func main() { + flag.Parse() + log.SetHandler(text.New(os.Stderr)) + + // set up producer + svc := kinesis.New(session.New()) + producer := prdcr.New(prdcr.Config{ + StreamName: *stream, + BacklogSize: 500, + Client: svc, + }) + producer.Start() + + // open data file + f, err := os.Open("/tmp/users.txt") + if err != nil { + log.Fatal("Cannot open users.txt file") + } + defer f.Close() + + // loop over file data + b := bufio.NewScanner(f) + for b.Scan() { + err := producer.Put(b.Bytes(), "site") + + if err != nil { + log.WithError(err).Fatal("error producing") + } + } + + producer.Stop() +} diff --git a/examples/seed/main.go b/examples/seed/main.go deleted file mode 100644 index 5c3a602..0000000 --- a/examples/seed/main.go +++ /dev/null @@ -1,70 +0,0 @@ -package main - -import ( - "bufio" - "flag" - "fmt" - "log" - "os" - "sync" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/kinesis" -) - -// Note: download file with test data -// curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt -var stream = flag.String("s", "", "Stream name") - -func putToKinesis(svc *kinesis.Kinesis, data string, partitionKey string) { - params := &kinesis.PutRecordInput{ - Data: []byte(data), - PartitionKey: aws.String(partitionKey), - StreamName: aws.String(*stream), - } - - _, err := svc.PutRecord(params) - if err != nil { - fmt.Println(err.Error()) - return - } else { - fmt.Print(".") - } -} - -func main() { - flag.Parse() - - jobCh := make(chan string) - svc := kinesis.New(session.New()) - wg := &sync.WaitGroup{} - - // boot the workers for processing data - for i := 0; i < 4; i++ { - wg.Add(1) - go func() { - for data := range jobCh { - putToKinesis(svc, data, string(i)) - } - wg.Done() - }() - } - - // open data file - f, err := os.Open("/tmp/users.txt") - if err != nil { - log.Fatal("Cannot open users.txt file") - } - defer f.Close() - - // put sample data on channel - b := bufio.NewScanner(f) - for b.Scan() { - data := b.Text() - jobCh <- data - } - - fmt.Println(".") - log.Println("Finished populating stream") -}