diff --git a/README.md b/README.md
index 0aa60fe..c461443 100644
--- a/README.md
+++ b/README.md
@@ -27,12 +27,28 @@ Install the library:
 
 $ go get github.com/harlow/kinesis-connectors
 
-### Example Redshift Manifest Pipeline
+### Example S3 Pipeline
 
-The Redshift Manifest Pipeline works in several steps:
+The S3 Pipeline performs the following steps:
 
-1. Pull records from Kinesis and buffer them untill the desired threshold is reached. The S3 Manifest Emitter will then upload the buffered records to an S3 bucket, set a checkpoint in Redis, and put the file path onto the manifest stream.
-2. Pull S3 path records from Kinesis and batch into a Manifest file. Upload the manifest to S3 and issue the COPY command to Redshift.
+1. Pull records from Kinesis and buffer them until the desired threshold is reached.
+2. Upload the records to an S3 bucket.
+3. Set the current shard checkpoint in Redis.
 
-The config vars are loaded done with [gcfg][3].
+The config vars are loaded with [gcfg][3].
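+
+For example, a minimal `pipeline.cfg` might look like this (the values are
+placeholder assumptions; the section and variable names must match the
+`Config` struct below):
+
+    [pipeline]
+    name = kinesisExample
+
+    [kinesis]
+    bufferSize = 1000
+    shardCount = 2
+    streamName = exampleStream
+
+    [s3]
+    bucketName = example-bucket
 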
Aborting.", cfg.Kinesis.StreamName) return } + // Process kinesis shards for _, shard := range streamInfo.StreamDescription.Shards { - fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.InputStream) - f := connector.AllPassFilter{} - b := connector.RecordBuffer{NumRecordsToBuffer: cfg.Kinesis.InputBufferSize} - t := connector.StringToStringTransformer{} - c := connector.RedisCheckpoint{AppName: cfg.Pipeline.Name, StreamName: cfg.Kinesis.InputStream} - e := connector.S3ManifestEmitter{ - OutputStream: cfg.Kinesis.OutputStream, - S3Bucket: cfg.S3.BucketName, - Ksis: ksis, - } - p := &connector.Pipeline{ - Buffer: &b, - Checkpoint: &c, - Emitter: &e, - Filter: &f, - Logger: log.New(os.Stdout, "KINESIS: ", log.Ldate|log.Ltime|log.Lshortfile), - StreamName: cfg.Kinesis.InputStream, - Transformer: &t, - } - go p.ProcessShard(ksis, shard.ShardId) - } - - // Process manifest stream - args = kinesis.NewArgs() - args.Add("StreamName", cfg.Kinesis.OutputStream) - streamInfo, err = ksis.DescribeStream(args) - - if err != nil { - fmt.Printf("Unable to connect to %s stream. Aborting.", cfg.Kinesis.OutputStream) - return - } - - for _, shard := range streamInfo.StreamDescription.Shards { - fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.OutputStream) - f := connector.AllPassFilter{} - b := connector.RecordBuffer{NumRecordsToBuffer: cfg.Kinesis.OutputBufferSize} - t := connector.StringToStringTransformer{} - c := connector.RedisCheckpoint{AppName: cfg.Pipeline.Name, StreamName: cfg.Kinesis.OutputStream} - e := connector.RedshiftManifestEmitter{ - CopyMandatory: cfg.Redshift.CopyMandatory, - DataTable: cfg.Redshift.DataTable, - FileTable: cfg.Redshift.FileTable, - Format: cfg.Redshift.Format, - S3Bucket: cfg.S3.BucketName, - } - p := &connector.Pipeline{ - Buffer: &b, - Checkpoint: &c, - Emitter: &e, - Filter: &f, - Logger: log.New(os.Stdout, "KINESIS: ", log.Ldate|log.Ltime|log.Lshortfile), - StreamName: cfg.Kinesis.OutputStream, - Transformer: &t, - } + fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName) + p := newS3Pipeline(cfg) go p.ProcessShard(ksis, shard.ShardId) }