From 1bce60bae648b04fa3d500f601bbf28cbbe9118e Mon Sep 17 00:00:00 2001
From: Harlow Ward
Date: Sat, 20 Dec 2014 19:45:34 -0800
Subject: [PATCH] Update example with Redshift Manifest Pipeline

---
 README.md | 187 +++++++++++++++++++++++++++++++-----------------------
 1 file changed, 108 insertions(+), 79 deletions(-)

diff --git a/README.md b/README.md
index 03a9300..5743a21 100644
--- a/README.md
+++ b/README.md
@@ -27,107 +27,136 @@ Install the library:
 
     $ go get github.com/harlow/kinesis-connectors
 
-### Example Redshift Pipeline
+### Example Redshift Manifest Pipeline
 
-The Redshift Pipeline will pull records from Kinesis and buffer them untill the desired threshold is reached. The Emitter will then upload the buffered records to an S3 bucket, set a checkpoint in Redis, and copy data to to Redshift.
+The Redshift Manifest Pipeline works in two steps:
 
-Pipeline properties:
+1. Pull records from Kinesis and buffer them until the desired threshold is reached. The S3 Manifest Emitter will then upload the buffered records to an S3 bucket, set a checkpoint in Redis, and put the S3 file path onto the manifest stream.
+2. Pull the S3 file paths from the manifest stream and batch them into a manifest file. Upload the manifest to S3 and issue the COPY command to Redshift.
 
-```
-# Connector Settings
-appName = kinesisToRedshiftBasic
-numRecordsToBuffer = 25
-
-# S3 Settings
-s3Bucket = bucketName
-
-# Kinesis Settings
-kinesisStream = streamName
-kinesisStreamShardCount = 2
-
-# Redshift Settings
-tableName = redshift_table_name
-format = json
-```
-
-_Note:_ This example pipeline batch copies the data from Kinesis directly to the S3 bucket and uses the JSON COPY statement to load into Redshift.
+The config vars are loaded with [gcfg][3].
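+
+As a rough sketch, `config.cfg` might look like this (the section and key names mirror the `Config` struct below; all values are illustrative placeholders):
+
+```
+[pipeline]
+name = kinesisToRedshiftManifest
+
+[redshift]
+copyMandatory = true
+dataTable = data_table_name
+fileTable = file_table_name
+format = json
+
+[kinesis]
+inputBufferSize = 25
+inputShardCount = 2
+inputStream = inputStreamName
+outputBufferSize = 10
+outputShardCount = 1
+outputStream = manifestStreamName
+
+[s3]
+bucketName = bucketName
+```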
 
 ```go
 package main
 
 import (
-    "fmt"
+    "fmt"
+    "os"
 
-    "github.com/harlow/kinesis-connectors"
-    "github.com/joho/godotenv"
-    "github.com/sendgridlabs/go-kinesis"
+    "code.google.com/p/gcfg"
+    "github.com/harlow/kinesis-connectors"
+    "github.com/sendgridlabs/go-kinesis"
 )
 
 type Config struct {
-    AppName string
-    Format string
-    KinesisStream string
-    KinesisStreamShardCount int
-    NumRecordsToBuffer int
-    S3Bucket string
-    TableName string
-}
-
-func NewPipeline(cfg Config) *connector.Pipeline {
-    f := connector.AllPassFilter{}
-
-    b := connector.RecordBuffer{
-        NumRecordsToBuffer: cfg.NumRecordsToBuffer,
-    }
-
-    t := connector.StringToStringTransformer{}
-
-    c := connector.RedisCheckpoint{
-        AppName: cfg.AppName,
-        StreamName: cfg.KinesisStream,
-    }
-
-    e := connector.RedshiftBasicEmtitter{
-        TableName: cfg.TableName,
-        S3Bucket: cfg.S3Bucket,
-        Format: cfg.Format,
-    }
-
-    return &connector.Pipeline{
-        Buffer: &b,
-        Checkpoint: &c,
-        Emitter: &e,
-        Filter: &f,
-        StreamName: cfg.KinesisStream,
-        Transformer: &t,
-    }
+    Pipeline struct {
+        Name string
+    }
+    Redshift struct {
+        CopyMandatory bool
+        DataTable     string
+        FileTable     string
+        Format        string
+    }
+    Kinesis struct {
+        InputBufferSize  int
+        InputShardCount  int
+        InputStream      string
+        OutputBufferSize int
+        OutputShardCount int
+        OutputStream     string
+    }
+    S3 struct {
+        BucketName string
+    }
 }
 
 func main() {
-    var cfg Config
-    godotenv.Load()
-    ksis := kinesis.New("", "", kinesis.Region{})
+    var cfg Config
+    err := gcfg.ReadFileInto(&cfg, "config.cfg")
+
+    if err != nil {
+        fmt.Printf("Unable to load config.cfg: %v. Aborting.\n", err)
+        return
+    }
 
-    connector.LoadConfig(&cfg, "redshift_basic_pipeline.properties")
-    connector.CreateAndWaitForStreamToBecomeAvailable(ksis, cfg.KinesisStream, cfg.KinesisStreamShardCount)
+    // Set up kinesis client
+    accessKey := os.Getenv("AWS_ACCESS_KEY")
+    secretKey := os.Getenv("AWS_SECRET_KEY")
+    ksis := kinesis.New(accessKey, secretKey, kinesis.Region{})
 
-    args := kinesis.NewArgs()
-    args.Add("StreamName", cfg.KinesisStream)
-    streamInfo, err := ksis.DescribeStream(args)
+    // Create and wait for streams
+    connector.CreateStream(ksis, cfg.Kinesis.InputStream, cfg.Kinesis.InputShardCount)
+    connector.CreateStream(ksis, cfg.Kinesis.OutputStream, cfg.Kinesis.OutputShardCount)
 
-    if err != nil {
-        fmt.Printf("Unable to connect to %v stream. Aborting.", cfg.KinesisStream)
-        return
-    }
+    // Process input event stream
+    args := kinesis.NewArgs()
+    args.Add("StreamName", cfg.Kinesis.InputStream)
+    streamInfo, err := ksis.DescribeStream(args)
 
-    for _, shard := range streamInfo.StreamDescription.Shards {
-        var p = NewPipeline(cfg)
-        go p.ProcessShard(ksis, shard.ShardId)
-    }
+    if err != nil {
+        fmt.Printf("Unable to connect to %s stream. Aborting.\n", cfg.Kinesis.InputStream)
+        return
+    }
+
+    for _, shard := range streamInfo.StreamDescription.Shards {
+        fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.InputStream)
+        f := connector.AllPassFilter{}
+        b := connector.RecordBuffer{NumRecordsToBuffer: cfg.Kinesis.InputBufferSize}
+        t := connector.StringToStringTransformer{}
+        c := connector.RedisCheckpoint{AppName: cfg.Pipeline.Name, StreamName: cfg.Kinesis.InputStream}
+        e := connector.S3ManifestEmitter{
+            OutputStream: cfg.Kinesis.OutputStream,
+            S3Bucket:     cfg.S3.BucketName,
+            Ksis:         ksis,
+        }
+        p := &connector.Pipeline{
+            Buffer:      &b,
+            Checkpoint:  &c,
+            Emitter:     &e,
+            Filter:      &f,
+            StreamName:  cfg.Kinesis.InputStream,
+            Transformer: &t,
+        }
+        go p.ProcessShard(ksis, shard.ShardId)
+    }
+
+    // Process manifest stream
+    args = kinesis.NewArgs()
+    args.Add("StreamName", cfg.Kinesis.OutputStream)
+    streamInfo, err = ksis.DescribeStream(args)
+
+    if err != nil {
+        fmt.Printf("Unable to connect to %s stream. Aborting.\n", cfg.Kinesis.OutputStream)
+        return
+    }
+
+    for _, shard := range streamInfo.StreamDescription.Shards {
+        fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.OutputStream)
+        f := connector.AllPassFilter{}
+        b := connector.RecordBuffer{NumRecordsToBuffer: cfg.Kinesis.OutputBufferSize}
+        t := connector.StringToStringTransformer{}
+        c := connector.RedisCheckpoint{AppName: cfg.Pipeline.Name, StreamName: cfg.Kinesis.OutputStream}
+        e := connector.RedshiftManifestEmitter{
+            CopyMandatory: cfg.Redshift.CopyMandatory,
+            DataTable:     cfg.Redshift.DataTable,
+            FileTable:     cfg.Redshift.FileTable,
+            Format:        cfg.Redshift.Format,
+            S3Bucket:      cfg.S3.BucketName,
+        }
+        p := &connector.Pipeline{
+            Buffer:      &b,
+            Checkpoint:  &c,
+            Emitter:     &e,
+            Filter:      &f,
+            StreamName:  cfg.Kinesis.OutputStream,
+            Transformer: &t,
+        }
+        go p.ProcessShard(ksis, shard.ShardId)
+    }
+
+    // Keep the main goroutine alive while the shard processors run
+    <-make(chan int)
+}
 
-    select {}
-}
 ```
 
 [1]: https://github.com/awslabs/amazon-kinesis-connectors
 [2]: http://godoc.org/github.com/harlow/kinesis-connectors
+[3]: https://code.google.com/p/gcfg/
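+
+_Note:_ The manifest file built by the second stage isn't shown in this example; assuming it follows Amazon Redshift's documented manifest format, it would look roughly like this (the bucket and file paths are placeholders, and `mandatory` reflects the `CopyMandatory` setting above):
+
+```
+{
+  "entries": [
+    { "url": "s3://bucketName/file_one", "mandatory": true },
+    { "url": "s3://bucketName/file_two", "mandatory": true }
+  ]
+}
+```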