diff --git a/README.md b/README.md
index eab3ccd..859a4f2 100644
--- a/README.md
+++ b/README.md
@@ -87,7 +87,7 @@ func NewPipeline(cfg Config) *connector.Pipeline {
 		StreamName: cfg.KinesisStream,
 	}
 
-	e := connector.RedshiftEmitter{
+	e := connector.RedshiftBasicEmitter{
 		TableName: cfg.TableName,
 		S3Bucket:  cfg.S3Bucket,
 		Format:    cfg.Format,
diff --git a/manifest.go b/manifest.go
new file mode 100644
index 0000000..7e6ad47
--- /dev/null
+++ b/manifest.go
@@ -0,0 +1,10 @@
+package connector
+
+type Entry struct {
+	Url       string `json:"url"`
+	Mandatory bool   `json:"mandatory"`
+}
+
+type Manifest struct {
+	Entries []Entry `json:"entries"`
+}
diff --git a/pipeline.go b/pipeline.go
index 445fac0..14d4198 100644
--- a/pipeline.go
+++ b/pipeline.go
@@ -1,7 +1,7 @@
 package connector
 
 import (
-	"fmt"
+	"log"
 	"time"
 
 	"github.com/sendgridlabs/go-kinesis"
@@ -38,8 +38,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 	shardInfo, err := ksis.GetShardIterator(args)
 
 	if err != nil {
-		fmt.Printf("Error fetching shard itterator: %v", err)
-		return
+		log.Fatalf("GetShardIterator ERROR: %v\n", err)
 	}
 
 	shardIterator := shardInfo.ShardIterator
@@ -50,9 +49,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 		recordSet, err := ksis.GetRecords(args)
 
 		if err != nil {
-			fmt.Printf("GetRecords ERROR: %v\n", err)
-			time.Sleep(10 * time.Second)
-			continue
+			log.Fatalf("GetRecords ERROR: %v\n", err)
 		}
 
 		if len(recordSet.Records) > 0 {
@@ -60,7 +57,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 				data := v.GetData()
 
 				if err != nil {
-					fmt.Printf("GetData ERROR: %v\n", err)
+					log.Printf("GetData ERROR: %v\n", err)
 					continue
 				}
 
@@ -71,15 +68,13 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
 				}
 			}
 		} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
-			fmt.Printf("NextShardIterator ERROR: %v\n", err)
+			log.Printf("NextShardIterator ERROR: %v\n", err)
 			break
 		} else {
-			fmt.Printf("Sleeping: %v\n", shardID)
-			time.Sleep(10 * time.Second)
+			time.Sleep(5 * time.Second)
 		}
 
 		if p.Buffer.ShouldFlush() {
-			fmt.Printf("Emitting to Shard: %v\n", shardID)
 			p.Emitter.Emit(p.Buffer, p.Transformer)
 			p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
 			p.Buffer.Flush()
diff --git a/redshift_emitter.go b/redshift_basic_emitter.go
similarity index 89%
rename from redshift_emitter.go
rename to redshift_basic_emitter.go
index 7bd869b..7c48d22 100644
--- a/redshift_emitter.go
+++ b/redshift_basic_emitter.go
@@ -14,7 +14,7 @@ import (
-// RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one.
-// It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered
-// data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct.
-type RedshiftEmitter struct {
+// RedshiftBasicEmitter is an implementation of Emitter that buffers batches of records into Redshift one by one.
+// It first emits records into S3 and then performs the Redshift JSON COPY command. S3 storage of buffered
+// data is achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct.
+type RedshiftBasicEmitter struct {
 	Delimiter string
 	Format    string
 	Jsonpaths string
@@ -24,7 +24,7 @@
 // Emit is invoked when the buffer is full. This method leverages the S3Emitter and
 // then issues a copy command to Redshift data store.
-func (e RedshiftEmitter) Emit(b Buffer, t Transformer) {
+func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
 	s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
 	s3Emitter.Emit(b, t)
 	s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
 
@@ -45,13 +45,12 @@ func (e RedshiftEmitter) Emit(b Buffer, t Transformer) {
 }
 
 // Creates the SQL copy statement issued to Redshift cluster.
-func (e RedshiftEmitter) copyStatement(s3File string) string {
-	var b bytes.Buffer
+func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
+	b := new(bytes.Buffer)
 	b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
-	b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File))
+	b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File))
 	b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY")))
 	b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY")))
-
 	switch e.Format {
 	case "json":
 		b.WriteString(fmt.Sprintf("json 'auto'"))
@@ -60,7 +59,6 @@ func (e RedshiftEmitter) copyStatement(s3File string) string {
 	default:
 		b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
 	}
-
 	b.WriteString(";")
 	return b.String()
 }
diff --git a/redshift_emitter_test.go b/redshift_basic_emitter_test.go
similarity index 93%
rename from redshift_emitter_test.go
rename to redshift_basic_emitter_test.go
index 030515d..7d993a1 100644
--- a/redshift_emitter_test.go
+++ b/redshift_basic_emitter_test.go
@@ -5,7 +5,7 @@ import (
 )
 
 func TestCopyStatement(t *testing.T) {
-	e := RedshiftEmitter{
+	e := RedshiftBasicEmitter{
 		Delimiter: ",",
 		S3Bucket:  "test_bucket",
 		TableName: "test_table",
diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go
new file mode 100644
index 0000000..36f0137
--- /dev/null
+++ b/redshift_manifest_emitter.go
@@ -0,0 +1,149 @@
+package connector
+
+import (
+	"bytes"
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"log"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/crowdmob/goamz/aws"
+	"github.com/crowdmob/goamz/s3"
+	_ "github.com/lib/pq"
+)
+
+// An implementation of Emitter that reads S3 file paths from a stream, creates a
+// manifest file and batch copies them into Redshift.
+type RedshiftManifestEmitter struct {
+	AccessKey     string
+	CopyMandatory bool
+	DataTable     string
+	Delimiter     string
+	FileTable     string
+	Format        string
+	Jsonpaths     string
+	S3Bucket      string
+	SecretKey     string
+}
+
+// Invoked when the buffer is full.
+// Emits a Manifest file to S3 and then performs the Redshift copy command.
+func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
+	db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
+
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Aggregate file paths as strings
+	files := []string{}
+	for _, r := range b.Records() {
+		f := t.FromRecord(r)
+		files = append(files, string(f))
+	}
+
+	// Manifest file name
+	date := time.Now().UTC().Format("2006/01/02")
+	manifestFileName := e.getManifestName(date, files)
+
+	// Issue manifest COPY to Redshift
+	e.writeManifestToS3(files, manifestFileName)
+	c := e.copyStmt(manifestFileName)
+	_, err = db.Exec(c)
+
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Insert file paths into File Names table
+	i := e.fileInsertStmt(files)
+	_, err = db.Exec(i)
+
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	log.Printf("[%v] copied to Redshift", manifestFileName)
+	db.Close()
+}
+
+// Creates the INSERT statement for the file names database table.
+func (e RedshiftManifestEmitter) fileInsertStmt(fileNames []string) string {
+	i := new(bytes.Buffer)
+	i.WriteString("('")
+	i.WriteString(strings.Join(fileNames, "'),('"))
+	i.WriteString("')")
+
+	b := new(bytes.Buffer)
+	b.WriteString("INSERT INTO ")
+	b.WriteString(e.FileTable)
+	b.WriteString(" VALUES ")
+	b.WriteString(i.String())
+	b.WriteString(";")
+
+	return b.String()
+}
+
+// Creates the COPY statement for Redshift insertion.
+func (e RedshiftManifestEmitter) copyStmt(filePath string) string {
+	b := new(bytes.Buffer)
+	c := fmt.Sprintf(
+		"CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' ",
+		os.Getenv("AWS_ACCESS_KEY"),
+		os.Getenv("AWS_SECRET_KEY"),
+	)
+	b.WriteString("COPY " + e.DataTable + " ")
+	b.WriteString("FROM 's3://" + e.S3Bucket + "/" + filePath + "' ")
+	b.WriteString(c)
+	switch e.Format {
+	case "json":
+		b.WriteString(fmt.Sprintf("json 'auto' "))
+	case "jsonpaths":
+		b.WriteString(fmt.Sprintf("json '%s' ", e.Jsonpaths))
+	default:
+		b.WriteString(fmt.Sprintf("DELIMITER '%s' ", e.Delimiter))
+	}
+	b.WriteString("MANIFEST")
+	b.WriteString(";")
+	return b.String()
+}
+
+// Writes the manifest file contents to S3.
+func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileName string) {
+	auth, _ := aws.EnvAuth()
+	s3Con := s3.New(auth, aws.USEast)
+	bucket := s3Con.Bucket(e.S3Bucket)
+	content := e.generateManifestFile(files)
+	err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{})
+	if err != nil {
+		log.Printf("Error occurred while uploading to S3: %v\n", err)
+	}
+}
+
+// Manifest file name based on First and Last sequence numbers
+func (e RedshiftManifestEmitter) getManifestName(date string, files []string) string {
+	firstSeq := e.getSeq(files[0])
+	lastSeq := e.getSeq(files[len(files)-1])
+	return fmt.Sprintf("%v/_manifest/%v_%v", date, firstSeq, lastSeq)
+}
+
+// Trims the date and suffix information from string
+func (e RedshiftManifestEmitter) getSeq(file string) string {
+	matches := strings.Split(file, "/")
+	return matches[len(matches)-1]
+}
+
+// Manifest file contents in JSON structure
+func (e RedshiftManifestEmitter) generateManifestFile(files []string) []byte {
+	m := &Manifest{}
+	for _, r := range files {
+		url := fmt.Sprintf("s3://%s/%s", e.S3Bucket, r)
+		entry := Entry{Url: url, Mandatory: e.CopyMandatory}
+		m.Entries = append(m.Entries, entry)
+	}
+	b, _ := json.Marshal(m)
+	return b
+}
diff --git a/redshift_manifest_emitter_test.go b/redshift_manifest_emitter_test.go
new file mode 100644
index 0000000..802af6d
--- /dev/null
+++ b/redshift_manifest_emitter_test.go
@@ -0,0 +1,39 @@
+package connector
+
+import "testing"
+
+func TestInsertStmt(t *testing.T) {
+	e := RedshiftManifestEmitter{FileTable: "funz"}
+	s := []string{"file1", "file2"}
+
+	expected := "INSERT INTO funz VALUES ('file1'),('file2');"
+	result := e.fileInsertStmt(s)
+
+	if result != expected {
+		t.Errorf("fileInsertStmt() = %v want %v", result, expected)
+	}
+}
+
+func TestManifestName(t *testing.T) {
+	e := RedshiftManifestEmitter{}
+	s := []string{"2014/01/01/a-b", "2014/01/01/c-d"}
+
+	expected := "2000/01/01/_manifest/a-b_c-d"
+	result := e.getManifestName("2000/01/01", s)
+
+	if result != expected {
+		t.Errorf("getManifestName() = %v want %v", result, expected)
+	}
+}
+
+func TestGenerateManifestFile(t *testing.T) {
+	e := RedshiftManifestEmitter{S3Bucket: "bucket_name", CopyMandatory: true}
+	s := []string{"file1"}
+
+	expected := "{\"entries\":[{\"url\":\"s3://bucket_name/file1\",\"mandatory\":true}]}"
+	result := string(e.generateManifestFile(s))
+
+	if result != expected {
+		t.Errorf("generateManifestFile() = %v want %v", result, expected)
+	}
+}
diff --git a/s3_emitter.go b/s3_emitter.go
index 6b81aa4..69a29c6 100644
--- a/s3_emitter.go
+++ b/s3_emitter.go
@@ -3,6 +3,7 @@ package connector
 import (
 	"bytes"
 	"fmt"
+	"log"
 	"time"
 
 	"github.com/crowdmob/goamz/aws"
@@ -22,8 +23,8 @@ type S3Emitter struct {
 // S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current
-// UTC date (YYYY-MM-DD) is base of the path to logically group days of batches.
+// UTC date (YYYY/MM/DD) is the base of the path to logically group days of batches.
 func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
-	date := time.Now().UTC().Format("2006-01-02")
-	return fmt.Sprintf("/%v/%v-%v.txt", date, firstSeq, lastSeq)
+	date := time.Now().UTC().Format("2006/01/02")
+	return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
 }
 
@@ -43,8 +44,8 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) {
 	err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
 
 	if err != nil {
-		fmt.Printf("Error occured while uploding to S3: %v\n", err)
+		log.Printf("S3Put ERROR: %v\n", err)
 	} else {
-		fmt.Printf("Emitted %v records to S3 in s3://%v%v\n", b.NumRecordsInBuffer(), e.S3Bucket, s3File)
+		log.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket)
 	}
 }
diff --git a/s3_emitter_test.go b/s3_emitter_test.go
index 981412f..1307dd0 100644
--- a/s3_emitter_test.go
+++ b/s3_emitter_test.go
@@ -7,12 +7,13 @@ import (
 )
 
 func TestS3FileName(t *testing.T) {
-	d := time.Now().UTC().Format("2006-01-02")
-	n := fmt.Sprintf("/%v/a-b.txt", d)
+	d := time.Now().UTC().Format("2006/01/02")
 	e := S3Emitter{}
-	f := e.S3FileName("a", "b")
-	if f != n {
-		t.Errorf("S3FileName() = want %v", f, n)
+	expected := fmt.Sprintf("%v/a-b", d)
+	result := e.S3FileName("a", "b")
+
+	if result != expected {
+		t.Errorf("S3FileName() = %v want %v", result, expected)
 	}
 }
 
diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go
new file mode 100644
index 0000000..756d760
--- /dev/null
+++ b/s3_manifest_emitter.go
@@ -0,0 +1,37 @@
+package connector
+
+import (
+	"log"
+
+	"github.com/sendgridlabs/go-kinesis"
+)
+
+// An implementation of Emitter that puts event data on an S3 file, and then puts the
+// S3 file path onto the output stream for processing by the manifest application.
+type S3ManifestEmitter struct {
+	OutputStream string
+	S3Bucket     string
+	Ksis         *kinesis.Kinesis
+}
+
+func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
+
+	// Emit buffer contents to S3 Bucket
+	s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
+	s3Emitter.Emit(b, t)
+	s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
+
+	// Emit the file path to Kinesis Output stream
+	args := kinesis.NewArgs()
+	args.Add("StreamName", e.OutputStream)
+	args.Add("PartitionKey", s3File)
+	args.AddData([]byte(s3File))
+
+	_, err := e.Ksis.PutRecord(args)
+
+	if err != nil {
+		log.Printf("PutRecord ERROR: %v", err)
+	} else {
+		log.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream)
+	}
+}