Move emitters to separate packages

Harlow Ward 2016-02-08 19:39:09 -08:00
parent caa2fc0b0a
commit aae268108e
15 changed files with 53 additions and 227 deletions

View file

@@ -2,12 +2,9 @@
 __Kinesis connector applications written in Go__
-_Note: Repo is going under refactoring to use a handler func to process batch data. The previous stable version of connectors exist at SHA `509f68de89efb74aa8d79a733749208edaf56b4d`_
+> With the release of Kinesis Firehose I'd recommend using [Lambda Streams to Firehose](https://github.com/awslabs/lambda-streams-to-firehose) for loading data to S3 and Redshift.
-Inspired by the [Amazon Kinesis Connector Library][1]. This library is used for extracting streaming event data from Kinesis into S3, Redshift, DynamoDB, and more. See the [API Docs][2] for package documentation.
+Inspired by the [Amazon Kinesis Connector Library](https://github.com/awslabs/amazon-kinesis-connectors). This library is intended to be a lightweight wrapper around the Kinesis API to handle batching records, respecting rate limits, setting checkpoints, and recovering gracefully from network errors.
-[1]: https://github.com/awslabs/amazon-kinesis-connectors
-[2]: http://godoc.org/github.com/harlow/kinesis-connectors
 ![golang_kinesis_connector](https://cloud.githubusercontent.com/assets/739782/4262283/2ee2550e-3b97-11e4-8cd1-21a5d7ee0964.png)
@@ -39,12 +36,12 @@ Get the package source:
 $ go get github.com/harlow/kinesis-connectors
-### Example Pipelines
+### Examples
-Examples pipelines:
+Use the [seed stream](https://github.com/harlow/kinesis-connectors/tree/master/examples/seed) code to put sample data onto the stream.
+* [Firehose](https://github.com/harlow/kinesis-connectors/tree/master/examples/firehose)
 * [S3](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3)
-* [Redshift](https://github.com/harlow/kinesis-connectors/tree/master/examples/redshift)
 ### Logging
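As a quick orientation for the refactor, the only new import path confirmed by this diff is `github.com/harlow/kinesis-connectors/emitter/s3`, exercised by the updated `examples/s3/main.go` further down. A condensed sketch of the post-refactor wiring, with placeholder app, stream, and bucket names:

```go
package main

import (
	"bytes"
	"log"

	"github.com/harlow/kinesis-connectors"
	"github.com/harlow/kinesis-connectors/emitter/s3"
)

func main() {
	// Placeholder names; real apps would take these from flags or config.
	emitter := &s3.Emitter{Bucket: "example-bucket"}

	c := connector.NewConsumer("exampleApp", "exampleStream")
	c.Start(connector.HandlerFunc(func(b connector.Buffer) {
		// Concatenate the buffered record payloads into a single object body.
		body := new(bytes.Buffer)
		for _, r := range b.GetRecords() {
			body.Write(r.Data)
		}

		// Emit now returns an error instead of logging internally.
		key := s3.Key("", b.FirstSeq(), b.LastSeq())
		if err := emitter.Emit(key, bytes.NewReader(body.Bytes())); err != nil {
			log.Printf("emit error: %s", err)
		}
	}))

	select {} // block forever while shards are processed
}
```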

View file

@@ -29,7 +29,7 @@ type RedshiftEmitter struct {
 // then issues a copy command to Redshift data store.
 func (e RedshiftEmitter) Emit(s3Key string, b io.ReadSeeker) {
     // put contents to S3 Bucket
-    s3 := &S3Emitter{Bucket: e.S3Bucket}
+    s3 := &Emitter{Bucket: e.S3Bucket}
     s3.Emit(s3Key, b)
     for i := 0; i < 10; i++ {
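The relocated Redshift emitter keeps the `Emit(s3Key string, b io.ReadSeeker)` shape shown in the context lines above. A hedged sketch of configuring it, assuming an `emitter/redshift` import path parallel to `emitter/s3` (the path is not shown in this diff) and field names carried over from the deleted single-package version further down:

```go
package main

import (
	"bytes"
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres driver used for the Redshift connection

	// Assumed path, parallel to emitter/s3; this diff does not show it.
	"github.com/harlow/kinesis-connectors/emitter/redshift"
)

func main() {
	db, err := sql.Open("postgres", "postgres://user:pass@redshift-host:5439/db")
	if err != nil {
		log.Fatal(err)
	}

	// Field names taken from the deleted connector.RedshiftEmitter below;
	// whether all of them survive the package move is not shown here.
	e := redshift.RedshiftEmitter{
		TableName: "events",
		S3Bucket:  "example-bucket",
		Format:    "json",
		Db:        db,
	}

	// Per the doc comment above: buffer the records to S3, then issue a COPY into Redshift.
	e.Emit("2016/02/08/first-last", bytes.NewReader([]byte(`{"url":"https://example.com"}`)))
}
```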

View file

@@ -1,4 +1,4 @@
-package connector
+package redshift
 
 type Entry struct {
     Url string `json:"url"`

View file

@ -1,73 +0,0 @@
package connector
import (
"bytes"
"database/sql"
"fmt"
// Postgres package is used when sql.Open is called
_ "github.com/lib/pq"
)
// RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one.
// It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered
// data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct.
type RedshiftEmitter struct {
AwsAccessKey string
AwsSecretAccessKey string
Delimiter string
Format string
Jsonpaths string
S3Bucket string
S3Prefix string
TableName string
Db *sql.DB
}
// Emit is invoked when the buffer is full. This method leverages the S3Emitter and
// then issues a copy command to Redshift data store.
func (e RedshiftEmitter) Emit(b Buffer) {
s3Emitter := S3Emitter{Bucket: e.S3Bucket}
s3Emitter.Emit(b, t)
s3File := s3Emitter.S3FileName(b.FirstSeq(), b.LastSeq())
for i := 0; i < 10; i++ {
// execute copy statement
_, err := e.Db.Exec(e.copyStatement(s3File))
// db command succeeded, break from loop
if err == nil {
logger.Log("info", "RedshiftEmitter", "file", s3File)
break
}
// handle recoverable errors, else break from loop
if isRecoverableError(err) {
handleAwsWaitTimeExp(i)
} else {
logger.Log("error", "RedshiftEmitter", "msg", err.Error())
break
}
}
}
// Creates the SQL copy statement issued to Redshift cluster.
func (e RedshiftEmitter) copyStatement(s3File string) string {
b := new(bytes.Buffer)
b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File))
b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", e.AwsAccessKey))
b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", e.AwsSecretAccessKey))
switch e.Format {
case "json":
b.WriteString(fmt.Sprintf("json 'auto'"))
case "jsonpaths":
b.WriteString(fmt.Sprintf("json '%v'", e.Jsonpaths))
default:
b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
}
b.WriteString(";")
return b.String()
}

View file

@@ -1,4 +1,4 @@
-package connector
+package redshift
 
 import (
     "bytes"

View file

@@ -1,4 +1,4 @@
-package connector
+package redshiftemitter
 
 import "testing"

View file

@@ -1,29 +1,29 @@
-package connector
+package s3
 
 import (
     "io"
 
     "github.com/aws/aws-sdk-go/aws"
-    "github.com/aws/aws-sdk-go/aws/awserr"
     "github.com/aws/aws-sdk-go/aws/session"
-    "github.com/aws/aws-sdk-go/service/s3"
+    awss3 "github.com/aws/aws-sdk-go/service/s3"
     "gopkg.in/matryer/try.v1"
 )
 
-// S3Emitter stores data in S3 bucket.
+// Emitter stores data in S3 bucket.
 //
 // The use of this struct requires the configuration of an S3 bucket/endpoint. When the buffer is full, this
 // struct's Emit method adds the contents of the buffer to S3 as one file. The filename is generated
 // from the first and last sequence numbers of the records contained in that file separated by a
 // dash. This struct requires the configuration of an S3 bucket and endpoint.
-type S3Emitter struct {
+type Emitter struct {
     Bucket string
 }
 
 // Emit is invoked when the buffer is full. This method emits the set of filtered records.
-func (e S3Emitter) Emit(s3Key string, b io.ReadSeeker) {
+func (e Emitter) Emit(s3Key string, b io.ReadSeeker) error {
-    svc := s3.New(session.New())
+    svc := awss3.New(session.New())
 
-    params := &s3.PutObjectInput{
+    params := &awss3.PutObjectInput{
         Body:        b,
         Bucket:      aws.String(e.Bucket),
         ContentType: aws.String("text/plain"),
@@ -37,10 +37,8 @@ func (e S3Emitter) Emit(s3Key string, b io.ReadSeeker) {
     })
 
     if err != nil {
-        if awsErr, ok := err.(awserr.Error); ok {
-            logger.Log("error", "s3.PutObject", "code", awsErr.Code())
-        }
+        return err
     }
 
-    logger.Log("info", "S3Emitter", "msg", "success", "key", s3Key)
+    return nil
 }
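Since `Emit` now surfaces its error instead of logging and swallowing it, callers own the failure handling. A small sketch of using the emitter directly, with the bucket and key as placeholders:

```go
package main

import (
	"bytes"
	"log"

	"github.com/harlow/kinesis-connectors/emitter/s3"
)

func main() {
	e := &s3.Emitter{Bucket: "example-bucket"}

	// The key would normally come from s3.Key(prefix, firstSeq, lastSeq);
	// it is hard-coded here for brevity.
	err := e.Emit("2016/02/08/first-last", bytes.NewReader([]byte("line one\nline two\n")))
	if err != nil {
		log.Fatalf("s3 emit failed: %s", err)
	}
}
```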

View file

@@ -1,11 +1,11 @@
-package connector
+package s3
 
 import (
     "fmt"
     "time"
 )
 
-func S3Key(prefix, firstSeq, lastSeq string) string {
+func Key(prefix, firstSeq, lastSeq string) string {
     date := time.Now().UTC().Format("2006/01/02")
     if prefix == "" {

View file

@@ -1,4 +1,4 @@
-package connector
+package s3
 
 import (
     "fmt"
@@ -8,12 +8,12 @@ import (
     "github.com/bmizerany/assert"
 )
 
-func Test_S3Key(t *testing.T) {
+func Test_Key(t *testing.T) {
     d := time.Now().UTC().Format("2006/01/02")
 
-    k := S3Key("", "a", "b")
+    k := Key("", "a", "b")
     assert.Equal(t, k, fmt.Sprintf("%v/a-b", d))
 
-    k = S3Key("prefix", "a", "b")
+    k = Key("prefix", "a", "b")
     assert.Equal(t, k, fmt.Sprintf("prefix/%v/a-b", d))
 }
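For readers skimming the diff, the renamed `Key` helper produces date-partitioned object keys from the buffer's first and last sequence numbers. A tiny sketch of the shapes asserted in the test above, with placeholder sequence strings:

```go
package main

import (
	"fmt"

	"github.com/harlow/kinesis-connectors/emitter/s3"
)

func main() {
	// No prefix: "<YYYY/MM/DD>/firstSeq-lastSeq"
	fmt.Println(s3.Key("", "firstSeq", "lastSeq"))

	// With a prefix: "backups/<YYYY/MM/DD>/firstSeq-lastSeq"
	fmt.Println(s3.Key("backups", "firstSeq", "lastSeq"))
}
```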

View file

@@ -1,8 +1,7 @@
-package connector
+package s3
 
 import (
     "io"
-    "os"
 
     "github.com/aws/aws-sdk-go/aws"
     "github.com/aws/aws-sdk-go/aws/session"
@@ -11,15 +10,15 @@ import (
 // An implementation of Emitter that puts event data on S3 file, and then puts the
 // S3 file path onto the output stream for processing by manifest application.
-type S3ManifestEmitter struct {
+type ManifestEmitter struct {
     OutputStream string
     Bucket       string
     Prefix       string
 }
 
-func (e S3ManifestEmitter) Emit(s3Key string, b io.ReadSeeker) {
+func (e ManifestEmitter) Emit(s3Key string, b io.ReadSeeker) error {
     // put contents to S3 Bucket
-    s3 := &S3Emitter{Bucket: e.Bucket}
+    s3 := &Emitter{Bucket: e.Bucket}
     s3.Emit(s3Key, b)
 
     // put file path on Kinesis output stream
@@ -33,9 +32,8 @@ func (e S3ManifestEmitter) Emit(s3Key string, b io.ReadSeeker) {
     _, err := svc.PutRecord(params)
     if err != nil {
-        logger.Log("error", "PutRecord", "msg", err)
-        os.Exit(1)
-    } else {
-        logger.Log("info", "S3ManifestEmitter", "stream", e.OutputStream, "key", s3Key)
+        return err
     }
+
+    return nil
 }
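The manifest emitter follows the same pattern: it now returns the `PutRecord` error rather than logging and calling `os.Exit`. A hedged usage sketch, assuming it lives alongside `Emitter` in the `emitter/s3` package as the new package clause suggests, with placeholder stream, bucket, and key values:

```go
package main

import (
	"bytes"
	"log"

	"github.com/harlow/kinesis-connectors/emitter/s3"
)

func main() {
	// Writes the payload to the bucket, then puts the resulting S3 key
	// onto the output stream for a downstream manifest application.
	m := &s3.ManifestEmitter{
		OutputStream: "manifestStream",
		Bucket:       "example-bucket",
	}

	err := m.Emit("2016/02/08/first-last", bytes.NewReader([]byte("payload")))
	if err != nil {
		log.Fatalf("manifest emit failed: %s", err)
	}
}
```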

View file

@@ -1,85 +0,0 @@
-package main
-
-import (
-    "fmt"
-
-    "github.com/harlow/kinesis-connectors"
-    "github.com/joho/godotenv"
-    "github.com/sendgridlabs/go-kinesis"
-)
-
-type Config struct {
-    Pipeline struct {
-        Name string
-    }
-    Kinesis struct {
-        BufferSize int
-        ShardCount int
-        StreamName string
-    }
-    S3 struct {
-        BucketName string
-    }
-    Redshift struct {
-        Delimiter string
-        Format    string
-        Jsonpaths string
-    }
-}
-
-func NewPipeline(cfg Config) *connector.Pipeline {
-    f := &connector.AllPassFilter{}
-    t := &connector.StringToStringTransformer{}
-
-    c := &connector.RedisCheckpoint{
-        AppName:    cfg.AppName,
-        StreamName: cfg.KinesisStream,
-    }
-
-    e := &connector.RedshiftEmitter{
-        TableName: cfg.TableName,
-        S3Bucket:  cfg.S3Bucket,
-        Format:    cfg.Format,
-    }
-
-    return &connector.Pipeline{
-        Buffer:             b,
-        Checkpoint:         c,
-        Emitter:            e,
-        Filter:             f,
-        NumRecordsToBuffer: cfg.NumRecordsToBuffer,
-        StreamName:         cfg.KinesisStream,
-        Transformer:        t,
-    }
-}
-
-func main() {
-    var cfg Config
-    var err error
-
-    // Load config vars
-    err = gcfg.ReadFileInto(&cfg, "pipeline.cfg")
-    if err != nil {
-        fmt.Printf("Config ERROR: %v\n", err)
-    }
-
-    // Initialize Kinesis client
-    auth := kinesis.NewAuth()
-    ksis := kinesis.New(&auth, kinesis.Region{})
-
-    // Fetch stream info
-    args := kinesis.NewArgs()
-    args.Add("StreamName", cfg.Kinesis.StreamName)
-    streamInfo, err := ksis.DescribeStream(args)
-    if err != nil {
-        fmt.Printf("Unable to connect to %s stream. Aborting.", cfg.Kinesis.StreamName)
-        return
-    }
-
-    // Process kinesis shards
-    for _, shard := range streamInfo.StreamDescription.Shards {
-        fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName)
-        p := newS3Pipeline(cfg)
-        go p.ProcessShard(ksis, shard.ShardId)
-    }
-
-    // Keep alive
-    <-make(chan int)
-}

View file

@ -1,12 +0,0 @@
[pipeline]
name = s3Pipeline
[s3]
bucketName = kinesis.test
[kinesis]
bufferSize = 100
shardCount = 2
streamName = userStream
[redshift]
tableName = kinesis_pipeline_test
format = json
copyMandatory = true

View file

@@ -3,8 +3,10 @@ package main
 import (
     "bytes"
     "flag"
+    "fmt"
 
     "github.com/harlow/kinesis-connectors"
+    "github.com/harlow/kinesis-connectors/emitter/s3"
 )
 
 var (
@@ -13,26 +15,27 @@ var (
     stream = flag.String("s", "", "Stream name")
 )
 
-func handler(b connector.Buffer) {
-    body := new(bytes.Buffer)
-
-    // filter or transform data if needed
-    for _, r := range b.GetRecords() {
-        body.Write(r.Data)
-    }
-
-    s3 := &connector.S3Emitter{Bucket: *bucket}
-    s3.Emit(
-        connector.S3Key("", b.FirstSeq(), b.LastSeq()),
-        bytes.NewReader(body.Bytes()),
-    )
-}
-
 func main() {
     flag.Parse()
 
+    emitter := &s3.Emitter{Bucket: *bucket}
+
     c := connector.NewConsumer(*app, *stream)
-    c.Start(connector.HandlerFunc(handler))
+    c.Start(connector.HandlerFunc(func(b connector.Buffer) {
+        body := new(bytes.Buffer)
+
+        for _, r := range b.GetRecords() {
+            body.Write(r.Data)
+        }
+
+        err := emitter.Emit(
+            s3.Key("", b.FirstSeq(), b.LastSeq()),
+            bytes.NewReader(body.Bytes()),
+        )
+        if err != nil {
+            fmt.Printf("error %s", err)
+        }
+    }))
+
     select {} // run forever
 }

View file

@@ -1,4 +1,4 @@
-# Seed the Stream
+# Populate the Stream with data
 
 A prepopulated file with JSON users is available on S3 for seeding the stream:
@@ -17,4 +17,4 @@ export AWS_SECRET_KEY=
 ### Running the code
 
 $ curl https://s3.amazonaws.com/kinesis.test/users.txt > /tmp/users.txt
-$ go run main.go
+$ go run main.go -s streamName