Add sample pipeline

* Reduce the size of the main README in favor of adding functioning examples
* Add S3 Pipeline example
* Add example of seeding the stream
Harlow Ward 2015-05-23 15:22:58 -07:00
parent fdd39482bb
commit 25e390e8b4
7 changed files with 192 additions and 96 deletions

.gitignore vendored (3 changes)
View file

@@ -3,6 +3,9 @@
*.a
*.so
# Environment vars
.env
# Folders
_obj
_test

View file

@@ -22,8 +22,6 @@ A connector pipeline uses the following interfaces:
* __Buffer:__ Defines a system for batching the set of records to be processed. The application can specify three thresholds: number of records, total byte count, and time. When one of these thresholds is crossed, the buffer is flushed and the data is emitted to the destination.
* __Emitter:__ Defines a method that makes client calls to other AWS services and persists the records stored in the buffer. The records can also be sent to another Amazon Kinesis stream.
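To make these contracts concrete, here is a minimal sketch of what a Buffer and Emitter pair might look like. The method names below are illustrative assumptions, not the library's actual definitions; see the package source for the authoritative interfaces.

```go
package connector

// Buffer is a hypothetical shape for illustration only.
type Buffer interface {
	// ProcessRecord adds one record (with its sequence number) to the batch.
	ProcessRecord(record []byte, sequenceNumber string)
	// ShouldFlush reports whether a record-count, byte-count, or time
	// threshold has been crossed.
	ShouldFlush() bool
	// Flush resets the batch after the emitter has persisted it.
	Flush()
}

// Emitter is likewise a sketch: it persists the buffered records to a
// destination such as S3 or another Kinesis stream.
type Emitter interface {
	Emit(b Buffer)
}
```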
## Usage
### Installation
Get the package source:
@@ -43,103 +41,10 @@ The custom logger must implement the [Logger interface][log_interface].
[log_interface]: https://github.com/harlow/kinesis-connectors/blob/master/logger.go
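As a purely illustrative sketch (the actual method set is defined in logger.go, not here), a custom logger could wrap the standard library:

```go
package main

import "log"

// stdLogger is a hypothetical custom logger; the Logf signature shown
// here is an assumption for illustration and may not match the real
// interface in logger.go.
type stdLogger struct{}

func (l stdLogger) Logf(format string, args ...interface{}) {
	log.Printf(format, args...)
}
```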
### Example Pipeline
The S3 Connector Pipeline performs the following steps:
1. Pull records from Kinesis and buffer them until the desired threshold is met.
2. Upload the batch of records to an S3 bucket.
3. Set the current Shard checkpoint in Redis.
The config vars are loaded with [gcfg].
[gcfg]: https://code.google.com/p/gcfg/
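For reference, the pipeline.cfg used by this commit's example supplies the values below:

```
[pipeline]
name = s3Pipeline
[s3]
bucketName = kinesis.test
[kinesis]
bufferSize = 100
shardCount = 2
streamName = userStream
```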
```go
package main

import (
	"fmt"
	"os"

	"code.google.com/p/gcfg"
	"github.com/harlow/kinesis-connectors"
	"github.com/sendgridlabs/go-kinesis"
)

type Config struct {
	Pipeline struct {
		Name string
	}
	Kinesis struct {
		BufferSize int
		ShardCount int
		StreamName string
	}
	S3 struct {
		BucketName string
	}
}

func newS3Pipeline(cfg Config) *connector.Pipeline {
	f := &connector.AllPassFilter{}
	b := &connector.RecordBuffer{
		NumRecordsToBuffer: cfg.Kinesis.BufferSize,
	}
	t := &connector.StringToStringTransformer{}
	c := &connector.RedisCheckpoint{
		AppName:    cfg.Pipeline.Name,
		StreamName: cfg.Kinesis.StreamName,
	}
	e := &connector.S3Emitter{
		S3Bucket: cfg.S3.BucketName,
	}
	return &connector.Pipeline{
		Buffer:      b,
		Checkpoint:  c,
		Emitter:     e,
		Filter:      f,
		StreamName:  cfg.Kinesis.StreamName,
		Transformer: t,
	}
}

func main() {
	// Load config vars and abort if the file is missing or malformed
	var cfg Config
	err := gcfg.ReadFileInto(&cfg, "pipeline.cfg")
	if err != nil {
		fmt.Printf("Config ERROR: %v\n", err)
		return
	}

	// Set up kinesis client and stream
	accessKey := os.Getenv("AWS_ACCESS_KEY")
	secretKey := os.Getenv("AWS_SECRET_KEY")
	ksis := kinesis.New(accessKey, secretKey, kinesis.Region{})
	connector.CreateStream(ksis, cfg.Kinesis.StreamName, cfg.Kinesis.ShardCount)

	// Fetch stream info
	args := kinesis.NewArgs()
	args.Add("StreamName", cfg.Kinesis.StreamName)
	streamInfo, err := ksis.DescribeStream(args)
	if err != nil {
		fmt.Printf("Unable to connect to %s stream. Aborting.\n", cfg.Kinesis.StreamName)
		return
	}

	// Process each kinesis shard in its own goroutine
	for _, shard := range streamInfo.StreamDescription.Shards {
		fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName)
		p := newS3Pipeline(cfg)
		go p.ProcessShard(ksis, shard.ShardId)
	}

	// Keep alive
	<-make(chan int)
}
```
## Contributing
Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]!
[LICENSE]: /MIT-LICENSE
[CONTRIBUTING.md]: /CONTRIBUTING.md

View file

@@ -0,0 +1,25 @@
# S3 Pipeline
The S3 Connector Pipeline performs the following steps:
1. Pull records from Kinesis and buffer them until the desired threshold is met.
2. Upload the batch of records to an S3 bucket.
3. Set the current Shard checkpoint in Redis.
The pipeline config vars are loaded with [gcfg].
[gcfg]: https://code.google.com/p/gcfg/
### Environment Variables
Export the required environment vars for connecting to the Kinesis stream:
```
export AWS_ACCESS_KEY=
export AWS_REGION_NAME=
export AWS_SECRET_KEY=
```
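The example's main.go calls kinesis.NewAuth, which is expected to resolve the key pair from these environment variables at startup.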
### Running the code
$ go run main.go

View file

@@ -0,0 +1,82 @@
package main

import (
	"fmt"

	"code.google.com/p/gcfg"
	"github.com/harlow/kinesis-connectors"
	"github.com/sendgridlabs/go-kinesis"
)

type Config struct {
	Pipeline struct {
		Name string
	}
	Kinesis struct {
		BufferSize int
		ShardCount int
		StreamName string
	}
	S3 struct {
		BucketName string
	}
}

func newS3Pipeline(cfg Config) *connector.Pipeline {
	f := &connector.AllPassFilter{}
	b := &connector.RecordBuffer{
		NumRecordsToBuffer: cfg.Kinesis.BufferSize,
	}
	t := &connector.StringToStringTransformer{}
	c := &connector.RedisCheckpoint{
		AppName:    cfg.Pipeline.Name,
		StreamName: cfg.Kinesis.StreamName,
	}
	e := &connector.S3Emitter{
		S3Bucket: cfg.S3.BucketName,
	}
	return &connector.Pipeline{
		Buffer:      b,
		Checkpoint:  c,
		Emitter:     e,
		Filter:      f,
		StreamName:  cfg.Kinesis.StreamName,
		Transformer: t,
	}
}

func main() {
	// Load config vars and abort if the file is missing or malformed
	var cfg Config
	err := gcfg.ReadFileInto(&cfg, "pipeline.cfg")
	if err != nil {
		fmt.Printf("Config ERROR: %v\n", err)
		return
	}

	// Initialize Kinesis client
	auth := kinesis.NewAuth()
	ksis := kinesis.New(&auth, kinesis.Region{})

	// Create stream
	connector.CreateStream(ksis, cfg.Kinesis.StreamName, cfg.Kinesis.ShardCount)

	// Fetch stream info
	args := kinesis.NewArgs()
	args.Add("StreamName", cfg.Kinesis.StreamName)
	streamInfo, err := ksis.DescribeStream(args)
	if err != nil {
		fmt.Printf("Unable to connect to %s stream. Aborting.\n", cfg.Kinesis.StreamName)
		return
	}

	// Process each kinesis shard in its own goroutine
	for _, shard := range streamInfo.StreamDescription.Shards {
		fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName)
		p := newS3Pipeline(cfg)
		go p.ProcessShard(ksis, shard.ShardId)
	}

	// Keep alive
	<-make(chan int)
}

View file

@@ -0,0 +1,8 @@
[pipeline]
name = s3Pipeline
[s3]
bucketName = kinesis.test
[kinesis]
bufferSize = 100
shardCount = 2
streamName = userStream
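gcfg maps each section and key onto the Config struct in main.go, so bufferSize under [kinesis] populates cfg.Kinesis.BufferSize, and likewise for the other fields.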

View file

@@ -0,0 +1,19 @@
# Seed the Stream
A prepopulated file with JSON users is available on S3 for seeding the stream:
https://s3.amazonaws.com/kinesis.test/users.txt
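Note that main.go opens the file as users.txt in the current working directory, so download a copy from the URL above before running the seeder.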
### Environment Variables
Export the required environment vars for connecting to the Kinesis stream:
```
export AWS_ACCESS_KEY=
export AWS_REGION_NAME=
export AWS_SECRET_KEY=
```
### Running the code
$ go run main.go

View file

@@ -0,0 +1,54 @@
package main

import (
	"bufio"
	"fmt"
	"os"
	"sync"

	"github.com/harlow/kinesis-connectors"
	"github.com/joho/godotenv"
	"github.com/sendgridlabs/go-kinesis"
)

func main() {
	godotenv.Load()

	// Initialize Kinesis client
	auth := kinesis.NewAuth()
	ksis := kinesis.New(&auth, kinesis.Region{})

	// Create stream
	connector.CreateStream(ksis, "userStream", 2)

	// Read the seed file line by line
	file, err := os.Open("users.txt")
	if err != nil {
		fmt.Printf("Unable to open users.txt: %v\n", err)
		return
	}
	defer file.Close()
	scanner := bufio.NewScanner(file)

	ctr := 0
	var wg sync.WaitGroup

	for scanner.Scan() {
		l := scanner.Text()
		ctr++
		key := fmt.Sprintf("partitionKey-%d", ctr)

		// Each line becomes a single record in the PutRecords call
		args := kinesis.NewArgs()
		args.Add("StreamName", "userStream")
		args.AddRecord([]byte(l), key)

		wg.Add(1)
		go func() {
			ksis.PutRecords(args)
			fmt.Print(".")
			wg.Done()
		}()
	}

	wg.Wait()
	fmt.Println(".")
	fmt.Println("Finished populating userStream")
}