From 4c2aaf78a232966a6bc76368eb75369e95d91ce7 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Mon, 2 Sep 2019 07:44:26 -0700 Subject: [PATCH] Add consumer example without checkpointing --- cmd/consumer/main.go | 75 ++++++++++++++++++++++++++++++++++++++++++++ cmd/producer/main.go | 18 +++++------ 2 files changed, 82 insertions(+), 11 deletions(-) create mode 100644 cmd/consumer/main.go diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go new file mode 100644 index 0000000..ba5d197 --- /dev/null +++ b/cmd/consumer/main.go @@ -0,0 +1,75 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + consumer "github.com/harlow/kinesis-consumer" +) + +// A myLogger provides a minimalistic logger satisfying the Logger interface. +type myLogger struct { + logger *log.Logger +} + +// Log logs the parameters to the stdlib logger. See log.Println. +func (l *myLogger) Log(args ...interface{}) { + l.logger.Println(args...) +} + +func main() { + var ( + stream = flag.String("stream", "", "Stream name") + kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint") + awsRegion = flag.String("region", "us-west-2", "AWS Region") + ) + flag.Parse() + + // client + var client = kinesis.New(session.Must(session.NewSession( + aws.NewConfig(). + WithEndpoint(*kinesisEndpoint). + WithRegion(*awsRegion), + ))) + + // consumer + c, err := consumer.New( + *stream, + consumer.WithClient(client), + ) + if err != nil { + log.Fatalf("consumer error: %v", err) + } + + // scan + ctx := trap() + err = c.Scan(ctx, func(r *consumer.Record) error { + fmt.Println(string(r.Data)) + return nil // continue scanning + }) + if err != nil { + log.Fatalf("scan error: %v", err) + } +} + +func trap() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) + + go func() { + sig := <-sigs + log.Printf("received %s", sig) + cancel() + }() + + return ctx +} diff --git a/cmd/producer/main.go b/cmd/producer/main.go index 437f6b8..7fa0662 100644 --- a/cmd/producer/main.go +++ b/cmd/producer/main.go @@ -13,11 +13,6 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" ) -const ( - kinesisEndpoint = "http://localhost:4567" - awsRegion = "us-west-2" -) - func main() { var ( streamName = flag.String("stream", "", "Stream name") @@ -35,12 +30,12 @@ func main() { var records []*kinesis.PutRecordsRequestEntry - cfg := aws.NewConfig(). - WithEndpoint(*kinesisEndpoint). - WithRegion(*awsRegion). - WithLogLevel(3) - - var client = kinesis.New(session.Must(session.NewSession(cfg))) + var client = kinesis.New(session.Must(session.NewSession( + aws.NewConfig(). + WithEndpoint(*kinesisEndpoint). + WithRegion(*awsRegion). + WithLogLevel(3), + ))) // create stream if doesn't exist if err := createStream(client, streamName); err != nil { @@ -96,5 +91,6 @@ func putRecords(client *kinesis.Kinesis, streamName *string, records []*kinesis. if err != nil { log.Fatalf("error putting records: %v", err) } + fmt.Print(".") }