From a9a625aa79d92ebb91e816980131b6a048b82ecc Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Sat, 15 Nov 2014 17:32:41 -0800 Subject: [PATCH] Update example usage in README * Showcase the new StringToStringTransformer as a simple way of getting data directly from Kinesis to Redshift. --- README.md | 216 ++++++++++++++++++------------------------------------ 1 file changed, 71 insertions(+), 145 deletions(-) diff --git a/README.md b/README.md index d216565..b45de33 100644 --- a/README.md +++ b/README.md @@ -33,178 +33,104 @@ The library has been broken into several components (buffers, checkpoints, filte The Redshift Pipeline will pull records from Kinesis and buffer them untill the desired threshold is reached. The Emitter will then upload the buffered records to an S3 bucket, set a checkpoint in Redis, and copy data to to Redshift. +Pipeline properties: + +``` +# Connector Settings +appName = kinesisToRedshiftBasic +numRecordsToBuffer = 25 + +# S3 Settings +s3Bucket = bucketName + +# Kinesis Settings +kinesisStream = streamName +kinesisStreamShardCount = 2 + +# Redshift Settings +tableName = redshift_table_name +format = json +``` + +_Note:_ This example pipeline batch copies the data from Kinesis directly to the S3 bucket and uses the JSON COPY statement to load into Redshift. + ```go package main import ( - "fmt" + "fmt" - "github.com/harlow/kinesis-connectors" - "github.com/harlow/sample-connectors/transformers" - "github.com/joho/godotenv" - "github.com/sendgridlabs/go-kinesis" + "github.com/harlow/kinesis-connectors" + "github.com/joho/godotenv" + "github.com/sendgridlabs/go-kinesis" ) type Config struct { - AppName string - NumRecordsToBuffer int - KinesisStream string - KinesisStreamShardCount int - TableName string - S3Bucket string - Format string - Delimiter string + AppName string + Format string + KinesisStream string + KinesisStreamShardCount int + NumRecordsToBuffer int + S3Bucket string + TableName string } func NewPipeline(cfg Config) *connector.Pipeline { - b := connector.RecordBuffer{ - NumRecordsToBuffer: cfg.NumRecordsToBuffer, - } + f := connector.AllPassFilter{} - c := connector.RedisCheckpoint{ - AppName: cfg.AppName, - StreamName: cfg.KinesisStream, - } + b := connector.RecordBuffer{ + NumRecordsToBuffer: cfg.NumRecordsToBuffer, + } - e := connector.RedshiftEmitter{ - TableName: cfg.TableName, - S3Bucket: cfg.S3Bucket, - Format: cfg.Format, - Delimiter: cfg.Delimiter, - } + t := connector.StringToStringTransformer{} - f := connector.AllPassFilter{} + c := connector.RedisCheckpoint{ + AppName: cfg.AppName, + StreamName: cfg.KinesisStream, + } - t := transformers.UserTransformer{} + e := connector.RedshiftEmitter{ + TableName: cfg.TableName, + S3Bucket: cfg.S3Bucket, + Format: cfg.Format, + } - return &connector.Pipeline{ - Buffer: &b, - Checkpoint: &c, - Emitter: &e, - Filter: &f, - StreamName: cfg.KinesisStream, - Transformer: &t, - } + return &connector.Pipeline{ + Buffer: &b, + Checkpoint: &c, + Emitter: &e, + Filter: &f, + StreamName: cfg.KinesisStream, + Transformer: &t, + } } func main() { - var cfg Config - godotenv.Load() - ksis := kinesis.New("", "", kinesis.Region{}) + var cfg Config + godotenv.Load() + ksis := kinesis.New("", "", kinesis.Region{}) - connector.LoadConfig(&cfg, "redshift_basic_pipeline.properties") - connector.CreateAndWaitForStreamToBecomeAvailable(ksis, cfg.KinesisStream, cfg.KinesisStreamShardCount) + connector.LoadConfig(&cfg, "redshift_basic_pipeline.properties") + connector.CreateAndWaitForStreamToBecomeAvailable(ksis, cfg.KinesisStream, cfg.KinesisStreamShardCount) - args := kinesis.NewArgs() - args.Add("StreamName", cfg.KinesisStream) - streamInfo, err := ksis.DescribeStream(args) + args := kinesis.NewArgs() + args.Add("StreamName", cfg.KinesisStream) + streamInfo, err := ksis.DescribeStream(args) - if err != nil { - fmt.Printf("Unable to connect to %v stream. Aborting.", cfg.KinesisStream) - return - } - - for _, shard := range streamInfo.StreamDescription.Shards { - var p = NewPipeline(cfg) - go p.ProcessShard(ksis, shard.ShardId) - } - - select {} -} -``` - -```go -package models - -// Implements Model interface -type User struct { - ID int `json:"userid"` - Username string `json:"username"` - Firstname string `json:"firstname"` - Lastname string `json:"lastname"` - City string `json:"city"` - State string `json:"state"` - Email string `json:"email"` - Phone string `json:"phone"` - Likesports bool `json:"likesports"` - Liketheatre bool `json:"liketheatre"` - Likeconcerts bool `json:"likeconcerts"` - Likejazz bool `json:"likejazz"` - Likeclassical bool `json:"likeclassical"` - Likeopera bool `json:"likeopera"` - Likerock bool `json:"likerock"` - Likevegas bool `json:"likevegas"` - Likebroadway bool `json:"likebroadway"` - Likemusicals bool `json:"likemusicals"` -} - -func (u User) ToString() string { - s := []string{ - strconv.Itoa(u.ID), - u.Username, - u.Firstname, - u.Lastname, - u.City, - u.State, - u.Email, - u.Phone, - strconv.FormatBool(u.Likesports), - strconv.FormatBool(u.Liketheatre), - strconv.FormatBool(u.Likeconcerts), - strconv.FormatBool(u.Likejazz), - strconv.FormatBool(u.Likeclassical), - strconv.FormatBool(u.Likeopera), - strconv.FormatBool(u.Likerock), - strconv.FormatBool(u.Likevegas), - strconv.FormatBool(u.Likebroadway), - strconv.FormatBool(u.Likemusicals), - "\n", + if err != nil { + fmt.Printf("Unable to connect to %v stream. Aborting.", cfg.KinesisStream) + return } - return strings.Join(s, "|") + for _, shard := range streamInfo.StreamDescription.Shards { + var p = NewPipeline(cfg) + go p.ProcessShard(ksis, shard.ShardId) + } + + select {} } ``` -```go -package transformers - -// Implements Transformer interface -type UserTransformer struct {} - -func (t *UserTransformer) ToModel(data []byte) connector.Model { - user := &models.User{} - json.Unmarshal(data, &user) - return user -} -``` - -```sql -CREATE TABLE users ( - id INTEGER, - username VARCHAR(255), - first_name VARCHAR(255), - last_name VARCHAR(255), - city VARCHAR(255), - state VARCHAR(255), - email VARCHAR(255), - phone VARCHAR(255), - like_sports BOOLEAN, - like_theatre BOOLEAN, - like_concerts BOOLEAN, - like_jazz BOOLEAN, - like_classical BOOLEAN, - like_opera BOOLEAN, - like_rock BOOLEAN, - like_vegas BOOLEAN, - like_broadway BOOLEAN, - like_musicals BOOLEAN, - PRIMARY KEY(id) -) -DISTSTYLE KEY -DISTKEY(id) -SORTKEY(id) -``` - [1]: http://godoc.org/github.com/harlow/kinesis-connectors [2]: http://aws.amazon.com/kinesis/ [3]: https://github.com/awslabs/amazon-kinesis-connectors