From 06b40e6ed8f10be1c1244174df620de8f18f3180 Mon Sep 17 00:00:00 2001
From: Harlow Ward
Date: Thu, 24 Jul 2014 23:03:41 -0700
Subject: [PATCH] Base pipeline components

* Create base interfaces for Pipeline
* Add first base implementations for Pipeline
* Add initial tests for core functionality
---
 .gitignore               |   9 +-
 README.md                | 212 +++++++++++++++++++++++++++++++++++++--
 all_pass_filter.go       |   9 ++
 buffer.go                |  16 +++
 checkpoint.go            |  10 ++
 emitter.go               |  11 ++
 filter.go                |   7 ++
 model.go                 |   6 ++
 pipeline.go              |  86 ++++++++++++++++
 record_buffer.go         |  67 +++++++++++++
 record_buffer_test.go    | 118 ++++++++++++++++++++++
 redis_checkpoint.go      |  40 ++++++++
 redis_checkpoint_test.go |  48 +++++++++
 redshift_emitter.go      |  62 ++++++++++++
 redshift_emitter_test.go |  20 ++++
 s3_emitter.go            |  48 +++++++++
 s3_emitter_test.go       |  18 ++++
 transformer.go           |   7 ++
 utils.go                 | 144 ++++++++++++++++++++++++++
 19 files changed, 929 insertions(+), 9 deletions(-)
 create mode 100644 all_pass_filter.go
 create mode 100644 buffer.go
 create mode 100644 checkpoint.go
 create mode 100644 emitter.go
 create mode 100644 filter.go
 create mode 100644 model.go
 create mode 100644 pipeline.go
 create mode 100644 record_buffer.go
 create mode 100644 record_buffer_test.go
 create mode 100644 redis_checkpoint.go
 create mode 100644 redis_checkpoint_test.go
 create mode 100644 redshift_emitter.go
 create mode 100644 redshift_emitter_test.go
 create mode 100644 s3_emitter.go
 create mode 100644 s3_emitter_test.go
 create mode 100644 transformer.go
 create mode 100644 utils.go

diff --git a/.gitignore b/.gitignore
index 8365624..7f5fead 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,5 +19,12 @@ _cgo_export.*
 _testmain.go
 
-*.exe
 *.test
+*.exe
+
+# vim temp files
+.*.swp
+.*.swo
+
+# System files
+.DS_Store
diff --git a/README.md b/README.md
index 1a56969..d216565 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,210 @@
-# Kinesis Connector Application
+# Golang Kinesis Connectors
 
-A Kinesis Connector Application written in Go for extracting streaming event data
-into S3, Redshift, DynamoDB, and more.
+__Note:__ _This codebase is under active development._
 
-__Note:__ _This codebase is a under active development, and is not condisdered
-production ready._
+### Kinesis connector applications written in Go
 
-## Installation
+This is a port of the [Amazon Kinesis connector libraries][3] from Java to Go for extracting streaming event data
+into S3, Redshift, DynamoDB, and more. See the [API Docs][1] for package documentation.
 
-Clone the repository.
+## Overview
 
-    $ git clone git@github.com:harlow/go-etl.git
+Each Amazon Kinesis connector application is a pipeline that determines how records from an Amazon Kinesis stream will be handled. Records are retrieved from the stream, transformed according to a user-defined data model, buffered for batch processing, and then emitted to the appropriate AWS service.
+
+![golang_kinesis_connector](https://cloud.githubusercontent.com/assets/739782/4262283/2ee2550e-3b97-11e4-8cd1-21a5d7ee0964.png)
+
+A connector pipeline uses the following interfaces:
+
+* __Pipeline:__ The pipeline implementation itself.
+* __Transformer:__ Defines the transformation of records from the Amazon Kinesis stream to suit the user-defined data model. Includes methods for custom serializers/deserializers.
+* __Filter:__ Defines a method for excluding irrelevant records from processing. A minimal example follows this list.
+* __Buffer:__ Defines a system for batching the set of records to be processed. The application can specify three thresholds: number of records, total byte count, and time. When one of these thresholds is crossed, the buffer is flushed and the data is emitted to the destination.
+* __Emitter:__ Defines a method that makes client calls to other AWS services and persists the records stored in the buffer. The records can also be sent to another Amazon Kinesis stream.
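+
+A custom filter is just a type with a `KeepRecord` method. As a hypothetical sketch (the `filters` package name is illustrative, and the `User` model is defined later in this README), a filter that keeps only records from a single state could look like:
+
+```go
+package filters
+
+import (
+    "github.com/harlow/kinesis-connectors"
+    "github.com/harlow/sample-connectors/models"
+)
+
+// StateFilter keeps only users from the configured state.
+type StateFilter struct {
+    State string
+}
+
+func (f *StateFilter) KeepRecord(m connector.Model) bool {
+    user, ok := m.(*models.User)
+    return ok && user.State == f.State
+}
+```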
+
+## Usage
+
+Install the library:
+
+    $ go get github.com/harlow/kinesis-connectors
+
+The library is broken into several components (buffers, checkpoints, filters, transformers, and emitters). These components can be mixed and matched to produce the desired functionality.
+
+### Example Redshift Pipeline
+
+The Redshift pipeline will pull records from Kinesis and buffer them until the desired threshold is reached. The emitter will then upload the buffered records to an S3 bucket, set a checkpoint in Redis, and copy the data into Redshift.
+
+```go
+package main
+
+import (
+    "fmt"
+
+    "github.com/harlow/kinesis-connectors"
+    "github.com/harlow/sample-connectors/transformers"
+    "github.com/joho/godotenv"
+    "github.com/sendgridlabs/go-kinesis"
+)
+
+type Config struct {
+    AppName                 string
+    NumRecordsToBuffer      int
+    KinesisStream           string
+    KinesisStreamShardCount int
+    TableName               string
+    S3Bucket                string
+    Format                  string
+    Delimiter               string
+}
+
+func NewPipeline(cfg Config) *connector.Pipeline {
+    b := connector.RecordBuffer{
+        NumRecordsToBuffer: cfg.NumRecordsToBuffer,
+    }
+
+    c := connector.RedisCheckpoint{
+        AppName:    cfg.AppName,
+        StreamName: cfg.KinesisStream,
+    }
+
+    e := connector.RedshiftEmitter{
+        TableName: cfg.TableName,
+        S3Bucket:  cfg.S3Bucket,
+        Format:    cfg.Format,
+        Delimiter: cfg.Delimiter,
+    }
+
+    f := connector.AllPassFilter{}
+
+    t := transformers.UserTransformer{}
+
+    return &connector.Pipeline{
+        Buffer:      &b,
+        Checkpoint:  &c,
+        Emitter:     &e,
+        Filter:      &f,
+        StreamName:  cfg.KinesisStream,
+        Transformer: &t,
+    }
+}
+
+func main() {
+    var cfg Config
+    godotenv.Load()
+    ksis := kinesis.New("", "", kinesis.Region{})
+
+    connector.LoadConfig(&cfg, "redshift_basic_pipeline.properties")
+    connector.CreateAndWaitForStreamToBecomeAvailable(ksis, cfg.KinesisStream, cfg.KinesisStreamShardCount)
+
+    args := kinesis.NewArgs()
+    args.Add("StreamName", cfg.KinesisStream)
+    streamInfo, err := ksis.DescribeStream(args)
+
+    if err != nil {
+        fmt.Printf("Unable to connect to stream %v. Aborting.\n", cfg.KinesisStream)
+        return
+    }
+
+    for _, shard := range streamInfo.StreamDescription.Shards {
+        var p = NewPipeline(cfg)
+        go p.ProcessShard(ksis, shard.ShardId)
+    }
+
+    select {}
+}
+```
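+
+`LoadConfig` (defined in utils.go below) reads a plain `key=value` properties file; lines beginning with `#` or `;` are comments, and each key is matched to a struct field after its first letter is upper-cased. A sketch of `redshift_basic_pipeline.properties` with illustrative values:
+
+```
+appName=redshiftBasicPipeline
+numRecordsToBuffer=500
+kinesisStream=userStream
+kinesisStreamShardCount=2
+tableName=users
+s3Bucket=my-connector-bucket
+format=delimited
+delimiter=|
+```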
Aborting.", cfg.KinesisStream) + return + } + + for _, shard := range streamInfo.StreamDescription.Shards { + var p = NewPipeline(cfg) + go p.ProcessShard(ksis, shard.ShardId) + } + + select {} +} +``` + +```go +package models + +// Implements Model interface +type User struct { + ID int `json:"userid"` + Username string `json:"username"` + Firstname string `json:"firstname"` + Lastname string `json:"lastname"` + City string `json:"city"` + State string `json:"state"` + Email string `json:"email"` + Phone string `json:"phone"` + Likesports bool `json:"likesports"` + Liketheatre bool `json:"liketheatre"` + Likeconcerts bool `json:"likeconcerts"` + Likejazz bool `json:"likejazz"` + Likeclassical bool `json:"likeclassical"` + Likeopera bool `json:"likeopera"` + Likerock bool `json:"likerock"` + Likevegas bool `json:"likevegas"` + Likebroadway bool `json:"likebroadway"` + Likemusicals bool `json:"likemusicals"` +} + +func (u User) ToString() string { + s := []string{ + strconv.Itoa(u.ID), + u.Username, + u.Firstname, + u.Lastname, + u.City, + u.State, + u.Email, + u.Phone, + strconv.FormatBool(u.Likesports), + strconv.FormatBool(u.Liketheatre), + strconv.FormatBool(u.Likeconcerts), + strconv.FormatBool(u.Likejazz), + strconv.FormatBool(u.Likeclassical), + strconv.FormatBool(u.Likeopera), + strconv.FormatBool(u.Likerock), + strconv.FormatBool(u.Likevegas), + strconv.FormatBool(u.Likebroadway), + strconv.FormatBool(u.Likemusicals), + "\n", + } + + return strings.Join(s, "|") +} +``` + +```go +package transformers + +// Implements Transformer interface +type UserTransformer struct {} + +func (t *UserTransformer) ToModel(data []byte) connector.Model { + user := &models.User{} + json.Unmarshal(data, &user) + return user +} +``` + +```sql +CREATE TABLE users ( + id INTEGER, + username VARCHAR(255), + first_name VARCHAR(255), + last_name VARCHAR(255), + city VARCHAR(255), + state VARCHAR(255), + email VARCHAR(255), + phone VARCHAR(255), + like_sports BOOLEAN, + like_theatre BOOLEAN, + like_concerts BOOLEAN, + like_jazz BOOLEAN, + like_classical BOOLEAN, + like_opera BOOLEAN, + like_rock BOOLEAN, + like_vegas BOOLEAN, + like_broadway BOOLEAN, + like_musicals BOOLEAN, + PRIMARY KEY(id) +) +DISTSTYLE KEY +DISTKEY(id) +SORTKEY(id) +``` + +[1]: http://godoc.org/github.com/harlow/kinesis-connectors +[2]: http://aws.amazon.com/kinesis/ +[3]: https://github.com/awslabs/amazon-kinesis-connectors diff --git a/all_pass_filter.go b/all_pass_filter.go new file mode 100644 index 0000000..9fc68b6 --- /dev/null +++ b/all_pass_filter.go @@ -0,0 +1,9 @@ +package connector + +// A basic implementation of the Filter interface that returns true for all records. +type AllPassFilter struct{} + +// Returns true for all records. +func (b *AllPassFilter) KeepRecord(m Model) bool { + return true +} diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..cfa6635 --- /dev/null +++ b/buffer.go @@ -0,0 +1,16 @@ +package connector + +// Buffer defines a buffer used to store records streamed through Kinesis. It is a part of the +// Pipeline utilized by the Pipeline.ProcessShard function. Records are stored in the buffer by calling +// the Add method. The buffer has two size limits defined: total total number of records and a +// time limit in seconds. The ShouldFlush() method may indicate that the buffer is full based on +// these limits. 
diff --git a/checkpoint.go b/checkpoint.go
new file mode 100644
index 0000000..05ba85f
--- /dev/null
+++ b/checkpoint.go
@@ -0,0 +1,10 @@
+package connector
+
+// Checkpoint is used by Pipeline.ProcessShard to record its progress. The library passes an
+// object implementing this interface to ProcessShard so that processing can resume from the
+// last checkpointed sequence number after a restart.
+type Checkpoint interface {
+    CheckpointExists(shardID string) bool
+    SequenceNumber() string
+    SetCheckpoint(shardID string, sequenceNumber string)
+}
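
For local development and tests, a Checkpoint does not need to touch Redis. A minimal in-memory sketch that satisfies the interface (not part of this patch; the MemoryCheckpoint name is an assumption):

```go
package connector

// MemoryCheckpoint stores checkpoints in a map. State is lost on restart,
// so it is only suitable for tests and local experiments.
type MemoryCheckpoint struct {
    sequences map[string]string
    current   string
}

// CheckpointExists reports whether a checkpoint was stored for the shard
// and, if so, caches its sequence number.
func (c *MemoryCheckpoint) CheckpointExists(shardID string) bool {
    seq, ok := c.sequences[shardID]
    if ok {
        c.current = seq
    }
    return ok
}

// SequenceNumber returns the most recently cached sequence number.
func (c *MemoryCheckpoint) SequenceNumber() string {
    return c.current
}

// SetCheckpoint records the last processed sequence number for the shard.
func (c *MemoryCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) {
    if c.sequences == nil {
        c.sequences = map[string]string{}
    }
    c.sequences[shardID] = sequenceNumber
    c.current = sequenceNumber
}
```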
diff --git a/emitter.go b/emitter.go
new file mode 100644
index 0000000..7fb120e
--- /dev/null
+++ b/emitter.go
@@ -0,0 +1,11 @@
+package connector
+
+// Emitter takes a full buffer and processes its stored records. The Emitter is the member of
+// the Pipeline that "emits" the objects deserialized by the Transformer. The Emit method is
+// invoked when the buffer is full, typically to persist the records or to send them to another
+// Kinesis stream. Implementations may choose to fail the entire set of records in the buffer
+// or to fail records individually.
+type Emitter interface {
+    Emit(buffer Buffer)
+}
diff --git a/filter.go b/filter.go
new file mode 100644
index 0000000..b7f8d05
--- /dev/null
+++ b/filter.go
@@ -0,0 +1,7 @@
+package connector
+
+// Filter decides whether a record is relevant to the application. Pipeline.ProcessShard calls
+// KeepRecord before adding a record to the Buffer; records for which it returns false are
+// discarded.
+type Filter interface {
+    KeepRecord(m Model) bool
+}
diff --git a/model.go b/model.go
new file mode 100644
index 0000000..723fdad
--- /dev/null
+++ b/model.go
@@ -0,0 +1,6 @@
+package connector
+
+// Model maps the attributes of the data being sent through the Kinesis stream.
+type Model interface {
+    ToString() string
+}
diff --git a/pipeline.go b/pipeline.go
new file mode 100644
index 0000000..9fc1af4
--- /dev/null
+++ b/pipeline.go
@@ -0,0 +1,86 @@
+package connector
+
+import (
+    "fmt"
+    "time"
+
+    "github.com/sendgridlabs/go-kinesis"
+)
+
+// Pipeline is used by the main application to configure an instance of the user's pipeline.
+// Each field should hold a configured implementation of the corresponding interface. Records
+// arrive as byte slices and are transformed into Models, buffered in Model form, and, when
+// the buffer is full, passed to the Emitter.
+type Pipeline struct {
+    Buffer      Buffer
+    Checkpoint  Checkpoint
+    Emitter     Emitter
+    Filter      Filter
+    StreamName  string
+    Transformer Transformer
+}
+
+// ProcessShard reads records from the shard in a loop: each record is transformed, filtered,
+// and buffered, and the buffer is emitted and checkpointed whenever it reports itself full.
+func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
+    args := kinesis.NewArgs()
+    args.Add("ShardId", shardID)
+    args.Add("StreamName", p.StreamName)
+
+    // Resume from the last checkpoint if one exists; otherwise start at the oldest record.
+    if p.Checkpoint.CheckpointExists(shardID) {
+        args.Add("ShardIteratorType", "AFTER_SEQUENCE_NUMBER")
+        args.Add("StartingSequenceNumber", p.Checkpoint.SequenceNumber())
+    } else {
+        args.Add("ShardIteratorType", "TRIM_HORIZON")
+    }
+
+    shardInfo, err := ksis.GetShardIterator(args)
+
+    if err != nil {
+        fmt.Printf("Error fetching shard iterator: %v", err)
+        return
+    }
+
+    shardIterator := shardInfo.ShardIterator
+
+    for {
+        args = kinesis.NewArgs()
+        args.Add("ShardIterator", shardIterator)
+        recordSet, err := ksis.GetRecords(args)
+
+        if err != nil {
+            fmt.Printf("GetRecords ERROR: %v\n", err)
+            continue
+        }
+
+        if len(recordSet.Records) > 0 {
+            for _, v := range recordSet.Records {
+                data, err := v.GetData()
+
+                if err != nil {
+                    fmt.Printf("GetData ERROR: %v\n", err)
+                    continue
+                }
+
+                var m = p.Transformer.ToModel(data)
+
+                if p.Filter.KeepRecord(m) {
+                    p.Buffer.Add(m, v.SequenceNumber)
+                }
+            }
+        } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator {
+            // The shard is closed or the iterator has stopped advancing.
+            fmt.Printf("No next shard iterator for shard: %v\n", shardID)
+            break
+        } else {
+            fmt.Printf("Sleeping: %v\n", shardID)
+            time.Sleep(5 * time.Second)
+        }
+
+        if p.Buffer.ShouldFlush() {
+            fmt.Printf("Emitting buffer for shard: %v\n", shardID)
+            p.Emitter.Emit(p.Buffer)
+            p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
+            p.Buffer.Flush()
+        }
+
+        shardIterator = recordSet.NextShardIterator
+    }
+}
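
The Emitter interface above notes that records can be forwarded to another Kinesis stream. A minimal sketch of such an emitter (not part of this patch; the KinesisEmitter name and its fields are assumptions, and the PutRecord call mirrors the args-based go-kinesis API used elsewhere in this patch):

```go
package connector

import (
    "fmt"

    "github.com/sendgridlabs/go-kinesis"
)

// KinesisEmitter forwards each buffered record to another Kinesis stream.
type KinesisEmitter struct {
    StreamName string
    Client     *kinesis.Kinesis
}

// Emit writes each record in the buffer to the target stream.
func (e KinesisEmitter) Emit(buf Buffer) {
    for _, r := range buf.Records() {
        args := kinesis.NewArgs()
        args.Add("StreamName", e.StreamName)
        // The partition key determines shard placement; the record's string
        // form is reused here for simplicity.
        args.Add("PartitionKey", r.ToString())
        args.AddData([]byte(r.ToString()))

        if _, err := e.Client.PutRecord(args); err != nil {
            fmt.Printf("PutRecord ERROR: %v\n", err)
        }
    }
}
```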
diff --git a/record_buffer.go b/record_buffer.go
new file mode 100644
index 0000000..8bfb287
--- /dev/null
+++ b/record_buffer.go
@@ -0,0 +1,67 @@
+package connector
+
+// RecordBuffer is a basic implementation of the Buffer interface. It wraps a slice of records
+// that is periodically flushed. The buffer is considered full once it holds NumRecordsToBuffer
+// records; records with duplicate sequence numbers are ignored.
+type RecordBuffer struct {
+    NumRecordsToBuffer  int
+    firstSequenceNumber string
+    lastSequenceNumber  string
+    recordsInBuffer     []Model
+    sequencesInBuffer   []string
+}
+
+// Add adds a record to the buffer.
+func (b *RecordBuffer) Add(record Model, sequenceNumber string) {
+    if len(b.sequencesInBuffer) == 0 {
+        b.firstSequenceNumber = sequenceNumber
+    }
+
+    b.lastSequenceNumber = sequenceNumber
+
+    if !b.sequenceExists(sequenceNumber) {
+        b.recordsInBuffer = append(b.recordsInBuffer, record)
+        b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber)
+    }
+}
+
+// Records returns the records in the buffer.
+func (b *RecordBuffer) Records() []Model {
+    return b.recordsInBuffer
+}
+
+// NumRecordsInBuffer returns the number of records in the buffer.
+func (b RecordBuffer) NumRecordsInBuffer() int {
+    return len(b.sequencesInBuffer)
+}
+
+// Flush empties the buffer and resets the sequence tracking.
+func (b *RecordBuffer) Flush() {
+    b.recordsInBuffer = b.recordsInBuffer[:0]
+    b.sequencesInBuffer = b.sequencesInBuffer[:0]
+}
+
+// sequenceExists checks if the sequence number is already in the buffer.
+func (b *RecordBuffer) sequenceExists(sequenceNumber string) bool {
+    for _, v := range b.sequencesInBuffer {
+        if v == sequenceNumber {
+            return true
+        }
+    }
+    return false
+}
+
+// ShouldFlush determines if the buffer has reached its target size.
+func (b *RecordBuffer) ShouldFlush() bool {
+    return len(b.sequencesInBuffer) >= b.NumRecordsToBuffer
+}
+
+// FirstSequenceNumber returns the sequence number of the first record in the buffer.
+func (b *RecordBuffer) FirstSequenceNumber() string {
+    return b.firstSequenceNumber
+}
+
+// LastSequenceNumber returns the sequence number of the last record in the buffer.
+func (b *RecordBuffer) LastSequenceNumber() string {
+    return b.lastSequenceNumber
+}
diff --git a/record_buffer_test.go b/record_buffer_test.go
new file mode 100644
index 0000000..7346abb
--- /dev/null
+++ b/record_buffer_test.go
@@ -0,0 +1,118 @@
+package connector
+
+import "testing"
+
+type TestModel struct{}
+
+func (u TestModel) ToString() string {
+    return "ok"
+}
+
+func TestAdd(t *testing.T) {
+    var r1, s1 = TestModel{}, "Seq1"
+    var r2, s2 = TestModel{}, "Seq2"
+
+    b := RecordBuffer{}
+    b.Add(r1, s1)
+
+    if b.NumRecordsInBuffer() != 1 {
+        t.Errorf("NumRecordsInBuffer() = %v, want %v", b.NumRecordsInBuffer(), 1)
+    }
+
+    b.Add(r2, s2)
+
+    if b.NumRecordsInBuffer() != 2 {
+        t.Errorf("NumRecordsInBuffer() = %v, want %v", b.NumRecordsInBuffer(), 2)
+    }
+
+    // Adding a duplicate sequence number should not grow the buffer.
+    b.Add(r2, s2)
+
+    if b.NumRecordsInBuffer() != 2 {
+        t.Errorf("NumRecordsInBuffer() = %v, want %v", b.NumRecordsInBuffer(), 2)
+    }
+}
+
+func TestSequenceExists(t *testing.T) {
+    var r1, s1 = TestModel{}, "Seq1"
+    var r2, s2 = TestModel{}, "Seq2"
+
+    b := RecordBuffer{}
+    b.Add(r1, s1)
+
+    if b.sequenceExists(s1) != true {
+        t.Errorf("sequenceExists(%v) = %v, want %v", s1, false, true)
+    }
+
+    b.Add(r2, s2)
+
+    if b.sequenceExists(s2) != true {
+        t.Errorf("sequenceExists(%v) = %v, want %v", s2, false, true)
+    }
+}
+
+func TestFlush(t *testing.T) {
+    var r1, s1 = TestModel{}, "SeqNum"
+    b := RecordBuffer{}
+    b.Add(r1, s1)
+
+    b.Flush()
+
+    if b.NumRecordsInBuffer() != 0 {
+        t.Errorf("NumRecordsInBuffer() = %v, want %v", b.NumRecordsInBuffer(), 0)
+    }
+}
+
+func TestLastSequenceNumber(t *testing.T) {
+    var r1, s1 = TestModel{}, "Seq1"
+    var r2, s2 = TestModel{}, "Seq2"
+
+    b := RecordBuffer{}
+    b.Add(r1, s1)
+
+    if b.LastSequenceNumber() != s1 {
+        t.Errorf("LastSequenceNumber() = %v, want %v", b.LastSequenceNumber(), s1)
+    }
+
+    b.Add(r2, s2)
+
+    if b.LastSequenceNumber() != s2 {
+        t.Errorf("LastSequenceNumber() = %v, want %v", b.LastSequenceNumber(), s2)
+    }
+}
+
+func TestFirstSequenceNumber(t *testing.T) {
+    var r1, s1 = TestModel{}, "Seq1"
+    var r2, s2 = TestModel{}, "Seq2"
+
+    b := RecordBuffer{}
+    b.Add(r1, s1)
+
+    if b.FirstSequenceNumber() != s1 {
+        t.Errorf("FirstSequenceNumber() = %v, want %v", b.FirstSequenceNumber(), s1)
+    }
+
+    b.Add(r2, s2)
+
+    if b.FirstSequenceNumber() != s1 {
+        t.Errorf("FirstSequenceNumber() = %v, want %v", b.FirstSequenceNumber(), s1)
+    }
+}
+
+func TestShouldFlush(t *testing.T) {
+    const n = 2
+    var r1, s1 = TestModel{}, "Seq1"
+    var r2, s2 = TestModel{}, "Seq2"
+
+    b := RecordBuffer{NumRecordsToBuffer: n}
+    b.Add(r1, s1)
+
+    if b.ShouldFlush() != false {
+        t.Errorf("ShouldFlush() = %v, want %v", b.ShouldFlush(), false)
+    }
+
+    b.Add(r2, s2)
+
+    if b.ShouldFlush() != true {
+        t.Errorf("ShouldFlush() = %v, want %v", b.ShouldFlush(), true)
+    }
+}
diff --git a/redis_checkpoint.go b/redis_checkpoint.go
new file mode 100644
index 0000000..51517b2
--- /dev/null
+++ b/redis_checkpoint.go
@@ -0,0 +1,40 @@
+package connector
+
+import (
+    "fmt"
+
+    "github.com/hoisie/redis"
+)
+
+// RedisCheckpoint is a Redis implementation of the Checkpoint interface. It is used by
+// Pipeline.ProcessShard to persist its progress.
+type RedisCheckpoint struct {
+    AppName        string
+    client         redis.Client
+    sequenceNumber string
+    StreamName     string
+}
+
+// CheckpointExists determines if a checkpoint for the shard exists and, if so, caches its
+// sequence number.
+func (c *RedisCheckpoint) CheckpointExists(shardID string) bool {
+    val, _ := c.client.Get(c.key(shardID))
+
+    if val != nil && string(val) != "" {
+        c.sequenceNumber = string(val)
+        return true
+    }
+
+    return false
+}
+
+// SequenceNumber returns the cached sequence number of the current checkpoint.
+func (c *RedisCheckpoint) SequenceNumber() string {
+    return c.sequenceNumber
+}
+
+// SetCheckpoint stores the sequence number of the last record processed for the shard.
+func (c *RedisCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) {
+    c.client.Set(c.key(shardID), []byte(sequenceNumber))
+    c.sequenceNumber = sequenceNumber
+}
+
+// key generates the Redis key under which a shard's checkpoint is stored.
+func (c *RedisCheckpoint) key(shardID string) string {
+    return fmt.Sprintf("%v:checkpoint:%v:%v", c.AppName, c.StreamName, shardID)
+}
diff --git a/redis_checkpoint_test.go b/redis_checkpoint_test.go
new file mode 100644
index 0000000..860a36d
--- /dev/null
+++ b/redis_checkpoint_test.go
@@ -0,0 +1,48 @@
+package connector
+
+import (
+    "testing"
+
+    "github.com/hoisie/redis"
+)
+
+func TestKey(t *testing.T) {
+    k := "app:checkpoint:stream:shard"
+    c := RedisCheckpoint{AppName: "app", StreamName: "stream"}
+
+    r := c.key("shard")
+
+    if r != k {
+        t.Errorf("key() = %v, want %v", r, k)
+    }
+}
+
+func TestCheckpointExists(t *testing.T) {
+    var rc redis.Client
+    k := "app:checkpoint:stream:shard"
+    rc.Set(k, []byte("fakeSeqNum"))
+    c := RedisCheckpoint{AppName: "app", StreamName: "stream"}
+
+    r := c.CheckpointExists("shard")
+
+    if r != true {
+        t.Errorf("CheckpointExists() = %v, want %v", r, true)
+    }
+
+    rc.Del(k)
+}
+
+func TestSetCheckpoint(t *testing.T) {
+    k := "app:checkpoint:stream:shard"
+    var rc redis.Client
+    c := RedisCheckpoint{AppName: "app", StreamName: "stream"}
+    c.SetCheckpoint("shard", "fakeSeqNum")
+
+    r, _ := rc.Get(k)
+
+    if string(r) != "fakeSeqNum" {
+        t.Errorf("SetCheckpoint() stored %v, want %v", string(r), "fakeSeqNum")
+    }
+
+    rc.Del(k)
+}
diff --git a/redshift_emitter.go b/redshift_emitter.go
new file mode 100644
index 0000000..beebfad
--- /dev/null
+++ b/redshift_emitter.go
@@ -0,0 +1,62 @@
+package connector
+
+import (
+    "bytes"
+    "database/sql"
+    "fmt"
+    "log"
+    "os"
+
+    _ "github.com/lib/pq"
+)
+
+// RedshiftEmitter is an implementation of Emitter that loads batches of buffered records into
+// Redshift. It first emits the records to S3 (using the S3Emitter) and then performs the
+// Redshift COPY command. When Format is "json" the COPY uses JSON 'auto'; otherwise the
+// records are loaded as text separated by Delimiter.
+type RedshiftEmitter struct {
+    Delimiter string
+    Format    string
+    Jsonpath  string
+    S3Bucket  string
+    TableName string
+}
+
+// Emit is invoked when the buffer is full. It leverages the S3Emitter and then issues a COPY
+// command to the Redshift cluster.
+func (e RedshiftEmitter) Emit(b Buffer) {
+    s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
+    s3Emitter.Emit(b)
+    s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
+    db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
+
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    _, err = db.Exec(e.copyStatement(s3File))
+
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    fmt.Printf("Redshift load completed.\n")
+    db.Close()
+}
+
+// copyStatement creates the SQL COPY statement issued to the Redshift cluster.
+func (e RedshiftEmitter) copyStatement(s3File string) string {
+    var b bytes.Buffer
+    b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
+    b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File))
+    b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY")))
+    b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY")))
+
+    if e.Format == "json" {
+        b.WriteString("json 'auto'")
+    } else {
+        b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
+    }
+
+    b.WriteString(";")
+    return b.String()
+}
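
The emitters read their credentials from the environment: RedshiftEmitter uses REDSHIFT_URL, AWS_ACCESS_KEY, and AWS_SECRET_KEY for its COPY credentials, while S3Emitter (below) authenticates with goamz's aws.EnvAuth, which typically reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. Since the README example calls godotenv.Load(), a .env file with placeholder values might look like:

```
REDSHIFT_URL=postgres://user:password@example.redshift.amazonaws.com:5439/mydb
AWS_ACCESS_KEY=<access-key>
AWS_SECRET_KEY=<secret-key>
AWS_ACCESS_KEY_ID=<access-key>
AWS_SECRET_ACCESS_KEY=<secret-key>
```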
diff --git a/redshift_emitter_test.go b/redshift_emitter_test.go
new file mode 100644
index 0000000..030515d
--- /dev/null
+++ b/redshift_emitter_test.go
@@ -0,0 +1,20 @@
+package connector
+
+import (
+    "testing"
+)
+
+func TestCopyStatement(t *testing.T) {
+    e := RedshiftEmitter{
+        Delimiter: ",",
+        S3Bucket:  "test_bucket",
+        TableName: "test_table",
+    }
+    f := e.copyStatement("/test.txt")
+
+    copyStatement := "COPY test_table FROM 's3://test_bucket/test.txt' CREDENTIALS 'aws_access_key_id=;aws_secret_access_key=' DELIMITER ',';"
+
+    if f != copyStatement {
+        t.Errorf("copyStatement() = %s, want %s", f, copyStatement)
+    }
+}
diff --git a/s3_emitter.go b/s3_emitter.go
new file mode 100644
index 0000000..5b863ec
--- /dev/null
+++ b/s3_emitter.go
@@ -0,0 +1,48 @@
+package connector
+
+import (
+    "bytes"
+    "fmt"
+    "time"
+
+    "github.com/crowdmob/goamz/aws"
+    "github.com/crowdmob/goamz/s3"
+)
+
+// S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3.
+// When the buffer is full, its Emit method adds the contents of the buffer to S3 as one file.
+// The file name is generated from the first and last sequence numbers of the records contained
+// in that file, separated by a dash. This struct requires the configuration of an S3 bucket.
+type S3Emitter struct {
+    S3Bucket string
+}
+
+// S3FileName generates a file name based on the first and last sequence numbers from the
+// buffer. The current UTC date (YYYY-MM-DD) forms the base of the path, to logically group
+// each day's batches.
+func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
+    date := time.Now().UTC().Format("2006-01-02")
+    return fmt.Sprintf("/%v/%v-%v.txt", date, firstSeq, lastSeq)
+}
+
+// Emit is invoked when the buffer is full. It uploads the set of filtered records to S3 as a
+// single file.
+func (e S3Emitter) Emit(buf Buffer) {
+    auth, _ := aws.EnvAuth()
+    s3Con := s3.New(auth, aws.USEast)
+    bucket := s3Con.Bucket(e.S3Bucket)
+    s3File := e.S3FileName(buf.FirstSequenceNumber(), buf.LastSequenceNumber())
+
+    var buffer bytes.Buffer
+
+    for _, r := range buf.Records() {
+        buffer.WriteString(r.ToString())
+    }
+
+    err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
+
+    if err != nil {
+        fmt.Printf("Error occurred while uploading to S3: %v\n", err)
+    } else {
+        fmt.Printf("Emitted %v records to S3 in s3://%v%v\n", buf.NumRecordsInBuffer(), e.S3Bucket, s3File)
+    }
+}
diff --git a/s3_emitter_test.go b/s3_emitter_test.go
new file mode 100644
index 0000000..981412f
--- /dev/null
+++ b/s3_emitter_test.go
@@ -0,0 +1,18 @@
+package connector
+
+import (
+    "fmt"
+    "testing"
+    "time"
+)
+
+func TestS3FileName(t *testing.T) {
+    d := time.Now().UTC().Format("2006-01-02")
+    n := fmt.Sprintf("/%v/a-b.txt", d)
+    e := S3Emitter{}
+    f := e.S3FileName("a", "b")
+
+    if f != n {
+        t.Errorf("S3FileName() = %v, want %v", f, n)
+    }
+}
diff --git a/transformer.go b/transformer.go
new file mode 100644
index 0000000..ad895bb
--- /dev/null
+++ b/transformer.go
@@ -0,0 +1,7 @@
+package connector
+
+// Transformer is used to transform data from a record (byte slice) to the data model used for
+// processing in the application.
+type Transformer interface {
+    ToModel(data []byte) Model
+}
diff --git a/utils.go b/utils.go
new file mode 100644
index 0000000..cb6be67
--- /dev/null
+++ b/utils.go
@@ -0,0 +1,144 @@
+package connector
+
+import (
+    "bufio"
+    "fmt"
+    "log"
+    "os"
+    "reflect"
+    "regexp"
+    "strconv"
+    "strings"
+    "time"
+    "unicode"
+
+    "github.com/sendgridlabs/go-kinesis"
+)
+
+// readLines reads a file and returns its lines as a slice of strings.
+func readLines(path string) ([]string, error) {
+    file, err := os.Open(path)
+
+    if err != nil {
+        return nil, err
+    }
+
+    defer file.Close()
+    var lines []string
+
+    scanner := bufio.NewScanner(file)
+
+    for scanner.Scan() {
+        lines = append(lines, scanner.Text())
+    }
+
+    return lines, scanner.Err()
+}
+
+var (
+    assignRegex = regexp.MustCompile(`^([^=]+)=(.*)$`)
+)
+
+// upcaseInitial returns the string with its first rune upper-cased.
+func upcaseInitial(str string) string {
+    for i, v := range str {
+        return string(unicode.ToUpper(v)) + str[i+1:]
+    }
+
+    return ""
+}
+
+// LoadConfig opens the file at the given path and loads its key=value pairs into the struct.
+// Lines beginning with ';' or '#' are treated as comments. Keys are matched to struct fields
+// after their first letter is upper-cased.
+func LoadConfig(config interface{}, filename string) error {
+    lines, err := readLines(filename)
+
+    if err != nil {
+        log.Fatalf("Config Load ERROR: %s\n", err)
+    }
+
+    mutable := reflect.ValueOf(config).Elem()
+
+    for _, line := range lines {
+        line = strings.TrimSpace(line)
+
+        if len(line) == 0 || line[0] == ';' || line[0] == '#' {
+            continue
+        }
+
+        if groups := assignRegex.FindStringSubmatch(line); groups != nil {
+            key, val := groups[1], groups[2]
+            key, val = strings.TrimSpace(key), strings.TrimSpace(val)
+            key = upcaseInitial(key)
+            field := mutable.FieldByName(key)
+
+            if !field.IsValid() {
+                log.Fatalf("Config ERROR: Field %s not found\n", key)
+            }
+
+            switch field.Type().Name() {
+            case "int":
+                val, _ := strconv.ParseInt(val, 0, 64)
+                mutable.FieldByName(key).SetInt(val)
+            case "bool":
+                val, _ := strconv.ParseBool(val)
+                mutable.FieldByName(key).SetBool(val)
+            default:
+                mutable.FieldByName(key).SetString(val)
+            }
+        }
+    }
+
+    return nil
+}
+
+// CreateAndWaitForStreamToBecomeAvailable creates a new Kinesis stream (or reuses an existing
+// stream with the same name) and waits for it to become ACTIVE.
+func CreateAndWaitForStreamToBecomeAvailable(k *kinesis.Kinesis, streamName string, shardCount int) {
+    if !StreamExists(k, streamName) {
+        err := k.CreateStream(streamName, shardCount)
+
+        if err != nil {
+            fmt.Printf("CreateStream ERROR: %v\n", err)
+            return
+        }
+    }
+
+    // Poll the stream description until the stream reports ACTIVE.
+    for {
+        args := kinesis.NewArgs()
+        args.Add("StreamName", streamName)
+        resp, _ := k.DescribeStream(args)
+        streamStatus := resp.StreamDescription.StreamStatus
+        fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus)
+
+        if streamStatus == "ACTIVE" {
+            break
+        }
+
+        time.Sleep(4 * time.Second)
+    }
+}
+
+// StreamExists checks if a Kinesis stream exists.
+func StreamExists(k *kinesis.Kinesis, streamName string) bool {
+    args := kinesis.NewArgs()
+    resp, _ := k.ListStreams(args)
+    for _, s := range resp.StreamNames {
+        if s == streamName {
+            return true
+        }
+    }
+    return false
+}
+
+// DeleteStream deletes a Kinesis stream.
+func DeleteStream(k *kinesis.Kinesis, streamName string) {
+    err := k.DeleteStream(streamName)
+
+    if err != nil {
+        fmt.Printf("DeleteStream ERROR: %v\n", err)
+        return
+    }
+
+    fmt.Printf("Stream [%v] is DELETING\n", streamName)
+}