re-use the database connection to redshift

This commit is contained in:
dan 2015-04-06 14:51:46 -07:00
parent e3efa383c1
commit 9ed761edc6

View file

@ -20,7 +20,9 @@ type RedshiftBasicEmtitter struct {
Format string Format string
Jsonpaths string Jsonpaths string
S3Bucket string S3Bucket string
S3Prefix string
TableName string TableName string
Db *sql.DB
} }
// Emit is invoked when the buffer is full. This method leverages the S3Emitter and // Emit is invoked when the buffer is full. This method leverages the S3Emitter and
@ -29,20 +31,14 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) {
s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
s3Emitter.Emit(b, t) s3Emitter.Emit(b, t)
s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
if err != nil { _, err := e.Db.Exec(e.copyStatement(s3File))
log.Fatal(err)
}
_, err = db.Exec(e.copyStatement(s3File))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
l4g.Debug("Redshift load completed.") l4g.Debug("Redshift load completed.")
db.Close()
} }
// Creates the SQL copy statement issued to Redshift cluster. // Creates the SQL copy statement issued to Redshift cluster.
@ -61,5 +57,6 @@ func (e RedshiftBasicEmtitter) copyStatement(s3File string) string {
b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter)) b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
} }
b.WriteString(";") b.WriteString(";")
l4g.Debug(b.String())
return b.String() return b.String()
} }