Add example applications

* Add example consumer and producer appications
* Ignore .env file
This commit is contained in:
Harlow Ward 2014-07-27 19:38:53 -07:00
parent b5625c98a1
commit 0ba88d49bc
4 changed files with 127 additions and 0 deletions

1
.gitignore vendored
View file

@ -19,5 +19,6 @@ _cgo_export.*
_testmain.go
*.env
*.exe
*.test

38
examples/consumer.go Normal file
View file

@ -0,0 +1,38 @@
package main
import (
"fmt"
"github.com/harlow/go-etl"
"github.com/joho/godotenv"
"github.com/sendgridlabs/go-kinesis"
)
func main() {
godotenv.Load()
k := kinesis.New("", "", kinesis.Region{})
s := "inputStream"
c := etl.RedisCheckpoint{}
c.SetAppName("sampleApp")
e := etl.S3Emitter{}
e.SetBucketName("bucketName")
// t := etl.EventTransformer{}
args := kinesis.NewArgs()
args.Add("StreamName", s)
streamInfo, err := k.DescribeStream(args)
if err != nil {
fmt.Printf("Unable to connect to %v stream. Aborting.", s)
return
}
for _, shard := range streamInfo.StreamDescription.Shards {
go etl.GetRecords(k, &c, e, s, shard.ShardId)
}
select {}
}

34
examples/producer.go Normal file
View file

@ -0,0 +1,34 @@
package main
import (
"fmt"
"github.com/harlow/go-etl"
"github.com/joho/godotenv"
"github.com/sendgridlabs/go-kinesis"
)
func putSampleDataOnStream(ksis *kinesis.Kinesis, streamName string, numRecords int) {
for i := 0; i < numRecords; i++ {
args := kinesis.NewArgs()
args.Add("StreamName", streamName)
args.AddData([]byte(fmt.Sprintf("Hello AWS Kinesis %d", i)))
args.Add("PartitionKey", fmt.Sprintf("partitionKey-%d", i))
resp, err := ksis.PutRecord(args)
if err != nil {
fmt.Printf("PutRecord err: %v\n", err)
} else {
fmt.Printf("SequenceNumber: %v\n", resp.SequenceNumber)
}
}
}
func main() {
godotenv.Load()
streamName := "inputStream"
ksis := kinesis.New("", "", kinesis.Region{})
etl.CreateAndWaitForStreamToBecomeAvailable(ksis, streamName, 2)
putSampleDataOnStream(ksis, streamName, 50)
// deleteStream(ksis, streamName)
}

54
utils.go Normal file
View file

@ -0,0 +1,54 @@
package etl
import(
"fmt"
"time"
"github.com/sendgridlabs/go-kinesis"
)
func CreateAndWaitForStreamToBecomeAvailable(ksis *kinesis.Kinesis, streamName string, shardCount int) {
if !StreamExists(ksis, streamName) {
err := ksis.CreateStream(streamName, shardCount)
if err != nil {
fmt.Printf("CreateStream ERROR: %v\n", err)
return
}
}
resp := &kinesis.DescribeStreamResp{}
timeout := make(chan bool, 30)
for {
args := kinesis.NewArgs()
args.Add("StreamName", streamName)
resp, _ = ksis.DescribeStream(args)
streamStatus := resp.StreamDescription.StreamStatus
fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus)
if streamStatus != "ACTIVE" {
time.Sleep(4 * time.Second)
timeout <- true
} else {
break
}
}
}
func StreamExists(ksis *kinesis.Kinesis, streamName string) bool {
args := kinesis.NewArgs()
resp, _ := ksis.ListStreams(args)
for _, name := range resp.StreamNames { if name == streamName { return true } }
return false
}
func DeleteStream(ksis *kinesis.Kinesis, streamName string) {
err := ksis.DeleteStream("test")
if err != nil {
fmt.Printf("DeleteStream ERROR: %v\n", err)
return
}
fmt.Printf("Stream [%v] is DELETING\n", streamName)
}