# Golang Kinesis Connectors
#### Kinesis connector applications written in Go
Inspired by the [Amazon Kinesis Connector Library][1]. These components are used for extracting streaming event data into S3, Redshift, DynamoDB, and more. See the [API Docs][2] for package documentation.
[1]: https://github.com/awslabs/amazon-kinesis-connectors
[2]: http://godoc.org/github.com/harlow/kinesis-connectors
## Overview
Each Amazon Kinesis connector application is a pipeline that determines how records from an Amazon Kinesis stream will be handled. Records are retrieved from the stream, transformed according to a user-defined data model, buffered for batch processing, and then emitted to the appropriate AWS service.

A connector pipeline uses the following interfaces:
* __Pipeline:__ The pipeline implementation itself.
* __Transformer:__ Defines the transformation of records from the Amazon Kinesis stream in order to suit the user-defined data model. Includes methods for custom serializer/deserializers.
* __Filter:__ Defines a method for excluding irrelevant records from the processing.
* __Buffer:__ Defines a system for batching the set of records to be processed. The application can specify three thresholds: number of records, total byte count, and time. When one of these thresholds is crossed, the buffer is flushed and the data is emitted to the destination.
* __Emitter:__ Defines a method that makes client calls to other AWS services and persists the records stored in the buffer. The records can also be sent to another Amazon Kinesis stream.

## Usage
### Installation
Get the package source:
$ go get github.com/harlow/kinesis-connectors
### Logging
Default logging is handled by the standard library `log` package. An application can override the default logger with `connector.SetLogger`:
```go
connector.SetLogger(NewCustomLogger())
```
The custom logger must implement the [Logger interface][log_interface].

[log_interface]: https://github.com/harlow/kinesis-connectors/blob/master/logger.go
### Example Pipeline
The S3 Connector Pipeline performs the following steps:
1. Pull records from Kinesis and buffer them until the desired threshold is met.
2. Upload the batch of records to an S3 bucket.
3. Set the current Shard checkpoint in Redis.
The config vars are loaded with [gcfg].
[gcfg]: https://code.google.com/p/gcfg/
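
For reference, a `pipeline.cfg` matching the `Config` struct in the example might look like this (all values are placeholders; gcfg matches section and variable names to struct fields case-insensitively):

```ini
; pipeline.cfg -- example values only
[pipeline]
name = s3Pipeline

[kinesis]
buffersize = 100
shardcount = 2
streamname = eventStream

[s3]
bucketname = my-event-archive
```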
```go
package main

import (
	"fmt"
	"os"

	"code.google.com/p/gcfg"
	"github.com/harlow/kinesis-connectors"
	"github.com/sendgridlabs/go-kinesis"
)

type Config struct {
	Pipeline struct {
		Name string
	}
	Kinesis struct {
		BufferSize int
		ShardCount int
		StreamName string
	}
	S3 struct {
		BucketName string
	}
}

func newS3Pipeline(cfg Config) *connector.Pipeline {
	f := &connector.AllPassFilter{}
	b := &connector.RecordBuffer{
		NumRecordsToBuffer: cfg.Kinesis.BufferSize,
	}
	t := &connector.StringToStringTransformer{}
	c := &connector.RedisCheckpoint{
		AppName:    cfg.Pipeline.Name,
		StreamName: cfg.Kinesis.StreamName,
	}
	e := &connector.S3Emitter{
		S3Bucket: cfg.S3.BucketName,
	}
	return &connector.Pipeline{
		Buffer:      b,
		Checkpoint:  c,
		Emitter:     e,
		Filter:      f,
		StreamName:  cfg.Kinesis.StreamName,
		Transformer: t,
	}
}

func main() {
	// Load config vars
	var cfg Config
	err := gcfg.ReadFileInto(&cfg, "pipeline.cfg")
	if err != nil {
		fmt.Printf("Unable to read pipeline.cfg. Aborting.\n")
		return
	}

	// Set up kinesis client and stream
	accessKey := os.Getenv("AWS_ACCESS_KEY")
	secretKey := os.Getenv("AWS_SECRET_KEY")
	ksis := kinesis.New(accessKey, secretKey, kinesis.Region{})
	connector.CreateStream(ksis, cfg.Kinesis.StreamName, cfg.Kinesis.ShardCount)

	// Fetch stream info
	args := kinesis.NewArgs()
	args.Add("StreamName", cfg.Kinesis.StreamName)
	streamInfo, err := ksis.DescribeStream(args)
	if err != nil {
		fmt.Printf("Unable to connect to %s stream. Aborting.\n", cfg.Kinesis.StreamName)
		return
	}

	// Process kinesis shards
	for _, shard := range streamInfo.StreamDescription.Shards {
		fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName)
		p := newS3Pipeline(cfg)
		go p.ProcessShard(ksis, shard.ShardId)
	}

	// Keep alive
	<-make(chan int)
}
```
## Contributing
Please see [CONTRIBUTING.md].
Thank you, [contributors]!
[LICENSE]: /MIT-LICENSE
[CONTRIBUTING.md]: /CONTRIBUTING.md
## License
Copyright (c) 2015 Harlow Ward. It is free software, and may
be redistributed under the terms specified in the [LICENSE] file.
[contributors]: https://github.com/harlow/kinesis-connectors/graphs/contributors