diff --git a/README.md b/README.md
index c71fe3b..18115e9 100644
--- a/README.md
+++ b/README.md
@@ -2,12 +2,9 @@
 
 __Kinesis connector applications written in Go__
 
-_Note: Repo is going under refactoring to use a handler func to process batch data. The previous stable version of connectors exist at SHA `509f68de89efb74aa8d79a733749208edaf56b4d`_
+> With the release of Kinesis Firehose I'd recommend using [Lambda Streams to Firehose](https://github.com/awslabs/lambda-streams-to-firehose) for loading data to S3 and Redshift.
 
-Inspired by the [Amazon Kinesis Connector Library][1]. This library is used for extracting streaming event data from Kinesis into S3, Redshift, DynamoDB, and more. See the [API Docs][2] for package documentation.
-
-[1]: https://github.com/awslabs/amazon-kinesis-connectors
-[2]: http://godoc.org/github.com/harlow/kinesis-connectors
+Inspired by the [Amazon Kinesis Connector Library](https://github.com/awslabs/amazon-kinesis-connectors). This library is intended to be a lightweight wrapper around the Kinesis API to handle batching records, respecting rate limits, setting checkpoints, and recovering gracefully from network errors.
 
 ![golang_kinesis_connector](https://cloud.githubusercontent.com/assets/739782/4262283/2ee2550e-3b97-11e4-8cd1-21a5d7ee0964.png)
 
@@ -39,12 +36,12 @@ Get the package source:
 
     $ go get github.com/harlow/kinesis-connectors
 
-### Example Pipelines
+### Examples
 
-Examples pipelines:
+Use the [seed stream](https://github.com/harlow/kinesis-connectors/tree/master/examples/seed) code to put sample data onto the stream.
 
+* [Firehose](https://github.com/harlow/kinesis-connectors/tree/master/examples/firehose)
 * [S3](https://github.com/harlow/kinesis-connectors/tree/master/examples/s3)
-* [Redshift](https://github.com/harlow/kinesis-connectors/tree/master/examples/redshift)
 
 ### Logging
 
diff --git a/redshift_emitter.go b/emitter/redshift/emitter.go
similarity index 98%
rename from redshift_emitter.go
rename to emitter/redshift/emitter.go
index 3db74d9..c3c1e0e 100644
--- a/redshift_emitter.go
+++ b/emitter/redshift/emitter.go
@@ -29,7 +29,7 @@ type RedshiftEmitter struct {
 // then issues a copy command to Redshift data store.
 func (e RedshiftEmitter) Emit(s3Key string, b io.ReadSeeker) {
 	// put contents to S3 Bucket
-	s3 := &S3Emitter{Bucket: e.S3Bucket}
+	s3 := &Emitter{Bucket: e.S3Bucket}
 	s3.Emit(s3Key, b)
 
 	for i := 0; i < 10; i++ {
diff --git a/redshift_emitter_test.go b/emitter/redshift/emitter_test.go
similarity index 100%
rename from redshift_emitter_test.go
rename to emitter/redshift/emitter_test.go
diff --git a/emitter/redshift/manifest.go b/emitter/redshift/manifest.go
index 7e6ad47..23e6e62 100644
--- a/emitter/redshift/manifest.go
+++ b/emitter/redshift/manifest.go
@@ -1,4 +1,4 @@
-package connector
+package redshift
 
 type Entry struct {
 	Url string `json:"url"`
diff --git a/emitter/redshift/redshift_emitter.go b/emitter/redshift/redshift_emitter.go
deleted file mode 100644
index cb21db3..0000000
--- a/emitter/redshift/redshift_emitter.go
+++ /dev/null
@@ -1,73 +0,0 @@
-package connector
-
-import (
-	"bytes"
-	"database/sql"
-	"fmt"
-
-	// Postgres package is used when sql.Open is called
-	_ "github.com/lib/pq"
-)
-
-// RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one.
-// It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered
-// data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct.
-type RedshiftEmitter struct {
-	AwsAccessKey       string
-	AwsSecretAccessKey string
-	Delimiter          string
-	Format             string
-	Jsonpaths          string
-	S3Bucket           string
-	S3Prefix           string
-	TableName          string
-	Db                 *sql.DB
-}
-
-// Emit is invoked when the buffer is full. This method leverages the S3Emitter and
-// then issues a copy command to Redshift data store.
-func (e RedshiftEmitter) Emit(b Buffer) {
-	s3Emitter := S3Emitter{Bucket: e.S3Bucket}
-	s3Emitter.Emit(b, t)
-	s3File := s3Emitter.S3FileName(b.FirstSeq(), b.LastSeq())
-
-	for i := 0; i < 10; i++ {
-		// execute copy statement
-		_, err := e.Db.Exec(e.copyStatement(s3File))
-
-		// db command succeeded, break from loop
-		if err == nil {
-			logger.Log("info", "RedshiftEmitter", "file", s3File)
-			break
-		}
-
-		// handle recoverable errors, else break from loop
-		if isRecoverableError(err) {
-			handleAwsWaitTimeExp(i)
-		} else {
-			logger.Log("error", "RedshiftEmitter", "msg", err.Error())
-			break
-		}
-	}
-}
-
-// Creates the SQL copy statement issued to Redshift cluster.
-func (e RedshiftEmitter) copyStatement(s3File string) string {
-	b := new(bytes.Buffer)
-	b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
-	b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File))
-	b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", e.AwsAccessKey))
-	b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", e.AwsSecretAccessKey))
-
-	switch e.Format {
-	case "json":
-		b.WriteString(fmt.Sprintf("json 'auto'"))
-	case "jsonpaths":
-		b.WriteString(fmt.Sprintf("json '%v'", e.Jsonpaths))
-	default:
-		b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
-	}
-	b.WriteString(";")
-
-	return b.String()
-}
diff --git a/emitter/redshift/redshift_manifest_emitter.go b/emitter/redshift/redshift_manifest_emitter.go
index 9dc7602..5768c54 100644
--- a/emitter/redshift/redshift_manifest_emitter.go
+++ b/emitter/redshift/redshift_manifest_emitter.go
@@ -1,4 +1,4 @@
-package connector
+package redshift
 
 import (
 	"bytes"
diff --git a/emitter/redshift/redshift_manifest_emitter_test.go b/emitter/redshift/redshift_manifest_emitter_test.go
index 802af6d..9dd2de9 100644
--- a/emitter/redshift/redshift_manifest_emitter_test.go
+++ b/emitter/redshift/redshift_manifest_emitter_test.go
@@ -1,4 +1,4 @@
-package connector
+package redshift
 
 import "testing"
 
diff --git a/s3_emitter.go b/emitter/s3/emitter.go
similarity index 66%
rename from s3_emitter.go
rename to emitter/s3/emitter.go
index 57cb419..1146548 100644
--- a/s3_emitter.go
+++ b/emitter/s3/emitter.go
@@ -1,29 +1,29 @@
-package connector
+package s3
 
 import (
 	"io"
 
 	"github.com/aws/aws-sdk-go/aws"
-	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/aws/session"
-	"github.com/aws/aws-sdk-go/service/s3"
+	awss3 "github.com/aws/aws-sdk-go/service/s3"
 	"gopkg.in/matryer/try.v1"
 )
 
-// S3Emitter stores data in S3 bucket.
+// Emitter stores data in S3 bucket.
 //
 // The use of this struct requires the configuration of an S3 bucket/endpoint. When the buffer is full, this
 // struct's Emit method adds the contents of the buffer to S3 as one file. The filename is generated
 // from the first and last sequence numbers of the records contained in that file separated by a
 // dash. This struct requires the configuration of an S3 bucket and endpoint.
-type S3Emitter struct {
+type Emitter struct {
 	Bucket string
 }
 
 // Emit is invoked when the buffer is full. This method emits the set of filtered records.
-func (e S3Emitter) Emit(s3Key string, b io.ReadSeeker) {
-	svc := s3.New(session.New())
-	params := &s3.PutObjectInput{
+func (e Emitter) Emit(s3Key string, b io.ReadSeeker) error {
+	svc := awss3.New(session.New())
+
+	params := &awss3.PutObjectInput{
 		Body:        b,
 		Bucket:      aws.String(e.Bucket),
 		ContentType: aws.String("text/plain"),
@@ -37,10 +37,8 @@ func (e S3Emitter) Emit(s3Key string, b io.ReadSeeker) {
 	})
 
 	if err != nil {
-		if awsErr, ok := err.(awserr.Error); ok {
-			logger.Log("error", "s3.PutObject", "code", awsErr.Code())
-		}
+		return err
 	}
 
-	logger.Log("info", "S3Emitter", "msg", "success", "key", s3Key)
+	return nil
 }
diff --git a/s3_key.go b/emitter/s3/key.go
similarity index 76%
rename from s3_key.go
rename to emitter/s3/key.go
index 64529f9..a168f81 100644
--- a/s3_key.go
+++ b/emitter/s3/key.go
@@ -1,11 +1,11 @@
-package connector
+package s3
 
 import (
 	"fmt"
 	"time"
 )
 
-func S3Key(prefix, firstSeq, lastSeq string) string {
+func Key(prefix, firstSeq, lastSeq string) string {
 	date := time.Now().UTC().Format("2006/01/02")
 
 	if prefix == "" {
diff --git a/s3_key_test.go b/emitter/s3/key_test.go
similarity index 67%
rename from s3_key_test.go
rename to emitter/s3/key_test.go
index 976b8b0..d2ad154 100644
--- a/s3_key_test.go
+++ b/emitter/s3/key_test.go
@@ -1,4 +1,4 @@
-package connector
+package s3
 
 import (
 	"fmt"
@@ -8,12 +8,12 @@ import (
 	"github.com/bmizerany/assert"
 )
 
-func Test_S3Key(t *testing.T) {
+func Test_Key(t *testing.T) {
 	d := time.Now().UTC().Format("2006/01/02")
 
-	k := S3Key("", "a", "b")
+	k := Key("", "a", "b")
 	assert.Equal(t, k, fmt.Sprintf("%v/a-b", d))
 
-	k = S3Key("prefix", "a", "b")
+	k = Key("prefix", "a", "b")
 	assert.Equal(t, k, fmt.Sprintf("prefix/%v/a-b", d))
 }
diff --git a/s3_manifest_emitter.go b/emitter/s3/manifest_emitter.go
similarity index 69%
rename from s3_manifest_emitter.go
rename to emitter/s3/manifest_emitter.go
index cf9fbe5..46c0b06 100644
--- a/s3_manifest_emitter.go
+++ b/emitter/s3/manifest_emitter.go
@@ -1,8 +1,7 @@
-package connector
+package s3
 
 import (
 	"io"
-	"os"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/session"
@@ -11,15 +10,15 @@
 
 // An implementation of Emitter that puts event data on S3 file, and then puts the
 // S3 file path onto the output stream for processing by manifest application.
-type S3ManifestEmitter struct {
+type ManifestEmitter struct {
 	OutputStream string
 	Bucket       string
 	Prefix       string
 }
 
-func (e S3ManifestEmitter) Emit(s3Key string, b io.ReadSeeker) {
+func (e ManifestEmitter) Emit(s3Key string, b io.ReadSeeker) error {
 	// put contents to S3 Bucket
-	s3 := &S3Emitter{Bucket: e.Bucket}
+	s3 := &Emitter{Bucket: e.Bucket}
 	s3.Emit(s3Key, b)
 
 	// put file path on Kinesis output stream
@@ -33,9 +32,8 @@ func (e S3ManifestEmitter) Emit(s3Key string, b io.ReadSeeker) {
 	_, err := svc.PutRecord(params)
 
 	if err != nil {
-		logger.Log("error", "PutRecord", "msg", err)
-		os.Exit(1)
-	} else {
-		logger.Log("info", "S3ManifestEmitter", "stream", e.OutputStream, "key", s3Key)
+		return err
 	}
+
+	return nil
 }
diff --git a/examples/redshift/main.go b/examples/redshift/main.go
deleted file mode 100644
index 5ca7514..0000000
--- a/examples/redshift/main.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package main
-
-import (
-	"fmt"
-
-	"github.com/harlow/kinesis-connectors"
-	"github.com/joho/godotenv"
-	"github.com/sendgridlabs/go-kinesis"
-)
-
-type Config struct {
-	Pipeline struct {
-		Name string
-	}
-	Kinesis struct {
-		BufferSize int
-		ShardCount int
-		StreamName string
-	}
-	S3 struct {
-		BucketName string
-	}
-	Redshift struct {
-		Delimiter string
-		Format    string
-		Jsonpaths string
-	}
-}
-
-func NewPipeline(cfg Config) *connector.Pipeline {
-	f := &connector.AllPassFilter{}
-	t := &connector.StringToStringTransformer{}
-	c := &connector.RedisCheckpoint{
-		AppName:    cfg.AppName,
-		StreamName: cfg.KinesisStream,
-	}
-	e := &connector.RedshiftEmitter{
-		TableName: cfg.TableName,
-		S3Bucket:  cfg.S3Bucket,
-		Format:    cfg.Format,
-	}
-	return &connector.Pipeline{
-		Buffer:             b,
-		Checkpoint:         c,
-		Emitter:            e,
-		Filter:             f,
-		NumRecordsToBuffer: cfg.NumRecordsToBuffer,
-		StreamName:         cfg.KinesisStream,
-		Transformer:        t,
-	}
-}
-
-func main() {
-	var cfg Config
-	var err error
-
-	// Load config vars
-	err = gcfg.ReadFileInto(&cfg, "pipeline.cfg")
-	if err != nil {
-		fmt.Printf("Config ERROR: %v\n", err)
-	}
-
-	// Initialize Kinesis client
-	auth := kinesis.NewAuth()
-	ksis := kinesis.New(&auth, kinesis.Region{})
-
-	// Fetch stream info
-	args := kinesis.NewArgs()
-	args.Add("StreamName", cfg.Kinesis.StreamName)
-	streamInfo, err := ksis.DescribeStream(args)
-	if err != nil {
-		fmt.Printf("Unable to connect to %s stream. Aborting.", cfg.Kinesis.StreamName)
Aborting.", cfg.Kinesis.StreamName) - return - } - - // Process kinesis shards - for _, shard := range streamInfo.StreamDescription.Shards { - fmt.Printf("Processing %s on %s\n", shard.ShardId, cfg.Kinesis.StreamName) - p := newS3Pipeline(cfg) - go p.ProcessShard(ksis, shard.ShardId) - } - - // Keep alive - <-make(chan int) -} diff --git a/examples/redshift/pipeline.cfg b/examples/redshift/pipeline.cfg deleted file mode 100644 index 3634a09..0000000 --- a/examples/redshift/pipeline.cfg +++ /dev/null @@ -1,12 +0,0 @@ -[pipeline] - name = s3Pipeline -[s3] - bucketName = kinesis.test -[kinesis] - bufferSize = 100 - shardCount = 2 - streamName = userStream -[redshift] - tableName = kinesis_pipeline_test - format = json - copyMandatory = true diff --git a/examples/s3/main.go b/examples/s3/main.go index c091241..499e844 100644 --- a/examples/s3/main.go +++ b/examples/s3/main.go @@ -3,8 +3,10 @@ package main import ( "bytes" "flag" + "fmt" "github.com/harlow/kinesis-connectors" + "github.com/harlow/kinesis-connectors/emitter/s3" ) var ( @@ -13,26 +15,27 @@ var ( stream = flag.String("s", "", "Stream name") ) -func handler(b connector.Buffer) { - body := new(bytes.Buffer) - - // filter or transform data if needed - for _, r := range b.GetRecords() { - body.Write(r.Data) - } - - s3 := &connector.S3Emitter{Bucket: *bucket} - s3.Emit( - connector.S3Key("", b.FirstSeq(), b.LastSeq()), - bytes.NewReader(body.Bytes()), - ) -} - func main() { flag.Parse() + emitter := &s3.Emitter{Bucket: *bucket} c := connector.NewConsumer(*app, *stream) - c.Start(connector.HandlerFunc(handler)) + c.Start(connector.HandlerFunc(func(b connector.Buffer) { + body := new(bytes.Buffer) + + for _, r := range b.GetRecords() { + body.Write(r.Data) + } + + err := emitter.Emit( + s3.Key("", b.FirstSeq(), b.LastSeq()), + bytes.NewReader(body.Bytes()), + ) + + if err != nil { + fmt.Printf("error %s", err) + } + })) select {} // run forever } diff --git a/examples/seed/README.md b/examples/seed/README.md index 1ac51b8..7612c72 100644 --- a/examples/seed/README.md +++ b/examples/seed/README.md @@ -1,4 +1,4 @@ -# Seed the Stream +# Populate the Stream with data A prepopulated file with JSON users is available on S3 for seeing the stream: @@ -17,4 +17,4 @@ export AWS_SECRET_KEY= ### Running the code $ curl https://s3.amazonaws.com/kinesis.test/users.txt > /tmp/users.txt - $ go run main.go + $ go run main.go -s streamName