From b451fc4cc2ec5f25ef3e803d7fd5d01442930c90 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Mon, 2 Sep 2019 08:08:21 -0700 Subject: [PATCH] Use stdin for example data reader instead of file path --- README.md | 10 +++++++++- cmd/producer/README.md | 2 +- cmd/producer/main.go | 19 ++++++++++--------- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index c8615ea..4d941e7 100644 --- a/README.md +++ b/README.md @@ -349,7 +349,15 @@ There are example Produder and Consumer code in `/cmd` directory. These should h The examples run locally against [Kinesis Lite](https://github.com/mhart/kinesalite). - $ kinesalite + $ kinesalite & + +Produce data to the stream: + + $ cat cmd/producer/users.txt | go run cmd/producer/main.go --stream myStream + +Consume data from the stream: + + $ go run cmd/consumer/main.go --stream myStream ## Contributing diff --git a/cmd/producer/README.md b/cmd/producer/README.md index 71ff146..d3939e8 100644 --- a/cmd/producer/README.md +++ b/cmd/producer/README.md @@ -4,4 +4,4 @@ A prepopulated file with JSON users is available on S3 for seeing the stream. ## Running the code - $ go run main.go --stream streamName + $ cat users.txt | go run main.go --stream streamName diff --git a/cmd/producer/main.go b/cmd/producer/main.go index 7fa0662..798acfd 100644 --- a/cmd/producer/main.go +++ b/cmd/producer/main.go @@ -21,13 +21,6 @@ func main() { ) flag.Parse() - // open dummy user data - f, err := os.Open("users.txt") - if err != nil { - log.Fatal("Cannot open users.txt file") - } - defer f.Close() - var records []*kinesis.PutRecordsRequestEntry var client = kinesis.New(session.Must(session.NewSession( @@ -43,7 +36,8 @@ func main() { } // loop over file data - b := bufio.NewScanner(f) + b := bufio.NewScanner(os.Stdin) + for b.Scan() { records = append(records, &kinesis.PutRecordsRequestEntry{ Data: b.Bytes(), @@ -79,8 +73,15 @@ func createStream(client *kinesis.Kinesis, streamName *string) error { ShardCount: aws.Int64(2), }, ) + if err != nil { + return err + } - return err + return client.WaitUntilStreamExists( + &kinesis.DescribeStreamInput{ + StreamName: streamName, + }, + ) } func putRecords(client *kinesis.Kinesis, streamName *string, records []*kinesis.PutRecordsRequestEntry) {