diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..b613ec1 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,7 @@ +# Examples + +Here are examples of Produder and Consumer code to provide guidance on how to use each of the different checkpoints. + +The examples will run against [Kinesis Lite](https://github.com/mhart/kinesalite). + + $ kinesalite diff --git a/examples/consumer/cp-dynamo/README.md b/examples/consumer/cp-dynamo/README.md index ea7e634..f313824 100644 --- a/examples/consumer/cp-dynamo/README.md +++ b/examples/consumer/cp-dynamo/README.md @@ -2,15 +2,6 @@ Read records from the Kinesis stream -### Environment Variables - -Export the required environment vars for connecting to the Kinesis stream: - -``` -export AWS_PROFILE= -export AWS_REGION= -``` - ### Run the consumer $ go run main.go --app appName --stream streamName --table tableName diff --git a/examples/consumer/cp-dynamo/main.go b/examples/consumer/cp-dynamo/main.go index c4f9d93..4449689 100644 --- a/examples/consumer/cp-dynamo/main.go +++ b/examples/consumer/cp-dynamo/main.go @@ -54,21 +54,28 @@ func main() { } var ( - app = flag.String("app", "", "Consumer app name") - stream = flag.String("stream", "", "Stream name") - table = flag.String("table", "", "Checkpoint table name") + app = flag.String("app", "", "Consumer app name") + stream = flag.String("stream", "", "Stream name") + table = flag.String("table", "", "Checkpoint table name") + kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint") + awsRegion = flag.String("region", "us-west-2", "AWS Region") ) flag.Parse() + // New Kinesis and DynamoDB clients (if you need custom config) sess, err := session.NewSession(aws.NewConfig()) if err != nil { log.Log("new session error: %v", err) } - - // New Kinesis and DynamoDB clients (if you need custom config) - myKsis := kinesis.New(sess) myDdbClient := dynamodb.New(sess) + var myKsis = kinesis.New(session.Must(session.NewSession( + aws.NewConfig(). + WithEndpoint(*kinesisEndpoint). + WithRegion(*awsRegion). + WithLogLevel(3), + ))) + // ddb persitance ddb, err := storage.New(*app, *table, storage.WithDynamoClient(myDdbClient), storage.WithRetryer(&MyRetryer{})) if err != nil { diff --git a/examples/consumer/cp-mysql/README.md b/examples/consumer/cp-mysql/README.md index 5a7690c..646ac16 100644 --- a/examples/consumer/cp-mysql/README.md +++ b/examples/consumer/cp-mysql/README.md @@ -2,15 +2,6 @@ Read records from the Kinesis stream using mysql as checkpoint -## Environment Variables - -Export the required environment vars for connecting to the Kinesis stream: - -```shell -export AWS_PROFILE= -export AWS_REGION= -``` - ## Run the consumer go run main.go --app --stream --table --connection diff --git a/examples/consumer/cp-mysql/main.go b/examples/consumer/cp-mysql/main.go index 8b8c85b..2b67e04 100644 --- a/examples/consumer/cp-mysql/main.go +++ b/examples/consumer/cp-mysql/main.go @@ -9,31 +9,45 @@ import ( "os" "os/signal" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/store/mysql" + store "github.com/harlow/kinesis-consumer/store/mysql" ) func main() { var ( - app = flag.String("app", "", "Consumer app name") - stream = flag.String("stream", "", "Stream name") - table = flag.String("table", "", "Table name") - connStr = flag.String("connection", "", "Connection Str") + app = flag.String("app", "", "Consumer app name") + stream = flag.String("stream", "", "Stream name") + table = flag.String("table", "", "Table name") + connStr = flag.String("connection", "", "Connection Str") + kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint") + awsRegion = flag.String("region", "us-west-2", "AWS Region") ) flag.Parse() // mysql checkpoint - ck, err := checkpoint.New(*app, *table, *connStr) + store, err := store.New(*app, *table, *connStr) if err != nil { log.Fatalf("checkpoint error: %v", err) } var counter = expvar.NewMap("counters") + // client + cfg := aws.NewConfig(). + WithEndpoint(*kinesisEndpoint). + WithRegion(*awsRegion). + WithLogLevel(3) + + var client = kinesis.New(session.Must(session.NewSession(cfg))) + // consumer c, err := consumer.New( *stream, - consumer.WithStore(ck), + consumer.WithClient(client), + consumer.WithStore(store), consumer.WithCounter(counter), ) if err != nil { @@ -62,7 +76,7 @@ func main() { log.Fatalf("scan error: %v", err) } - if err := ck.Shutdown(); err != nil { - log.Fatalf("checkpoint shutdown error: %v", err) + if err := store.Shutdown(); err != nil { + log.Fatalf("store shutdown error: %v", err) } } diff --git a/examples/consumer/cp-postgres/README.md b/examples/consumer/cp-postgres/README.md index 889fee8..53de45c 100644 --- a/examples/consumer/cp-postgres/README.md +++ b/examples/consumer/cp-postgres/README.md @@ -2,15 +2,6 @@ Read records from the Kinesis stream using postgres as checkpoint -## Environment Variables - -Export the required environment vars for connecting to the Kinesis stream: - -```shell -export AWS_PROFILE= -export AWS_REGION= -``` - ## Run the consumer go run main.go --app appName --stream streamName --table tableName --connection connectionString diff --git a/examples/consumer/cp-postgres/main.go b/examples/consumer/cp-postgres/main.go index b43812d..525cc42 100644 --- a/examples/consumer/cp-postgres/main.go +++ b/examples/consumer/cp-postgres/main.go @@ -9,31 +9,45 @@ import ( "os" "os/signal" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/store/postgres" + store "github.com/harlow/kinesis-consumer/store/postgres" ) func main() { var ( - app = flag.String("app", "", "Consumer app name") - stream = flag.String("stream", "", "Stream name") - table = flag.String("table", "", "Table name") - connStr = flag.String("connection", "", "Connection Str") + app = flag.String("app", "", "Consumer app name") + stream = flag.String("stream", "", "Stream name") + table = flag.String("table", "", "Table name") + connStr = flag.String("connection", "", "Connection Str") + kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint") + awsRegion = flag.String("region", "us-west-2", "AWS Region") ) flag.Parse() // postgres checkpoint - ck, err := checkpoint.New(*app, *table, *connStr) + store, err := store.New(*app, *table, *connStr) if err != nil { log.Fatalf("checkpoint error: %v", err) } var counter = expvar.NewMap("counters") + // client + cfg := aws.NewConfig(). + WithEndpoint(*kinesisEndpoint). + WithRegion(*awsRegion). + WithLogLevel(3) + + var client = kinesis.New(session.Must(session.NewSession(cfg))) + // consumer c, err := consumer.New( *stream, - consumer.WithStore(ck), + consumer.WithClient(client), + consumer.WithStore(store), consumer.WithCounter(counter), ) if err != nil { @@ -62,7 +76,7 @@ func main() { log.Fatalf("scan error: %v", err) } - if err := ck.Shutdown(); err != nil { - log.Fatalf("checkpoint shutdown error: %v", err) + if err := store.Shutdown(); err != nil { + log.Fatalf("store shutdown error: %v", err) } } diff --git a/examples/consumer/cp-redis/README.md b/examples/consumer/cp-redis/README.md index b6d30b3..e05bdc6 100644 --- a/examples/consumer/cp-redis/README.md +++ b/examples/consumer/cp-redis/README.md @@ -2,16 +2,6 @@ Read records from the Kinesis stream -### Environment Variables - -Export the required environment vars for connecting to the Kinesis stream and Redis for checkpoint: - -``` -export AWS_PROFILE= -export AWS_REGION= -export REDIS_URL= -``` - ### Run the consumer $ go run main.go --app appName --stream streamName diff --git a/examples/consumer/cp-redis/main.go b/examples/consumer/cp-redis/main.go index e08ec16..5e6fbb8 100644 --- a/examples/consumer/cp-redis/main.go +++ b/examples/consumer/cp-redis/main.go @@ -8,8 +8,11 @@ import ( "os" "os/signal" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" consumer "github.com/harlow/kinesis-consumer" - checkpoint "github.com/harlow/kinesis-consumer/store/redis" + store "github.com/harlow/kinesis-consumer/store/redis" ) // A myLogger provides a minimalistic logger satisfying the Logger interface. @@ -24,15 +27,17 @@ func (l *myLogger) Log(args ...interface{}) { func main() { var ( - app = flag.String("app", "", "Consumer app name") - stream = flag.String("stream", "", "Stream name") + app = flag.String("app", "", "Consumer app name") + stream = flag.String("stream", "", "Stream name") + kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint") + awsRegion = flag.String("region", "us-west-2", "AWS Region") ) flag.Parse() - // redis checkpoint - ck, err := checkpoint.New(*app) + // redis checkpoint store + store, err := store.New(*app) if err != nil { - log.Fatalf("checkpoint error: %v", err) + log.Fatalf("store error: %v", err) } // logger @@ -40,10 +45,19 @@ func main() { logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags), } + // client + cfg := aws.NewConfig(). + WithEndpoint(*kinesisEndpoint). + WithRegion(*awsRegion). + WithLogLevel(3) + + var client = kinesis.New(session.Must(session.NewSession(cfg))) + // consumer c, err := consumer.New( *stream, - consumer.WithStore(ck), + consumer.WithClient(client), + consumer.WithStore(store), consumer.WithLogger(logger), ) if err != nil { diff --git a/examples/producer/README.md b/examples/producer/README.md index da7c13b..71ff146 100644 --- a/examples/producer/README.md +++ b/examples/producer/README.md @@ -2,15 +2,6 @@ A prepopulated file with JSON users is available on S3 for seeing the stream. -### Environment Variables - -Export the required environment vars for connecting to the Kinesis stream: - -``` -export AWS_PROFILE= -export AWS_REGION= -``` - -### Running the code +## Running the code $ go run main.go --stream streamName diff --git a/examples/producer/main.go b/examples/producer/main.go index d59aa61..437f6b8 100644 --- a/examples/producer/main.go +++ b/examples/producer/main.go @@ -13,8 +13,17 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" ) +const ( + kinesisEndpoint = "http://localhost:4567" + awsRegion = "us-west-2" +) + func main() { - var streamName = flag.String("stream", "", "Stream name") + var ( + streamName = flag.String("stream", "", "Stream name") + kinesisEndpoint = flag.String("endpoint", "http://localhost:4567", "Kinesis endpoint") + awsRegion = flag.String("region", "us-west-2", "AWS Region") + ) flag.Parse() // open dummy user data @@ -26,11 +35,17 @@ func main() { var records []*kinesis.PutRecordsRequestEntry - sess, err := session.NewSession(aws.NewConfig()) - if err != nil { - log.Fatal(err) + cfg := aws.NewConfig(). + WithEndpoint(*kinesisEndpoint). + WithRegion(*awsRegion). + WithLogLevel(3) + + var client = kinesis.New(session.Must(session.NewSession(cfg))) + + // create stream if doesn't exist + if err := createStream(client, streamName); err != nil { + log.Fatalf("create stream error: %v", err) } - var client = kinesis.New(sess) // loop over file data b := bufio.NewScanner(f) @@ -51,6 +66,28 @@ func main() { } } +func createStream(client *kinesis.Kinesis, streamName *string) error { + resp, err := client.ListStreams(&kinesis.ListStreamsInput{}) + if err != nil { + return fmt.Errorf("list streams error: %v", err) + } + + for _, val := range resp.StreamNames { + if *streamName == *val { + return nil + } + } + + _, err = client.CreateStream( + &kinesis.CreateStreamInput{ + StreamName: streamName, + ShardCount: aws.Int64(2), + }, + ) + + return err +} + func putRecords(client *kinesis.Kinesis, streamName *string, records []*kinesis.PutRecordsRequestEntry) { _, err := client.PutRecords(&kinesis.PutRecordsInput{ StreamName: streamName, diff --git a/go.mod b/go.mod index d507334..28bf0e5 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/harlow/kinesis-consumer require ( - github.com/DATA-DOG/go-sqlmock v1.3.3 // indirect + github.com/DATA-DOG/go-sqlmock v1.3.3 github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect github.com/alicebob/miniredis v2.5.0+incompatible github.com/apex/log v1.0.0 diff --git a/go.sum b/go.sum index 4501d2e..3afd38c 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +github.com/DATA-DOG/go-sqlmock v1.3.3 h1:CWUqKXe0s8A2z6qCgkP4Kru7wC11YoAnoupUKFDnH08= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 h1:45bxf7AZMwWcqkLzDAQugVEwedisr5nRJ1r+7LYnv0U= github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=