diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index 2026893..6c54dde 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -20,7 +20,9 @@ type RedshiftBasicEmtitter struct { Format string Jsonpaths string S3Bucket string + S3Prefix string TableName string + Db *sql.DB } // Emit is invoked when the buffer is full. This method leverages the S3Emitter and @@ -29,20 +31,14 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) { s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} s3Emitter.Emit(b, t) s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) - db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) - if err != nil { - log.Fatal(err) - } - - _, err = db.Exec(e.copyStatement(s3File)) + _, err := e.Db.Exec(e.copyStatement(s3File)) if err != nil { log.Fatal(err) } l4g.Debug("Redshift load completed.") - db.Close() } // Creates the SQL copy statement issued to Redshift cluster. @@ -61,5 +57,6 @@ func (e RedshiftBasicEmtitter) copyStatement(s3File string) string { b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter)) } b.WriteString(";") + l4g.Debug(b.String()) return b.String() }