From 4e909185d185a0d7bebb13bcec964220a11cc786 Mon Sep 17 00:00:00 2001 From: Harlow Ward Date: Tue, 29 Jul 2014 23:09:57 -0700 Subject: [PATCH] Add Record Processor * Add GetRecords function to Kinesis Utils * Add sample .env to showcase ENV vars needed * Fix RedisCheckpoint to look for empty string (default from Redis) * Extract example code into its own repository --- .gitignore | 7 ++- buffers/buffer.go | 8 ---- buffers/msg_buffer.go | 18 ++++---- buffers/msg_buffer_test.go | 16 ++----- checkpoints/checkpoint.go | 8 ---- checkpoints/redis_checkpoint.go | 10 ++-- emitters/emitter.go | 5 -- emitters/redshift_emitter.go | 44 ++++++++++++++++-- emitters/s3_emitter.go | 23 ++++++---- emitters/s3_manifest_emitter.go | 30 ++++++++++++ examples/consumer.go | 36 --------------- examples/producer.go | 35 -------------- interfaces/buffer.go | 11 +++++ interfaces/checkpoint.go | 8 ++++ interfaces/emitter.go | 5 ++ interfaces/transformer.go | 5 ++ pipeline/pipeline.go | 73 +++++++++++++++++++++++++++++ utils/config_utils.go | 81 +++++++++++++++++++++++++++++++++ 18 files changed, 288 insertions(+), 135 deletions(-) delete mode 100644 buffers/buffer.go delete mode 100644 checkpoints/checkpoint.go delete mode 100644 emitters/emitter.go create mode 100644 emitters/s3_manifest_emitter.go delete mode 100644 examples/consumer.go delete mode 100644 examples/producer.go create mode 100644 interfaces/buffer.go create mode 100644 interfaces/checkpoint.go create mode 100644 interfaces/emitter.go create mode 100644 interfaces/transformer.go create mode 100644 pipeline/pipeline.go create mode 100644 utils/config_utils.go diff --git a/.gitignore b/.gitignore index 92e6f5f..b7c6320 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,9 @@ _cgo_export.* _testmain.go -*.env -*.exe *.test +*.exe + +# vim temp files +.*.swp +.*.swo diff --git a/buffers/buffer.go b/buffers/buffer.go deleted file mode 100644 index e8852e0..0000000 --- a/buffers/buffer.go +++ /dev/null @@ -1,8 +0,0 @@ -package buffers - -type Buffer interface { - Data() []byte - FirstSequenceNumber() string - LastSequenceNumber() string - NumMessagesInBuffer() int -} diff --git a/buffers/msg_buffer.go b/buffers/msg_buffer.go index cf57c94..8a07970 100644 --- a/buffers/msg_buffer.go +++ b/buffers/msg_buffer.go @@ -6,12 +6,12 @@ type MsgBuffer struct { buffer bytes.Buffer firstSequenceNumber string lastSequenceNumber string - numMessagesToBuffer int + NumMessagesToBuffer int sequencesInBuffer []string } -func (b MsgBuffer) NumMessagesToBuffer() int { - return b.numMessagesToBuffer +func NewMessageBuffer(n int) *MsgBuffer { + return &MsgBuffer{NumMessagesToBuffer: n} } func (b *MsgBuffer) ConsumeRecord(data []byte, sequenceNumber string) { @@ -44,19 +44,19 @@ func (b MsgBuffer) NumMessagesInBuffer() int { return len(b.sequencesInBuffer) } -func (b *MsgBuffer) FlushBuffer() { +func (b *MsgBuffer) Flush() { b.buffer.Reset() b.sequencesInBuffer = b.sequencesInBuffer[:0] } func (b MsgBuffer) ShouldFlush() bool { - return len(b.sequencesInBuffer) >= b.NumMessagesToBuffer() -} - -func (b MsgBuffer) LastSequenceNumber() string { - return b.lastSequenceNumber + return len(b.sequencesInBuffer) >= b.NumMessagesToBuffer } func (b MsgBuffer) FirstSequenceNumber() string { return b.firstSequenceNumber } + +func (b MsgBuffer) LastSequenceNumber() string { + return b.lastSequenceNumber +} diff --git a/buffers/msg_buffer_test.go b/buffers/msg_buffer_test.go index 316cd28..8e91ffd 100644 --- a/buffers/msg_buffer_test.go +++ b/buffers/msg_buffer_test.go @@ -5,16 +5,6 @@ import ( "testing" ) -func TestNumMessagesToBuffer(t *testing.T) { - const n = 25 - b := MsgBuffer{numMessagesToBuffer: n} - r := b.NumMessagesToBuffer() - - if r != n { - t.Errorf("NumMessagesToBuffer() = %v, want %v", r, n) - } -} - func TestConsumeRecord(t *testing.T) { var r1, s1 = []byte("Record1"), "Seq1" var r2, s2 = []byte("Recrod2"), "Seq2" @@ -57,12 +47,12 @@ func TestSequenceExists(t *testing.T) { } } -func TestFlushBuffer(t *testing.T) { +func TestFlush(t *testing.T) { var r1, s1 = []byte("Record"), "SeqNum" b := MsgBuffer{} b.ConsumeRecord(r1, s1) - b.FlushBuffer() + b.Flush() if b.NumMessagesInBuffer() != 0 { t.Errorf("NumMessagesInBuffer() want %v", 0) @@ -110,7 +100,7 @@ func TestShouldFlush(t *testing.T) { var r1, s1 = []byte("Record1"), "Seq1" var r2, s2 = []byte("Recrod2"), "Seq2" - b := MsgBuffer{numMessagesToBuffer: n} + b := MsgBuffer{NumMessagesToBuffer: n} b.ConsumeRecord(r1, s1) if b.ShouldFlush() != false { diff --git a/checkpoints/checkpoint.go b/checkpoints/checkpoint.go deleted file mode 100644 index 1945358..0000000 --- a/checkpoints/checkpoint.go +++ /dev/null @@ -1,8 +0,0 @@ -package checkpoints - -type Checkpoint interface { - CheckpointExists(streamName string, shardID string) bool - SequenceNumber() string - SetCheckpoint(streamName string, shardID string, sequenceNumber string) -} - diff --git a/checkpoints/redis_checkpoint.go b/checkpoints/redis_checkpoint.go index 1043c4e..b65d8aa 100644 --- a/checkpoints/redis_checkpoint.go +++ b/checkpoints/redis_checkpoint.go @@ -12,15 +12,11 @@ type RedisCheckpoint struct { sequenceNumber string } -func (c RedisCheckpoint) SequenceNumber() string { - return c.sequenceNumber -} - func (c *RedisCheckpoint) CheckpointExists(streamName string, shardID string) bool { key := c.keyGen(streamName, shardID) val, _ := c.client.Get(key) - if val != nil { + if val != nil && string(val) != "" { c.sequenceNumber = string(val) return true } else { @@ -28,6 +24,10 @@ func (c *RedisCheckpoint) CheckpointExists(streamName string, shardID string) bo } } +func (c RedisCheckpoint) SequenceNumber() string { + return c.sequenceNumber +} + func (c *RedisCheckpoint) SetCheckpoint(streamName string, shardID string, sequenceNumber string) { key := c.keyGen(streamName, shardID) c.client.Set(key, []byte(sequenceNumber)) diff --git a/emitters/emitter.go b/emitters/emitter.go deleted file mode 100644 index c0bf91a..0000000 --- a/emitters/emitter.go +++ /dev/null @@ -1,5 +0,0 @@ -package emitters - -type Emitter interface { - Emit(path string, data []byte) -} diff --git a/emitters/redshift_emitter.go b/emitters/redshift_emitter.go index 2ae4641..1832759 100644 --- a/emitters/redshift_emitter.go +++ b/emitters/redshift_emitter.go @@ -1,16 +1,50 @@ package emitters import ( + "bytes" "fmt" + "os" + // "database/sql" + + "github.com/harlow/go-etl/interfaces" + // "github.com/lib/pq" ) type RedshiftEmitter struct { + redshiftDelimiter string + redshiftPassword string + redshiftTable string + redshiftURL string + redshiftUsername string + S3Bucket string } -func (e RedshiftEmitter) Emit(path string, data []byte) { - // first call S3 bucket - // pg.query("COPY file_path TO table_name") +func (e RedshiftEmitter) Emit(buffer interfaces.Buffer) { + s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} + s3Emitter.Emit(buffer) + // s3File := s3Emitter.S3FileName(buffer.FirstSequenceNumber(), buffer.LastSequenceNumber()) + + // fmt.Printf("Redshift emitted: %v\n", s3File) + // db, err := sql.Open("postgres", "user=pqgotest dbname=pqgotest sslmode=verify-full") + + // if err != nil { + // log.Fatal(err) + // } + // pg.query("INSERT INTO imported_files VALUE file_path") - fmt.Printf("debug: emitting %v to Redshift\n", path) - fmt.Println(string(data)) + // err := db.Exec(generateCopyStatement(s3File)) + // rows, err := db.Query("SELECT pg_last_copy_count();") + // log.info("Successfully copied " + getNumberOfCopiedRecords(conn) + " records to Redshift from file s3://" + s3Bucket + "/" + s3File); + // db.Close() +} + +func (e RedshiftEmitter) generateCopyStatement(s3File string) string { + var b bytes.Buffer + b.WriteString(fmt.Sprintf("COPY %v ", e.redshiftTable)) + b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File)) + b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY"))) + b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY"))) + b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.redshiftDelimiter)) + b.WriteString(";") + return b.String() } diff --git a/emitters/s3_emitter.go b/emitters/s3_emitter.go index d66c810..47279fc 100644 --- a/emitters/s3_emitter.go +++ b/emitters/s3_emitter.go @@ -6,24 +6,29 @@ import ( "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" - "github.com/harlow/go-etl/buffers" + "github.com/harlow/go-etl/interfaces" ) type S3Emitter struct { S3Bucket string } -func (e S3Emitter) s3FileName(firstSeq string, lastSeq string) string { +func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string { date := time.Now().UTC().Format("2006-01-02") return fmt.Sprintf("/%v/%v-%v.txt", date, firstSeq, lastSeq) } -func (e S3Emitter) Emit(buffer buffers.Buffer) { +func (e S3Emitter) Emit(buffer interfaces.Buffer) { auth, _ := aws.EnvAuth() - s := s3.New(auth, aws.USEast) - b := s.Bucket(e.S3Bucket) - f := e.s3FileName(buffer.FirstSequenceNumber(), buffer.LastSequenceNumber()) - r := b.Put(f, buffer.Data(), "text/plain", s3.Private, s3.Options{}) - fmt.Printf("Successfully emitted %v records to S3 in s3://%v/%v", buffer.NumMessagesInBuffer(), b, f) - fmt.Println(r) + s3Con := s3.New(auth, aws.USEast) + bucket := s3Con.Bucket(e.S3Bucket) + s3File := e.S3FileName(buffer.FirstSequenceNumber(), buffer.LastSequenceNumber()) + + err := bucket.Put(s3File, buffer.Data(), "text/plain", s3.Private, s3.Options{}) + + if err != nil { + fmt.Printf("Error occured while uploding to S3: %v\n", err) + } else { + fmt.Printf("Emitted %v records to S3 in s3://%v%v\n", buffer.NumMessagesInBuffer(), e.S3Bucket, s3File) + } } diff --git a/emitters/s3_manifest_emitter.go b/emitters/s3_manifest_emitter.go new file mode 100644 index 0000000..e26bc9f --- /dev/null +++ b/emitters/s3_manifest_emitter.go @@ -0,0 +1,30 @@ +package emitters + +import ( + "fmt" + + "github.com/harlow/go-etl/interfaces" + "github.com/sendgridlabs/go-kinesis" +) + +type S3ManifestEmitter struct { + OutputStream string + S3Bucket string + Ksis *kinesis.Kinesis +} + +func (e S3ManifestEmitter) Emit(buffer interfaces.Buffer) { + s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} + s3Emitter.Emit(buffer) + s3File := s3Emitter.S3FileName(buffer.FirstSequenceNumber(), buffer.LastSequenceNumber()) + + args := kinesis.NewArgs() + args.Add("StreamName", e.OutputStream) + args.Add("PartitionKey", s3File) + args.AddData([]byte(s3File)) + _, err := e.Ksis.PutRecord(args) + + if err != nil { + fmt.Printf("S3 Manifest Emitter Error: %v", err) + } +} diff --git a/examples/consumer.go b/examples/consumer.go deleted file mode 100644 index 2252fa1..0000000 --- a/examples/consumer.go +++ /dev/null @@ -1,36 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/harlow/go-etl/checkpoints" - "github.com/harlow/go-etl/emitters" - "github.com/harlow/go-etl/utils" - "github.com/joho/godotenv" - "github.com/sendgridlabs/go-kinesis" -) - -func main() { - godotenv.Load() - - s := "inputStream" - k := kinesis.New("", "", kinesis.Region{}) - c := checkpoints.RedisCheckpoint{AppName: "sampleApp"} - e := emitters.S3Emitter{S3Bucket: "bucketName"} - // t := transformers.EventTransformer{} - - args := kinesis.NewArgs() - args.Add("StreamName", s) - streamInfo, err := k.DescribeStream(args) - - if err != nil { - fmt.Printf("Unable to connect to %v stream. Aborting.", s) - return - } - - for _, shard := range streamInfo.StreamDescription.Shards { - go utils.GetRecords(k, &c, e, s, shard.ShardId) - } - - select {} -} diff --git a/examples/producer.go b/examples/producer.go deleted file mode 100644 index 7793b61..0000000 --- a/examples/producer.go +++ /dev/null @@ -1,35 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/harlow/go-etl/utils" - "github.com/joho/godotenv" - "github.com/sendgridlabs/go-kinesis" -) - -func putSampleDataOnStream(ksis *kinesis.Kinesis, streamName string, numRecords int) { - for i := 0; i < numRecords; i++ { - args := kinesis.NewArgs() - args.Add("StreamName", streamName) - args.AddData([]byte(fmt.Sprintf("Hello AWS Kinesis %d", i))) - args.Add("PartitionKey", fmt.Sprintf("partitionKey-%d", i)) - resp, err := ksis.PutRecord(args) - - if err != nil { - fmt.Printf("PutRecord err: %v\n", err) - } else { - fmt.Printf("SequenceNumber: %v\n", resp.SequenceNumber) - } - } -} - -func main() { - godotenv.Load() - streamName := "inputStream" - ksis := kinesis.New("", "", kinesis.Region{}) - - utils.CreateAndWaitForStreamToBecomeAvailable(ksis, streamName, 2) - putSampleDataOnStream(ksis, streamName, 50) - // deleteStream(ksis, streamName) -} diff --git a/interfaces/buffer.go b/interfaces/buffer.go new file mode 100644 index 0000000..8884591 --- /dev/null +++ b/interfaces/buffer.go @@ -0,0 +1,11 @@ +package interfaces + +type Buffer interface { + ConsumeRecord(data []byte, sequenceNumber string) + Data() []byte + FirstSequenceNumber() string + Flush() + LastSequenceNumber() string + NumMessagesInBuffer() int + ShouldFlush() bool +} diff --git a/interfaces/checkpoint.go b/interfaces/checkpoint.go new file mode 100644 index 0000000..8fa1a32 --- /dev/null +++ b/interfaces/checkpoint.go @@ -0,0 +1,8 @@ +package interfaces + +type Checkpoint interface { + CheckpointExists(streamName string, shardID string) bool + SequenceNumber() string + SetCheckpoint(streamName string, shardID string, sequenceNumber string) +} + diff --git a/interfaces/emitter.go b/interfaces/emitter.go new file mode 100644 index 0000000..c0b6631 --- /dev/null +++ b/interfaces/emitter.go @@ -0,0 +1,5 @@ +package interfaces + +type Emitter interface { + Emit(buffer Buffer) +} diff --git a/interfaces/transformer.go b/interfaces/transformer.go new file mode 100644 index 0000000..5e99d81 --- /dev/null +++ b/interfaces/transformer.go @@ -0,0 +1,5 @@ +package interfaces + +type Transformer interface { + Transform() string +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go new file mode 100644 index 0000000..c0d0ed9 --- /dev/null +++ b/pipeline/pipeline.go @@ -0,0 +1,73 @@ +package pipeline + +import ( + "fmt" + "time" + + "github.com/harlow/go-etl/interfaces" + "github.com/sendgridlabs/go-kinesis" +) + +type Pipeline struct { + Checkpoint interfaces.Checkpoint + Emitter interfaces.Emitter + Transformer interfaces.Transformer +} + +func (p Pipeline) GetRecords(ksis *kinesis.Kinesis, buf interfaces.Buffer, streamName string, shardId string) { + args := kinesis.NewArgs() + args.Add("ShardId", shardId) + args.Add("StreamName", streamName) + + if p.Checkpoint.CheckpointExists(streamName, shardId) { + args.Add("ShardIteratorType", "AFTER_SEQUENCE_NUMBER") + args.Add("StartingSequenceNumber", p.Checkpoint.SequenceNumber()) + } else { + args.Add("ShardIteratorType", "TRIM_HORIZON") + } + + shardInfo, err := ksis.GetShardIterator(args) + + if err != nil { + fmt.Printf("Error fetching shard itterator: %v", err) + return + } + + shardIterator := shardInfo.ShardIterator + + for { + args = kinesis.NewArgs() + args.Add("ShardIterator", shardIterator) + recordSet, err := ksis.GetRecords(args) + + if len(recordSet.Records) > 0 { + for _, d := range recordSet.Records { + data, err := d.GetData() + + if err != nil { + fmt.Printf("GetData ERROR: %v\n", err) + continue + } + + // json.Unmarshal(data, &t) + // csv := []byte(t.Transform()) + buf.ConsumeRecord(data, d.SequenceNumber) + } + } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { + fmt.Printf("GetRecords ERROR: %v\n", err) + break + } else { + fmt.Printf("Sleeping: %v\n", shardId) + time.Sleep(5 * time.Second) + } + + if buf.ShouldFlush() { + fmt.Printf("Emitting shardId: %v\n", shardId) + p.Emitter.Emit(buf) + p.Checkpoint.SetCheckpoint(streamName, shardId, buf.LastSequenceNumber()) + buf.Flush() + } + + shardIterator = recordSet.NextShardIterator + } +} diff --git a/utils/config_utils.go b/utils/config_utils.go new file mode 100644 index 0000000..66f8261 --- /dev/null +++ b/utils/config_utils.go @@ -0,0 +1,81 @@ +package utils + +import ( + "bufio" + "log" + "os" + "reflect" + "regexp" + "strconv" + "strings" + "unicode" +) + +func readLines(path string) ([]string, error) { + file, err := os.Open(path) + + if err != nil { + return nil, err + } + + defer file.Close() + var lines []string + + scanner := bufio.NewScanner(file) + + for scanner.Scan() { + lines = append(lines, scanner.Text()) + } + + return lines, scanner.Err() +} + +var ( + assignRegex = regexp.MustCompile(`^([^=]+)=(.*)$`) +) + +func upcaseInitial(str string) string { + for i, v := range str { + return string(unicode.ToUpper(v)) + str[i+1:] + } + + return "" +} + +func LoadConfig(config interface{}, filename string) error { + lines, err := readLines(filename) + + if err != nil { + log.Fatalf("Load error: %s", err) + } + + mutable := reflect.ValueOf(config).Elem() + + for _, line := range lines { + line = strings.TrimSpace(line) + + if len(line) == 0 || line[0] == ';' || line[0] == '#' { + continue + } + + if groups := assignRegex.FindStringSubmatch(line); groups != nil { + key, val := groups[1], groups[2] + key, val = strings.TrimSpace(key), strings.TrimSpace(val) + key = upcaseInitial(key) + fieldType := mutable.FieldByName(key).Type() + + switch fieldType.Name() { + case "int": + val, _ := strconv.ParseInt(val, 0, 64) + mutable.FieldByName(key).SetInt(val) + case "bool": + val, _ := strconv.ParseBool(val) + mutable.FieldByName(key).SetBool(val) + default: + mutable.FieldByName(key).SetString(val) + } + } + } + + return nil +}