Add Redshift Pipeline example

This commit is contained in:
Harlow Ward 2015-05-25 23:18:14 -07:00
parent e17988cfc9
commit 3d9e6e2485
3 changed files with 104 additions and 5 deletions

@@ -30,15 +30,14 @@ Get the package source:
 ### Example Pipelines
 
-Examples pipelines are proviede in [examples directory][example].
+Example pipelines:
 
-[example]: https://github.com/harlow/kinesis-connectors/tree/master/examples
+* [S3 Pipeline](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3-pipeline)
+* [Redshift Basic Pipeline](https://github.com/harlow/kinesis-connectors/tree/master/examples/redshift-pipeline)
 
 ### Logging
 
-Default logging is handled by [go-kit package log](https://github.com/go-kit/kit/tree/master/log).
-Applications can override the default loging behaviour by implementing the [Logger interface][log_interface].
+Default logging is handled by [go-kit package log](https://github.com/go-kit/kit/tree/master/log). Applications can override the default logging behaviour by implementing the [Logger interface][log_interface].
 
 ```go
 connector.SetLogger(NewCustomLogger())
 ```
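
A minimal sketch of such an override, assuming the connector's Logger interface mirrors go-kit's single-method `Log(keyvals ...interface{}) error`; the `stdlibLogger` type below is hypothetical, not part of this commit:

```go
package main

import (
	"log"

	"github.com/harlow/kinesis-connectors"
)

// stdlibLogger is a hypothetical Logger implementation that forwards
// key/value pairs to the standard library logger. Its method set assumes
// the connector's Logger interface matches go-kit's Log signature.
type stdlibLogger struct{}

func (stdlibLogger) Log(keyvals ...interface{}) error {
	log.Println(keyvals...)
	return nil
}

func main() {
	// Install the custom logger before starting any pipelines.
	connector.SetLogger(stdlibLogger{})
}
```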

@@ -0,0 +1,88 @@
package main

import (
	"fmt"

	"code.google.com/p/gcfg" // assumed import path; provides the gcfg.ReadFileInto call below
	"github.com/harlow/kinesis-connectors"
	"github.com/sendgridlabs/go-kinesis"
)

// Config mirrors the sections and keys in pipeline.cfg.
type Config struct {
	Pipeline struct {
		Name string
	}
	Kinesis struct {
		BufferSize int
		ShardCount int
		StreamName string
	}
	S3 struct {
		BucketName string
	}
	Redshift struct {
		CopyMandatory bool
		Delimiter     string
		Format        string
		Jsonpaths     string
		TableName     string
	}
}

// NewPipeline wires the filter, transformer, buffer, checkpoint, and
// emitter together into a Redshift pipeline.
func NewPipeline(cfg Config) *connector.Pipeline {
	f := &connector.AllPassFilter{}
	t := &connector.StringToStringTransformer{}

	// Buffer implementation assumed: a RecordBuffer sized by the
	// kinesis bufferSize value from pipeline.cfg.
	b := &connector.RecordBuffer{
		NumRecordsToBuffer: cfg.Kinesis.BufferSize,
	}

	c := &connector.RedisCheckpoint{
		AppName:    cfg.Pipeline.Name,
		StreamName: cfg.Kinesis.StreamName,
	}

	e := &connector.RedshiftEmitter{
		TableName: cfg.Redshift.TableName,
		S3Bucket:  cfg.S3.BucketName,
		Format:    cfg.Redshift.Format,
	}

	return &connector.Pipeline{
		Buffer:      b,
		Checkpoint:  c,
		Emitter:     e,
		Filter:      f,
		StreamName:  cfg.Kinesis.StreamName,
		Transformer: t,
	}
}

func main() {
	var cfg Config

	// Load config vars; abort if the file is missing or malformed
	if err := gcfg.ReadFileInto(&cfg, "pipeline.cfg"); err != nil {
		fmt.Printf("Config ERROR: %v\n", err)
		return
	}

	// Initialize Kinesis client
	auth := kinesis.NewAuth()
	ksis := kinesis.New(&auth, kinesis.Region{})

	// Create stream
	connector.CreateStream(ksis, cfg.Kinesis.StreamName, cfg.Kinesis.ShardCount)

	// Fetch stream info
	args := kinesis.NewArgs()
	args.Add("StreamName", cfg.Kinesis.StreamName)
	streamInfo, err := ksis.DescribeStream(args)
	if err != nil {
		fmt.Printf("Unable to connect to %s stream. Aborting.\n", cfg.Kinesis.StreamName)
		return
	}

	// Process each kinesis shard in its own goroutine
	for _, shard := range streamInfo.StreamDescription.Shards {
		fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName)
		p := NewPipeline(cfg)
		go p.ProcessShard(ksis, shard.ShardId)
	}

	// Block forever so the shard goroutines keep running
	<-make(chan int)
}
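
To exercise the pipeline end to end, something has to put records on the stream. A minimal producer sketch using the same go-kinesis client; the stream name `userStream` and the JSON payload shape are assumptions taken from pipeline.cfg below:

```go
package main

import (
	"fmt"

	"github.com/sendgridlabs/go-kinesis"
)

func main() {
	auth := kinesis.NewAuth()
	ksis := kinesis.New(&auth, kinesis.Region{})

	// Put a few JSON records on the stream; "userStream" matches the
	// streamName value in pipeline.cfg.
	for i := 0; i < 10; i++ {
		args := kinesis.NewArgs()
		args.Add("StreamName", "userStream")
		args.Add("PartitionKey", fmt.Sprintf("partitionKey-%d", i))
		args.AddData([]byte(fmt.Sprintf(`{"id": %d}`, i)))

		if _, err := ksis.PutRecord(args); err != nil {
			fmt.Printf("PutRecord ERROR: %v\n", err)
		}
	}
}
```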


@@ -0,0 +1,12 @@
[pipeline]
name = redshiftPipeline
[s3]
bucketName = kinesis.test
[kinesis]
bufferSize = 100
shardCount = 2
streamName = userStream
[redshift]
tableName = kinesis_pipeline_test
format = json
copyMandatory = true