Update example with Redshift Manifest Pipeline

Harlow Ward 2014-12-20 19:45:34 -08:00
parent b7f4cfc841
commit 1bce60bae6

README.md

Install the library:
$ go get github.com/harlow/kinesis-connectors
### Example Redshift Manifest Pipeline

The Redshift Manifest Pipeline works in two stages:

1. Pull records from Kinesis and buffer them until the desired threshold is reached. The S3 Manifest Emitter then uploads the buffered records to an S3 bucket, sets a checkpoint in Redis, and puts the S3 file path onto the manifest stream.
2. Pull S3 file paths from the manifest stream and batch them into a manifest file. Upload the manifest to S3 and issue a COPY command to load the data into Redshift (see the sketch after this list).
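The emitter's exact output isn't shown in this diff, but assuming it writes a standard Redshift manifest, the second stage boils down to producing a JSON file of S3 entries and a COPY that references it (bucket, table, and key names below are placeholders):

```json
{
  "entries": [
    {"url": "s3://bucketName/events/batch_0001.json", "mandatory": true},
    {"url": "s3://bucketName/events/batch_0002.json", "mandatory": true}
  ]
}
```

```sql
COPY redshift_data_table
FROM 's3://bucketName/manifests/manifest_0001'
CREDENTIALS 'aws_access_key_id=<key>;aws_secret_access_key=<secret>'
MANIFEST
JSON 'auto';
```

The `MANIFEST` keyword tells Redshift to treat the FROM path as a manifest file rather than a key prefix, and `"mandatory": true` (presumably what the `CopyMandatory` setting controls) makes the load fail if any listed file is missing.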
The config vars are loaded with [gcfg][3].
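The actual `config.cfg` isn't part of this diff; assuming gcfg's git-config/INI syntax, a file matching the `Config` struct below might look like this (all values are placeholders):

```
[pipeline]
name = kinesisToRedshiftManifest

[kinesis]
inputStream = eventStream
inputShardCount = 2
inputBufferSize = 500
outputStream = manifestStream
outputShardCount = 1
outputBufferSize = 25

[redshift]
copyMandatory = true
dataTable = redshift_data_table
fileTable = redshift_file_table
format = json

[s3]
bucketName = bucketName
```

The full pipeline wiring: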
```go
package main
import (
    "fmt"
    "os"

    "code.google.com/p/gcfg"
    "github.com/harlow/kinesis-connectors"
    "github.com/joho/godotenv"
    "github.com/sendgridlabs/go-kinesis"
)
type Config struct {
    Pipeline struct {
        Name string
    }
    Redshift struct {
        CopyMandatory bool
        DataTable     string
        FileTable     string
        Format        string
    }
    Kinesis struct {
        InputBufferSize  int
        InputShardCount  int
        InputStream      string
        OutputBufferSize int
        OutputShardCount int
        OutputStream     string
    }
    S3 struct {
        BucketName string
    }
}
func main() {
    // Load AWS credentials from the environment
    godotenv.Load()
    accessKey := os.Getenv("AWS_ACCESS_KEY")
    secretKey := os.Getenv("AWS_SECRET_KEY")
    ksis := kinesis.New(accessKey, secretKey, kinesis.Region{})

    // Load pipeline settings from the config file
    var cfg Config
    err := gcfg.ReadFileInto(&cfg, "config.cfg")
    if err != nil {
        fmt.Printf("Unable to read config file: %v. Aborting.", err)
        return
    }

    // Create and wait for streams
    connector.CreateStream(ksis, cfg.Kinesis.InputStream, cfg.Kinesis.InputShardCount)
    connector.CreateStream(ksis, cfg.Kinesis.OutputStream, cfg.Kinesis.OutputShardCount)

    // Process mobile event stream
    args := kinesis.NewArgs()
    args.Add("StreamName", cfg.Kinesis.InputStream)
    streamInfo, err := ksis.DescribeStream(args)
    if err != nil {
        fmt.Printf("Unable to connect to %s stream. Aborting.", cfg.Kinesis.InputStream)
        return
    }
    for _, shard := range streamInfo.StreamDescription.Shards {
        fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.InputStream)

        f := connector.AllPassFilter{}
        b := connector.RecordBuffer{NumRecordsToBuffer: cfg.Kinesis.InputBufferSize}
        t := connector.StringToStringTransformer{}
        c := connector.RedisCheckpoint{AppName: cfg.Pipeline.Name, StreamName: cfg.Kinesis.InputStream}
        e := connector.S3ManifestEmitter{
            OutputStream: cfg.Kinesis.OutputStream,
            S3Bucket:     cfg.S3.BucketName,
            Ksis:         ksis,
        }

        p := &connector.Pipeline{
            Buffer:      &b,
            Checkpoint:  &c,
            Emitter:     &e,
            Filter:      &f,
            StreamName:  cfg.Kinesis.InputStream,
            Transformer: &t,
        }
        go p.ProcessShard(ksis, shard.ShardId)
    }
    // Process manifest stream
    args = kinesis.NewArgs()
    args.Add("StreamName", cfg.Kinesis.OutputStream)
    streamInfo, err = ksis.DescribeStream(args)
    if err != nil {
        fmt.Printf("Unable to connect to %s stream. Aborting.", cfg.Kinesis.OutputStream)
        return
    }

    for _, shard := range streamInfo.StreamDescription.Shards {
        fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.OutputStream)

        f := connector.AllPassFilter{}
        b := connector.RecordBuffer{NumRecordsToBuffer: cfg.Kinesis.OutputBufferSize}
        t := connector.StringToStringTransformer{}
        c := connector.RedisCheckpoint{AppName: cfg.Pipeline.Name, StreamName: cfg.Kinesis.OutputStream}
        e := connector.RedshiftManifestEmitter{
            CopyMandatory: cfg.Redshift.CopyMandatory,
            DataTable:     cfg.Redshift.DataTable,
            FileTable:     cfg.Redshift.FileTable,
            Format:        cfg.Redshift.Format,
            S3Bucket:      cfg.S3.BucketName,
        }

        p := &connector.Pipeline{
            Buffer:      &b,
            Checkpoint:  &c,
            Emitter:     &e,
            Filter:      &f,
            StreamName:  cfg.Kinesis.OutputStream,
            Transformer: &t,
        }
        go p.ProcessShard(ksis, shard.ShardId)
    }

    // Block forever so the shard-processing goroutines keep running
    select {}
}
```
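`godotenv.Load()` reads a `.env` file from the working directory by default, so the AWS credentials the example pulls from the environment can live in a file like this (placeholder values):

```
AWS_ACCESS_KEY=AKIAXXXXXXXXXXXXXXXX
AWS_SECRET_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
```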
[1]: https://github.com/awslabs/amazon-kinesis-connectors
[2]: http://godoc.org/github.com/harlow/kinesis-connectors
[3]: https://code.google.com/p/gcfg/