kinesis-consumer/examples/producer/main.go
2024-04-12 09:44:32 +02:00

102 lines
2.3 KiB
Go

package main
import (
"bufio"
"context"
"flag"
"fmt"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)
// main reads newline-delimited records from standard input and publishes them
// in batches to the Kinesis stream named by the -stream flag, creating the
// stream first when it does not exist yet.
func main() {
	// flushThreshold is the queue length that, once exceeded, triggers a
	// PutRecords call for the records queued so far.
	const flushThreshold = 250

	var (
		streamName      = flag.String("stream", "", "Stream name")
		kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint")
		awsRegion       = flag.String("region", "us-west-2", "AWS Region")
	)
	flag.Parse()

	// Fail fast with a clear message instead of sending an empty stream name
	// to the service.
	if *streamName == "" {
		log.Fatal("stream name is required (use -stream)")
	}

	var records []types.PutRecordsRequestEntry

	var client = kinesis.New(kinesis.Options{
		BaseEndpoint: kinesisEndpoint,
		Region:       *awsRegion,
		Credentials:  credentials.NewStaticCredentialsProvider("user", "pass", "token"),
	})

	// create stream if doesn't exist
	if err := createStream(client, *streamName); err != nil {
		log.Fatalf("create stream error: %v", err)
	}

	// loop over file data
	b := bufio.NewScanner(os.Stdin)
	for b.Scan() {
		// Scanner.Bytes returns a slice into the scanner's internal buffer
		// which a later Scan may overwrite. Records are queued across many
		// Scan calls, so each line must be copied before it is stored.
		line := append([]byte(nil), b.Bytes()...)

		records = append(records, types.PutRecordsRequestEntry{
			Data:         line,
			PartitionKey: aws.String(time.Now().Format(time.RFC3339Nano)),
		})

		if len(records) > flushThreshold {
			putRecords(client, streamName, records)
			records = nil
		}
	}

	// Flush whatever was read before the input ended (or before a read error).
	if len(records) > 0 {
		putRecords(client, streamName, records)
	}

	// Scan returns false both on EOF and on error (for example a line longer
	// than the scanner's buffer); only Err distinguishes the two.
	if err := b.Err(); err != nil {
		log.Fatalf("error reading input: %v", err)
	}
}
// createStream makes sure a stream called streamName exists.
//
// It pages through ListStreams looking for the name and returns nil as soon as
// it is found. Otherwise it issues CreateStream with two shards and blocks
// (for up to 30 seconds) until the stream-exists waiter reports success.
// It returns a non-nil error if listing, creating, or waiting fails.
func createStream(client *kinesis.Client, streamName string) error {
	ctx := context.Background()

	// ListStreams is paginated: a single response may not contain every
	// stream, so keep requesting pages until the service reports no more.
	listInput := &kinesis.ListStreamsInput{}
	for {
		resp, err := client.ListStreams(ctx, listInput)
		if err != nil {
			return fmt.Errorf("list streams error: %w", err)
		}

		for _, val := range resp.StreamNames {
			if streamName == val {
				return nil
			}
		}

		// Stop when there are no further pages. The empty-page check guards
		// against looping forever if a page carries no names to continue from.
		if !aws.ToBool(resp.HasMoreStreams) || len(resp.StreamNames) == 0 {
			break
		}
		last := resp.StreamNames[len(resp.StreamNames)-1]
		listInput.ExclusiveStartStreamName = aws.String(last)
	}

	_, err := client.CreateStream(
		ctx,
		&kinesis.CreateStreamInput{
			StreamName: aws.String(streamName),
			ShardCount: aws.Int32(2),
		},
	)
	if err != nil {
		return err
	}

	// CreateStream returns before the stream is usable; wait for it to exist.
	waiter := kinesis.NewStreamExistsWaiter(client)
	err = waiter.Wait(
		ctx,
		&kinesis.DescribeStreamInput{
			StreamName: aws.String(streamName),
		},
		30*time.Second,
	)
	if err != nil {
		return fmt.Errorf("waiting for stream %q: %w", streamName, err)
	}
	return nil
}
// putRecords sends records to the stream named by streamName in one
// PutRecords call and prints a "." to standard output once every record has
// been accepted.
//
// A PutRecords call can succeed as a whole while individual records are
// rejected; those are reported per entry in the response. Rejected records are
// resent, with a short growing pause between attempts, up to maxAttempts
// times. The process is terminated via log.Fatalf if the call itself fails or
// if records are still being rejected after the final attempt.
//
// An empty records slice is a no-op.
func putRecords(client *kinesis.Client, streamName *string, records []types.PutRecordsRequestEntry) {
	const maxAttempts = 5

	if len(records) == 0 {
		return
	}

	pending := records
	for attempt := 1; ; attempt++ {
		resp, err := client.PutRecords(context.Background(), &kinesis.PutRecordsInput{
			StreamName: streamName,
			Records:    pending,
		})
		if err != nil {
			log.Fatalf("error putting records: %v", err)
		}

		// Response entries are positionally matched to the request entries;
		// an entry with an ErrorCode marks a record that was not stored.
		var failed []types.PutRecordsRequestEntry
		for i, result := range resp.Records {
			if result.ErrorCode != nil && i < len(pending) {
				failed = append(failed, pending[i])
			}
		}
		if len(failed) == 0 {
			break
		}
		if attempt == maxAttempts {
			log.Fatalf("error putting records: %d records still rejected after %d attempts", len(failed), maxAttempts)
		}

		// Back off briefly before resending only the rejected records.
		time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
		pending = failed
	}

	fmt.Print(".")
}