From 25e390e8b48891a9b86c7a63d7185e024996e39a Mon Sep 17 00:00:00 2001
From: Harlow Ward
Date: Sat, 23 May 2015 15:22:58 -0700
Subject: [PATCH] Add sample pipeline

* Reduce the size of main README in favor of adding functioning examples
* Add S3 Pipeline example
* Add example of seeding the stream
---
 .gitignore                        |  3 +
 README.md                         | 97 +------------------------------
 examples/s3-pipeline/README.md    | 38 ++++++++++++
 examples/s3-pipeline/main.go      | 84 +++++++++++++++++++++++++++
 examples/s3-pipeline/pipeline.cfg |  8 +++
 examples/seed-stream/README.md    | 19 ++++++
 examples/seed-stream/main.go      | 57 ++++++++++++++++++
 7 files changed, 210 insertions(+), 96 deletions(-)
 create mode 100644 examples/s3-pipeline/README.md
 create mode 100644 examples/s3-pipeline/main.go
 create mode 100644 examples/s3-pipeline/pipeline.cfg
 create mode 100644 examples/seed-stream/README.md
 create mode 100644 examples/seed-stream/main.go

diff --git a/.gitignore b/.gitignore
index 9e33c63..9e98c21 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,9 @@
 *.a
 *.so
 
+# Environment vars
+.env
+
 # Folders
 _obj
 _test
diff --git a/README.md b/README.md
index 08ed573..2f05325 100644
--- a/README.md
+++ b/README.md
@@ -22,8 +22,6 @@ A connector pipeline uses the following interfaces:
 * __Buffer:__ Defines a system for batching the set of records to be processed. The application can specify three thresholds: number of records, total byte count, and time. When one of these thresholds is crossed, the buffer is flushed and the data is emitted to the destination.
 * __Emitter:__ Defines a method that makes client calls to other AWS services and persists the records stored in the buffer. The records can also be sent to another Amazon Kinesis stream.
 
-## Usage
-
 ### Installation
 
 Get the package source:
@@ -43,103 +41,10 @@ The customer logger must implement the [Logger interface][log_interface].
 
 [log_interface]: https://github.com/harlow/kinesis-connectors/blob/master/logger.go
 
-### Example Pipeline
-
-The S3 Connector Pipeline performs the following steps:
-
-1. Pull records from Kinesis and buffer them untill the desired threshold is met.
-2. Upload the batch of records to an S3 bucket.
-3. Set the current Shard checkpoint in Redis.
-
-The config vars are loaded done with [gcfg].
-
-[gcfg]: https://code.google.com/p/gcfg/
-
-```go
-package main
-
-import (
-	"fmt"
-	"os"
-
-	"code.google.com/p/gcfg"
-	"github.com/harlow/kinesis-connectors"
-	"github.com/sendgridlabs/go-kinesis"
-)
-
-type Config struct {
-	Pipeline struct {
-		Name string
-	}
-	Kinesis struct {
-		BufferSize int
-		ShardCount int
-		StreamName string
-	}
-	S3 struct {
-		BucketName string
-	}
-}
-
-func newS3Pipeline(cfg Config) *connector.Pipeline {
-	f := &connector.AllPassFilter{}
-	b := &connector.RecordBuffer{
-		NumRecordsToBuffer: cfg.Kinesis.BufferSize,
-	}
-	t := &connector.StringToStringTransformer{}
-	c := &connector.RedisCheckpoint{
-		AppName:    cfg.Pipeline.Name,
-		StreamName: cfg.Kinesis.StreamName,
-	}
-	e := &connector.S3Emitter{
-		S3Bucket: cfg.S3.BucketName,
-	}
-	return &connector.Pipeline{
-		Buffer:      b,
-		Checkpoint:  c,
-		Emitter:     e,
-		Filter:      f,
-		StreamName:  cfg.Kinesis.StreamName,
-		Transformer: t,
-	}
-}
-
-func main() {
-	// Load config vars
-	var cfg Config
-	err := gcfg.ReadFileInto(&cfg, "pipeline.cfg")
-
-	// Set up kinesis client and stream
-	accessKey := os.Getenv("AWS_ACCESS_KEY")
-	secretKey := os.Getenv("AWS_SECRET_KEY")
-	ksis := kinesis.New(accessKey, secretKey, kinesis.Region{})
-	connector.CreateStream(ksis, cfg.Kinesis.StreamName, cfg.Kinesis.ShardCount)
-
-	// Fetch stream info
-	args := kinesis.NewArgs()
-	args.Add("StreamName", cfg.Kinesis.StreamName)
-	streamInfo, err := ksis.DescribeStream(args)
-	if err != nil {
-		fmt.Printf("Unable to connect to %s stream. Aborting.", cfg.Kinesis.StreamName)
-		return
-	}
-
-	// Process kinesis shards
-	for _, shard := range streamInfo.StreamDescription.Shards {
-		fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName)
-		p := newS3Pipeline(cfg)
-		go p.ProcessShard(ksis, shard.ShardId)
-	}
-
-	// Keep alive
-	<-make(chan int)
-}
-```
 
 ## Contributing
 
-Please see [CONTRIBUTING.md].
-Thank you, [contributors]!
+Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]!
 
 [LICENSE]: /MIT-LICENSE
 [CONTRIBUTING.md]: /CONTRIBUTING.md
diff --git a/examples/s3-pipeline/README.md b/examples/s3-pipeline/README.md
new file mode 100644
index 0000000..eac7cea
--- /dev/null
+++ b/examples/s3-pipeline/README.md
@@ -0,0 +1,38 @@
+# S3 Pipeline
+
+The S3 Connector Pipeline performs the following steps:
+
+1. Pull records from Kinesis and buffer them until the desired threshold is met.
+2. Upload the batch of records to an S3 bucket.
+3. Set the current Shard checkpoint in Redis.
+
+The pipeline config vars are loaded with [gcfg].
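+
+For reference, the `pipeline.cfg` file in this directory supplies these values:
+
+```
+[pipeline]
+name = s3Pipeline
+[s3]
+bucketName = kinesis.test
+[kinesis]
+bufferSize = 100
+shardCount = 2
+streamName = userStream
+```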
+
+[gcfg]: https://code.google.com/p/gcfg/
+
+### Environment Variables
+
+Export the required environment vars for connecting to the Kinesis stream:
+
+```
+export AWS_ACCESS_KEY=
+export AWS_REGION_NAME=
+export AWS_SECRET_KEY=
+```
+
+### Running the code
+
+    $ go run main.go
diff --git a/examples/s3-pipeline/main.go b/examples/s3-pipeline/main.go
new file mode 100644
index 0000000..a6dbaa3
--- /dev/null
+++ b/examples/s3-pipeline/main.go
@@ -0,0 +1,84 @@
+package main
+
+import (
+	"fmt"
+
+	"code.google.com/p/gcfg"
+	"github.com/harlow/kinesis-connectors"
+	"github.com/sendgridlabs/go-kinesis"
+)
+
+type Config struct {
+	Pipeline struct {
+		Name string
+	}
+	Kinesis struct {
+		BufferSize int
+		ShardCount int
+		StreamName string
+	}
+	S3 struct {
+		BucketName string
+	}
+}
+
+func newS3Pipeline(cfg Config) *connector.Pipeline {
+	f := &connector.AllPassFilter{}
+	b := &connector.RecordBuffer{
+		NumRecordsToBuffer: cfg.Kinesis.BufferSize,
+	}
+	t := &connector.StringToStringTransformer{}
+	c := &connector.RedisCheckpoint{
+		AppName:    cfg.Pipeline.Name,
+		StreamName: cfg.Kinesis.StreamName,
+	}
+	e := &connector.S3Emitter{
+		S3Bucket: cfg.S3.BucketName,
+	}
+	return &connector.Pipeline{
+		Buffer:      b,
+		Checkpoint:  c,
+		Emitter:     e,
+		Filter:      f,
+		StreamName:  cfg.Kinesis.StreamName,
+		Transformer: t,
+	}
+}
+
+func main() {
+	var cfg Config
+	var err error
+
+	// Load config vars
+	err = gcfg.ReadFileInto(&cfg, "pipeline.cfg")
+	if err != nil {
+		fmt.Printf("Config ERROR: %v\n", err)
+		return
+	}
+
+	// Initialize Kinesis client
+	auth := kinesis.NewAuth()
+	ksis := kinesis.New(&auth, kinesis.Region{})
+
+	// Create stream
+	connector.CreateStream(ksis, cfg.Kinesis.StreamName, cfg.Kinesis.ShardCount)
+
+	// Fetch stream info
+	args := kinesis.NewArgs()
+	args.Add("StreamName", cfg.Kinesis.StreamName)
+	streamInfo, err := ksis.DescribeStream(args)
+	if err != nil {
+		fmt.Printf("Unable to connect to %s stream. Aborting.\n", cfg.Kinesis.StreamName)
+		return
+	}
+
+	// Process kinesis shards
+	for _, shard := range streamInfo.StreamDescription.Shards {
+		fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName)
+		p := newS3Pipeline(cfg)
+		go p.ProcessShard(ksis, shard.ShardId)
+	}
+
+	// Keep alive
+	<-make(chan int)
+}
diff --git a/examples/s3-pipeline/pipeline.cfg b/examples/s3-pipeline/pipeline.cfg
new file mode 100644
index 0000000..3108849
--- /dev/null
+++ b/examples/s3-pipeline/pipeline.cfg
@@ -0,0 +1,8 @@
+[pipeline]
+name = s3Pipeline
+[s3]
+bucketName = kinesis.test
+[kinesis]
+bufferSize = 100
+shardCount = 2
+streamName = userStream
diff --git a/examples/seed-stream/README.md b/examples/seed-stream/README.md
new file mode 100644
index 0000000..07bf46d
--- /dev/null
+++ b/examples/seed-stream/README.md
@@ -0,0 +1,19 @@
+# Seed the Stream
+
+A prepopulated file with JSON users is available on S3 for seeding the stream:
+
+https://s3.amazonaws.com/kinesis.test/users.txt
+
+### Environment Variables
+
+Export the required environment vars for connecting to the Kinesis stream:
+
+```
+export AWS_ACCESS_KEY=
+export AWS_REGION_NAME=
+export AWS_SECRET_KEY=
+```
+
+### Running the code
+
+    $ go run main.go
diff --git a/examples/seed-stream/main.go b/examples/seed-stream/main.go
new file mode 100644
index 0000000..aebec28
--- /dev/null
+++ b/examples/seed-stream/main.go
@@ -0,0 +1,57 @@
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/harlow/kinesis-connectors"
+	"github.com/joho/godotenv"
+	"github.com/sendgridlabs/go-kinesis"
+)
+
+func main() {
+	godotenv.Load()
+
+	// Initialize Kinesis client
+	auth := kinesis.NewAuth()
+	ksis := kinesis.New(&auth, kinesis.Region{})
+
+	// Create stream
+	connector.CreateStream(ksis, "userStream", 2)
+
+	// Read the seed file
+	file, err := os.Open("users.txt")
+	if err != nil {
+		fmt.Printf("Unable to open users.txt: %v\n", err)
+		return
+	}
+	defer file.Close()
+	scanner := bufio.NewScanner(file)
+
+	ctr := 0
+	var wg sync.WaitGroup
+
+	// Put each line on the stream as a single record
+	for scanner.Scan() {
+		l := scanner.Text()
+		ctr = ctr + 1
+		key := fmt.Sprintf("partitionKey-%d", ctr)
+
+		args := kinesis.NewArgs()
+		args.Add("StreamName", "userStream")
+		args.AddRecord([]byte(l), key)
+		wg.Add(1)
+
+		go func() {
+			ksis.PutRecords(args)
+			fmt.Print(".")
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+	fmt.Println(".")
+	fmt.Println("Finished populating userStream")
+}