Logging and DB connection reuse
* Reuse Redshift DB connection
* Add more logging
commit 9371fb938c
parent a27a13f405

7 changed files with 33 additions and 28 deletions
@@ -2,8 +2,6 @@ package connector

 import (
     "fmt"
-    "io/ioutil"
-    "log"
     "net"
     "testing"

@@ -12,8 +10,6 @@ import (
 )

 func Test_isRecoverableError(t *testing.T) {
-    log.SetOutput(ioutil.Discard)
-
     testCases := []struct {
         err           error
         isRecoverable bool
@@ -1,7 +1,6 @@
 package connector

 import (
-    "fmt"
     "time"

     "github.com/sendgridlabs/go-kinesis"

@@ -14,7 +13,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
     err := k.CreateStream(streamName, shardCount)

     if err != nil {
-        fmt.Printf("CreateStream ERROR: %v\n", err)
+        logger.Printf("CreateStream ERROR: %v", err)
         return
     }
 }

@@ -27,7 +26,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
     args.Add("StreamName", streamName)
     resp, _ = k.DescribeStream(args)
     streamStatus := resp.StreamDescription.StreamStatus
-    fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus)
+    logger.Printf("Stream [%v] is %v", streamName, streamStatus)

     if streamStatus != "ACTIVE" {
         time.Sleep(4 * time.Second)

@@ -43,7 +42,7 @@ func StreamExists(k *kinesis.Kinesis, streamName string) bool {
     args := kinesis.NewArgs()
     resp, err := k.ListStreams(args)
     if err != nil {
-        fmt.Printf("ListStream ERROR: %v\n", err)
+        logger.Printf("ListStream ERROR: %v", err)
         return false
     }
     for _, s := range resp.StreamNames {
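The fmt.Printf calls above are swapped for a package-level logger, whose definition is not part of this diff. A minimal sketch of what that variable could look like, assuming the standard library log package (the name, output, and prefix are illustrative):

```go
package connector

import (
	"log"
	"os"
)

// Hypothetical definition, not shown in this commit: a shared logger that the
// logger.Printf / logger.Fatalf calls in the diff would resolve to.
var logger = log.New(os.Stdout, "connector: ", log.LstdFlags)
```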
@@ -18,7 +18,9 @@ type RedshiftBasicEmitter struct {
     Format    string
     Jsonpaths string
     S3Bucket  string
+    S3Prefix  string
     TableName string
+    Db        *sql.DB
 }

 // Emit is invoked when the buffer is full. This method leverages the S3Emitter and

@@ -27,40 +29,34 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
     s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
     s3Emitter.Emit(b, t)
     s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
-    db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
-
-    if err != nil {
-        logger.Fatalf("sql.Open ERROR: %v\n", err)
-    }
+    stmt := e.copyStatement(s3File)

     for i := 0; i < 10; i++ {
-        // handle aws backoff, this may be necessary if, for example, the
-        // s3 file has not appeared to the database yet
-        handleAwsWaitTimeExp(i)
-
-        // load S3File into database
-        _, err = db.Exec(e.copyStatement(s3File))
-
-        // if the request succeeded, or its an unrecoverable error, break out of
-        // the loop because we are done
+        // execute copy statement
+        _, err := e.Db.Exec(stmt)
+
+        // if the request succeeded, or its an unrecoverable error, break out of loop
         if err == nil || isRecoverableError(err) == false {
             break
         }

-        logger.Fatalf("db.Exec ERROR: %v\n", err)
+        // handle aws backoff, this may be necessary if, for example, the
+        // s3 file has not appeared to the database yet
+        handleAwsWaitTimeExp(i)
     }

     logger.Printf("Redshift load completed.\n")
-    db.Close()
 }

 // Creates the SQL copy statement issued to Redshift cluster.
 func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
     b := new(bytes.Buffer)
     b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
-    b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File))
+    b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File))
     b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY")))
     b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY")))

     switch e.Format {
     case "json":
         b.WriteString(fmt.Sprintf("json 'auto'"))

@@ -70,5 +66,6 @@ func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
         b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
     }
     b.WriteString(";")
+
     return b.String()
 }
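With sql.Open and db.Close removed from Emit, the emitter now expects the caller to supply an already-open *sql.DB through the new Db field, so one connection pool is reused across buffer flushes instead of being opened and closed on every Emit. The commit does not show the call site; the sketch below is only an assumption of how it might be wired, reusing the REDSHIFT_URL environment variable and a Postgres driver as in the removed code (newRedshiftEmitter and the field values are illustrative, not part of this commit):

```go
package connector

import (
	"database/sql"
	"os"

	_ "github.com/lib/pq" // Postgres driver; an assumption, not shown in this diff
)

// newRedshiftEmitter is a hypothetical helper showing how a caller might open
// the Redshift connection once and hand it to the emitter for reuse.
func newRedshiftEmitter() (RedshiftBasicEmitter, error) {
	db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
	if err != nil {
		return RedshiftBasicEmitter{}, err
	}

	return RedshiftBasicEmitter{
		Format:    "json",
		S3Bucket:  "my-bucket",
		S3Prefix:  "firehose",
		TableName: "events",
		Db:        db,
	}, nil
}
```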
@@ -10,7 +10,7 @@ func TestCopyStatement(t *testing.T) {
         S3Bucket:  "test_bucket",
         TableName: "test_table",
     }
-    f := e.copyStatement("/test.txt")
+    f := e.copyStatement("test.txt")

     copyStatement := "COPY test_table FROM 's3://test_bucket/test.txt' CREDENTIALS 'aws_access_key_id=;aws_secret_access_key=' DELIMITER ',';"
@@ -118,7 +118,7 @@ func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileN
     content := e.generateManifestFile(files)
     err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{})
     if err != nil {
-        fmt.Printf("Error occured while uploding to S3: %v\n", err)
+        logger.Printf("Error occured while uploding to S3: %v", err)
     }
 }
@@ -17,13 +17,18 @@ import (
 // dash. This struct requires the configuration of an S3 bucket and endpoint.
 type S3Emitter struct {
     S3Bucket string
+    S3Prefix string
 }

 // S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current
 // UTC date (YYYY-MM-DD) is base of the path to logically group days of batches.
 func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
     date := time.Now().UTC().Format("2006/01/02")
-    return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
+    if e.S3Prefix == "" {
+        return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
+    } else {
+        return fmt.Sprintf("%v/%v/%v-%v", e.S3Prefix, date, firstSeq, lastSeq)
+    }
 }

 // Emit is invoked when the buffer is full. This method emits the set of filtered records.
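For reference, a rough sketch of the keys S3FileName now produces with and without a prefix; the bucket name, prefix, and sequence numbers are made up, and the date segment depends on time.Now():

```go
package connector

import "fmt"

// printExampleKeys is illustrative only and not part of this commit.
func printExampleKeys() {
	e := S3Emitter{S3Bucket: "my-bucket", S3Prefix: "firehose"}
	fmt.Println(e.S3FileName("100", "200")) // e.g. firehose/2015/02/14/100-200

	e.S3Prefix = ""
	fmt.Println(e.S3FileName("100", "200")) // e.g. 2015/02/14/100-200
}
```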
@@ -8,12 +8,20 @@ import (

 func TestS3FileName(t *testing.T) {
     d := time.Now().UTC().Format("2006/01/02")
-    e := S3Emitter{}
+    e := S3Emitter{S3Bucket: "bucket", S3Prefix: "prefix"}

-    expected := fmt.Sprintf("%v/a-b", d)
+    expected := fmt.Sprintf("prefix/%v/a-b", d)
     result := e.S3FileName("a", "b")

     if result != expected {
         t.Errorf("S3FileName() = %v want %v", result, expected)
     }

+    e.S3Prefix = ""
+    expected = fmt.Sprintf("%v/a-b", d)
+    result = e.S3FileName("a", "b")
+
+    if result != expected {
+        t.Errorf("S3FileName() = %v want %v", result, expected)
+    }
 }