From 3d9e6e24857dd1fdcc5e08f6bdbec6054e18acce Mon Sep 17 00:00:00 2001
From: Harlow Ward
Date: Mon, 25 May 2015 23:18:14 -0700
Subject: [PATCH] Add Redshift Pipeline example

---
 README.md                               |  9 ++-
 examples/redshift-pipeline/main.go      | 88 +++++++++++++++++++++++++
 examples/redshift-pipeline/pipeline.cfg | 12 ++++
 3 files changed, 104 insertions(+), 5 deletions(-)
 create mode 100644 examples/redshift-pipeline/main.go
 create mode 100644 examples/redshift-pipeline/pipeline.cfg

diff --git a/README.md b/README.md
index add724b..f1308e7 100644
--- a/README.md
+++ b/README.md
@@ -30,15 +30,14 @@ Get the package source:
 
 ### Example Pipelines
 
-Examples pipelines are proviede in [examples directory][example].
+Example pipelines:
 
-[example]: https://github.com/harlow/kinesis-connectors/tree/master/examples
+* [S3 Pipeline](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3-pipeline)
+* [Redshift Basic Pipeline](https://github.com/harlow/kinesis-connectors/tree/master/examples/redshift-pipeline)
 
 ### Logging
 
-Default logging is handled by [go-kit package log](https://github.com/go-kit/kit/tree/master/log).
-
-Applications can override the default loging behaviour by implementing the [Logger interface][log_interface].
+Default logging is handled by [go-kit package log](https://github.com/go-kit/kit/tree/master/log). Applications can override the default logging behaviour by implementing the [Logger interface][log_interface].
```go connector.SetLogger(NewCustomLogger()) diff --git a/examples/redshift-pipeline/main.go b/examples/redshift-pipeline/main.go new file mode 100644 index 0000000..94dd4a3 --- /dev/null +++ b/examples/redshift-pipeline/main.go @@ -0,0 +1,88 @@ +package main + +import ( + "fmt" + + "github.com/harlow/kinesis-connectors" + "github.com/joho/godotenv" + "github.com/sendgridlabs/go-kinesis" +) + +type Config struct { + Pipeline struct { + Name string + } + Kinesis struct { + BufferSize int + ShardCount int + StreamName string + } + S3 struct { + BucketName string + } + Redshift struct { + Delimiter string + Format string + Jsonpaths string + } +} + +func NewPipeline(cfg Config) *connector.Pipeline { + f := &connector.AllPassFilter{} + t := &connector.StringToStringTransformer{} + c := &connector.RedisCheckpoint{ + AppName: cfg.AppName, + StreamName: cfg.KinesisStream, + } + e := &connector.RedshiftEmitter{ + TableName: cfg.TableName, + S3Bucket: cfg.S3Bucket, + Format: cfg.Format, + } + return &connector.Pipeline{ + Buffer: b, + Checkpoint: c, + Emitter: e, + Filter: f, + NumRecordsToBuffer: cfg.NumRecordsToBuffer, + StreamName: cfg.KinesisStream, + Transformer: t, + } +} + +func main() { + var cfg Config + var err error + + // Load config vars + err = gcfg.ReadFileInto(&cfg, "pipeline.cfg") + if err != nil { + fmt.Printf("Config ERROR: %v\n", err) + } + + // Initialize Kinesis client + auth := kinesis.NewAuth() + ksis := kinesis.New(&auth, kinesis.Region{}) + + // Create stream + connector.CreateStream(ksis, cfg.Kinesis.StreamName, cfg.Kinesis.ShardCount) + + // Fetch stream info + args := kinesis.NewArgs() + args.Add("StreamName", cfg.Kinesis.StreamName) + streamInfo, err := ksis.DescribeStream(args) + if err != nil { + fmt.Printf("Unable to connect to %s stream. 
Aborting.", cfg.Kinesis.StreamName) + return + } + + // Process kinesis shards + for _, shard := range streamInfo.StreamDescription.Shards { + fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName) + p := newS3Pipeline(cfg) + go p.ProcessShard(ksis, shard.ShardId) + } + + // Keep alive + <-make(chan int) +} diff --git a/examples/redshift-pipeline/pipeline.cfg b/examples/redshift-pipeline/pipeline.cfg new file mode 100644 index 0000000..3634a09 --- /dev/null +++ b/examples/redshift-pipeline/pipeline.cfg @@ -0,0 +1,12 @@ +[pipeline] + name = s3Pipeline +[s3] + bucketName = kinesis.test +[kinesis] + bufferSize = 100 + shardCount = 2 + streamName = userStream +[redshift] + tableName = kinesis_pipeline_test + format = json + copyMandatory = true