From 89570130f5842b3f6291198364af39a09d6af95d Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 26 Nov 2017 15:59:17 -0800 Subject: [PATCH] Leverage bigger batchsize when seeding example data --- examples/producer/main.go | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/examples/producer/main.go b/examples/producer/main.go index f30efc6..08b14ae 100644 --- a/examples/producer/main.go +++ b/examples/producer/main.go @@ -14,6 +14,8 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" ) +var svc = kinesis.New(session.New()) + func main() { log.SetHandler(text.New(os.Stderr)) log.SetLevel(log.DebugLevel) @@ -29,10 +31,7 @@ func main() { } defer f.Close() - var ( - svc = kinesis.New(session.New()) - records []*kinesis.PutRecordsRequestEntry - ) + var records []*kinesis.PutRecordsRequestEntry // loop over file data b := bufio.NewScanner(f) @@ -42,17 +41,24 @@ func main() { PartitionKey: aws.String(time.Now().Format(time.RFC3339Nano)), }) - if len(records) > 50 { - _, err = svc.PutRecords(&kinesis.PutRecordsInput{ - StreamName: streamName, - Records: records, - }) - if err != nil { - log.WithError(err).Fatal("error producing") - } + if len(records) > 250 { + putRecords(streamName, records) records = nil } + } - fmt.Print(".") + if len(records) > 0 { + putRecords(streamName, records) } } + +func putRecords(streamName *string, records []*kinesis.PutRecordsRequestEntry) { + _, err := svc.PutRecords(&kinesis.PutRecordsInput{ + StreamName: streamName, + Records: records, + }) + if err != nil { + log.Fatal("error putting records") + } + fmt.Print(".") +}