Use stdin for example data reader instead of file path

This commit is contained in:
Harlow Ward 2019-09-02 08:08:21 -07:00
parent e3ee95b282
commit b451fc4cc2
3 changed files with 20 additions and 11 deletions

View file

@ -349,7 +349,15 @@ There are example Produder and Consumer code in `/cmd` directory. These should h
The examples run locally against [Kinesis Lite](https://github.com/mhart/kinesalite). The examples run locally against [Kinesis Lite](https://github.com/mhart/kinesalite).
$ kinesalite $ kinesalite &
Produce data to the stream:
$ cat cmd/producer/users.txt | go run cmd/producer/main.go --stream myStream
Consume data from the stream:
$ go run cmd/consumer/main.go --stream myStream
## Contributing ## Contributing

View file

@ -4,4 +4,4 @@ A prepopulated file with JSON users is available on S3 for seeing the stream.
## Running the code ## Running the code
$ go run main.go --stream streamName $ cat users.txt | go run main.go --stream streamName

View file

@ -21,13 +21,6 @@ func main() {
) )
flag.Parse() flag.Parse()
// open dummy user data
f, err := os.Open("users.txt")
if err != nil {
log.Fatal("Cannot open users.txt file")
}
defer f.Close()
var records []*kinesis.PutRecordsRequestEntry var records []*kinesis.PutRecordsRequestEntry
var client = kinesis.New(session.Must(session.NewSession( var client = kinesis.New(session.Must(session.NewSession(
@ -43,7 +36,8 @@ func main() {
} }
// loop over file data // loop over file data
b := bufio.NewScanner(f) b := bufio.NewScanner(os.Stdin)
for b.Scan() { for b.Scan() {
records = append(records, &kinesis.PutRecordsRequestEntry{ records = append(records, &kinesis.PutRecordsRequestEntry{
Data: b.Bytes(), Data: b.Bytes(),
@ -79,10 +73,17 @@ func createStream(client *kinesis.Kinesis, streamName *string) error {
ShardCount: aws.Int64(2), ShardCount: aws.Int64(2),
}, },
) )
if err != nil {
return err return err
} }
return client.WaitUntilStreamExists(
&kinesis.DescribeStreamInput{
StreamName: streamName,
},
)
}
func putRecords(client *kinesis.Kinesis, streamName *string, records []*kinesis.PutRecordsRequestEntry) { func putRecords(client *kinesis.Kinesis, streamName *string, records []*kinesis.PutRecordsRequestEntry) {
_, err := client.PutRecords(&kinesis.PutRecordsInput{ _, err := client.PutRecords(&kinesis.PutRecordsInput{
StreamName: streamName, StreamName: streamName,