From 0ba88d49bce8e14555c2ef4baf9db7cee6486eb5 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sun, 27 Jul 2014 19:38:53 -0700 Subject: [PATCH] Add example applications * Add example consumer and producer appications * Ignore .env file --- .gitignore | 1 + examples/consumer.go | 38 +++++++++++++++++++++++++++++++ examples/producer.go | 34 ++++++++++++++++++++++++++++ utils.go | 54 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+) create mode 100644 examples/consumer.go create mode 100644 examples/producer.go create mode 100644 utils.go diff --git a/.gitignore b/.gitignore index 8365624..92e6f5f 100644 --- a/.gitignore +++ b/.gitignore @@ -19,5 +19,6 @@ _cgo_export.* _testmain.go +*.env *.exe *.test diff --git a/examples/consumer.go b/examples/consumer.go new file mode 100644 index 0000000..2c019e1 --- /dev/null +++ b/examples/consumer.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "github.com/harlow/go-etl" + "github.com/joho/godotenv" + "github.com/sendgridlabs/go-kinesis" +) + +func main() { + godotenv.Load() + + k := kinesis.New("", "", kinesis.Region{}) + s := "inputStream" + + c := etl.RedisCheckpoint{} + c.SetAppName("sampleApp") + + e := etl.S3Emitter{} + e.SetBucketName("bucketName") + + // t := etl.EventTransformer{} + + args := kinesis.NewArgs() + args.Add("StreamName", s) + streamInfo, err := k.DescribeStream(args) + + if err != nil { + fmt.Printf("Unable to connect to %v stream. Aborting.", s) + return + } + + for _, shard := range streamInfo.StreamDescription.Shards { + go etl.GetRecords(k, &c, e, s, shard.ShardId) + } + + select {} +} diff --git a/examples/producer.go b/examples/producer.go new file mode 100644 index 0000000..391fb05 --- /dev/null +++ b/examples/producer.go @@ -0,0 +1,34 @@ +package main + +import ( + "fmt" + "github.com/harlow/go-etl" + "github.com/joho/godotenv" + "github.com/sendgridlabs/go-kinesis" +) + +func putSampleDataOnStream(ksis *kinesis.Kinesis, streamName string, numRecords int) { + for i := 0; i < numRecords; i++ { + args := kinesis.NewArgs() + args.Add("StreamName", streamName) + args.AddData([]byte(fmt.Sprintf("Hello AWS Kinesis %d", i))) + args.Add("PartitionKey", fmt.Sprintf("partitionKey-%d", i)) + resp, err := ksis.PutRecord(args) + + if err != nil { + fmt.Printf("PutRecord err: %v\n", err) + } else { + fmt.Printf("SequenceNumber: %v\n", resp.SequenceNumber) + } + } +} + +func main() { + godotenv.Load() + streamName := "inputStream" + ksis := kinesis.New("", "", kinesis.Region{}) + + etl.CreateAndWaitForStreamToBecomeAvailable(ksis, streamName, 2) + putSampleDataOnStream(ksis, streamName, 50) + // deleteStream(ksis, streamName) +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..b293d81 --- /dev/null +++ b/utils.go @@ -0,0 +1,54 @@ +package etl + +import( + "fmt" + "time" + "github.com/sendgridlabs/go-kinesis" +) + +func CreateAndWaitForStreamToBecomeAvailable(ksis *kinesis.Kinesis, streamName string, shardCount int) { + if !StreamExists(ksis, streamName) { + err := ksis.CreateStream(streamName, shardCount) + + if err != nil { + fmt.Printf("CreateStream ERROR: %v\n", err) + return + } + } + + resp := &kinesis.DescribeStreamResp{} + timeout := make(chan bool, 30) + + for { + args := kinesis.NewArgs() + args.Add("StreamName", streamName) + resp, _ = ksis.DescribeStream(args) + streamStatus := resp.StreamDescription.StreamStatus + fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus) + + if streamStatus != "ACTIVE" { + time.Sleep(4 * time.Second) + timeout <- true + } else { + break + } + } +} + +func StreamExists(ksis *kinesis.Kinesis, streamName string) bool { + args := kinesis.NewArgs() + resp, _ := ksis.ListStreams(args) + for _, name := range resp.StreamNames { if name == streamName { return true } } + return false +} + +func DeleteStream(ksis *kinesis.Kinesis, streamName string) { + err := ksis.DeleteStream("test") + + if err != nil { + fmt.Printf("DeleteStream ERROR: %v\n", err) + return + } + + fmt.Printf("Stream [%v] is DELETING\n", streamName) +}