Use AWS SDK and concurrency
This commit is contained in:
parent
0d5e9b7b02
commit
d542fa996f
1 changed files with 39 additions and 27 deletions
|
|
@ -4,44 +4,56 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/harlow/kinesis-connectors"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/joho/godotenv"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/sendgridlabs/go-kinesis"
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Note: download file with test data
|
||||||
|
// curl https://s3.amazonaws.com/kinesis.test/users.txt -o /tmp/users.txt
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
godotenv.Load()
|
wg := &sync.WaitGroup{}
|
||||||
|
jobCh := make(chan string)
|
||||||
|
|
||||||
// Initialize Kinesis client
|
// read sample data
|
||||||
auth := kinesis.NewAuth()
|
file, _ := os.Open("/tmp/users.txt")
|
||||||
ksis := kinesis.New(&auth, kinesis.Region{})
|
|
||||||
|
|
||||||
// Create stream
|
|
||||||
connector.CreateStream(ksis, "userStream", 2)
|
|
||||||
|
|
||||||
// read file
|
|
||||||
// https://s3.amazonaws.com/kinesis.test/users.txt
|
|
||||||
file, _ := os.Open("tmp/users.txt")
|
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
scanner := bufio.NewScanner(file)
|
scanner := bufio.NewScanner(file)
|
||||||
|
|
||||||
args := kinesis.NewArgs()
|
// initialize kinesis client
|
||||||
args.Add("StreamName", "userStream")
|
svc := kinesis.New(session.New())
|
||||||
ctr := 0
|
|
||||||
|
for i := 0; i < 4; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
for data := range jobCh {
|
||||||
|
params := &kinesis.PutRecordInput{
|
||||||
|
Data: []byte(data),
|
||||||
|
PartitionKey: aws.String("partitionKey"),
|
||||||
|
StreamName: aws.String("hw-test-stream"),
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := svc.PutRecord(params)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err.Error())
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
fmt.Print(".")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
l := scanner.Text()
|
data := scanner.Text()
|
||||||
ctr = ctr + 1
|
jobCh <- data
|
||||||
key := fmt.Sprintf("partitionKey-%d", ctr)
|
|
||||||
|
|
||||||
args := kinesis.NewArgs()
|
|
||||||
args.Add("StreamName", "userStream")
|
|
||||||
args.AddRecord([]byte(l), key)
|
|
||||||
ksis.PutRecords(args)
|
|
||||||
fmt.Print(".")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println(".")
|
fmt.Println(".")
|
||||||
fmt.Println("Finished populating userStream")
|
fmt.Println("Finished populating stream")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue