diff --git a/awsbackoff_test.go b/awsbackoff_test.go
index 34843c2..5f52a2e 100644
--- a/awsbackoff_test.go
+++ b/awsbackoff_test.go
@@ -2,8 +2,6 @@ package connector

 import (
 	"fmt"
-	"io/ioutil"
-	"log"
 	"net"
 	"testing"

@@ -12,8 +10,6 @@ import (
 )

 func Test_isRecoverableError(t *testing.T) {
-	log.SetOutput(ioutil.Discard)
-
 	testCases := []struct {
 		err           error
 		isRecoverable bool
diff --git a/kinesis.go b/kinesis.go
index 77f0644..bbd8c00 100644
--- a/kinesis.go
+++ b/kinesis.go
@@ -1,7 +1,6 @@
 package connector

 import (
-	"fmt"
 	"time"
 	"github.com/sendgridlabs/go-kinesis"
 )
@@ -14,7 +13,7 @@ func CreateStream(k *kinesis.Kinesis, streamName string, shardCount int) {
 		err := k.CreateStream(streamName, shardCount)

 		if err != nil {
-			fmt.Printf("CreateStream ERROR: %v\n", err)
+			logger.Printf("CreateStream ERROR: %v", err)
 			return
 		}
 	}
@@ -27,7 +26,7 @@
 		args.Add("StreamName", streamName)
 		resp, _ = k.DescribeStream(args)
 		streamStatus := resp.StreamDescription.StreamStatus
-		fmt.Printf("Stream [%v] is %v\n", streamName, streamStatus)
+		logger.Printf("Stream [%v] is %v", streamName, streamStatus)

 		if streamStatus != "ACTIVE" {
 			time.Sleep(4 * time.Second)
@@ -43,7 +42,7 @@ func StreamExists(k *kinesis.Kinesis, streamName string) bool {
 	args := kinesis.NewArgs()
 	resp, err := k.ListStreams(args)
 	if err != nil {
-		fmt.Printf("ListStream ERROR: %v\n", err)
+		logger.Printf("ListStream ERROR: %v", err)
 		return false
 	}
 	for _, s := range resp.StreamNames {
diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go
index f0a9bed..ac9f4fd 100644
--- a/redshift_basic_emitter.go
+++ b/redshift_basic_emitter.go
@@ -18,7 +18,9 @@ type RedshiftBasicEmitter struct {
 	Format    string
 	Jsonpaths string
 	S3Bucket  string
+	S3Prefix  string
 	TableName string
+	Db        *sql.DB
 }

 // Emit is invoked when the buffer is full. This method leverages the S3Emitter and
@@ -27,40 +29,34 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
 	s3Emitter := S3Emitter{S3Bucket: e.S3Bucket}
 	s3Emitter.Emit(b, t)
 	s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber())
-	db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
-	if err != nil {
-		logger.Fatalf("sql.Open ERROR: %v\n", err)
-	}
+	stmt := e.copyStatement(s3File)

 	for i := 0; i < 10; i++ {
-		// handle aws backoff, this may be necessary if, for example, the
-		// s3 file has not appeared to the database yet
-		handleAwsWaitTimeExp(i)
+		// execute copy statement
+		_, err := e.Db.Exec(stmt)

-		// load S3File into database
-		_, err = db.Exec(e.copyStatement(s3File))
-
-		// if the request succeeded, or its an unrecoverable error, break out of
-		// the loop because we are done
+		// if the request succeeded, or it's an unrecoverable error, break out of loop
 		if err == nil || isRecoverableError(err) == false {
 			break
 		}

-		logger.Fatalf("db.Exec ERROR: %v\n", err)
+		// handle aws backoff, this may be necessary if, for example, the
+		// s3 file has not appeared to the database yet
+		handleAwsWaitTimeExp(i)
 	}

 	logger.Printf("Redshift load completed.\n")

-	db.Close()
 }

 // Creates the SQL copy statement issued to Redshift cluster.
 func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
 	b := new(bytes.Buffer)
 	b.WriteString(fmt.Sprintf("COPY %v ", e.TableName))
-	b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File))
+	b.WriteString(fmt.Sprintf("FROM 's3://%v/%v' ", e.S3Bucket, s3File))
 	b.WriteString(fmt.Sprintf("CREDENTIALS 'aws_access_key_id=%v;", os.Getenv("AWS_ACCESS_KEY")))
 	b.WriteString(fmt.Sprintf("aws_secret_access_key=%v' ", os.Getenv("AWS_SECRET_KEY")))
+
 	switch e.Format {
 	case "json":
 		b.WriteString(fmt.Sprintf("json 'auto'"))
@@ -70,5 +66,6 @@ func (e RedshiftBasicEmitter) copyStatement(s3File string) string {
 		b.WriteString(fmt.Sprintf("DELIMITER '%v'", e.Delimiter))
 	}
 	b.WriteString(";")
+
 	return b.String()
 }
diff --git a/redshift_basic_emitter_test.go b/redshift_basic_emitter_test.go
index f5f790d..4689be6 100644
--- a/redshift_basic_emitter_test.go
+++ b/redshift_basic_emitter_test.go
@@ -10,7 +10,7 @@ func TestCopyStatement(t *testing.T) {
 		S3Bucket:  "test_bucket",
 		TableName: "test_table",
 	}
-	f := e.copyStatement("/test.txt")
+	f := e.copyStatement("test.txt")

 	copyStatement := "COPY test_table FROM 's3://test_bucket/test.txt' CREDENTIALS 'aws_access_key_id=;aws_secret_access_key=' DELIMITER ',';"

diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go
index 78cfaa4..07a6ef8 100644
--- a/redshift_manifest_emitter.go
+++ b/redshift_manifest_emitter.go
@@ -118,7 +118,7 @@ func (e RedshiftManifestEmitter) writeManifestToS3(files []string, manifestFileN
 	content := e.generateManifestFile(files)
 	err := bucket.Put(manifestFileName, content, "text/plain", s3.Private, s3.Options{})
 	if err != nil {
-		fmt.Printf("Error occured while uploding to S3: %v\n", err)
+		logger.Printf("Error occurred while uploading to S3: %v", err)
 	}
 }
diff --git a/s3_emitter.go b/s3_emitter.go
index 8666f0d..4cf91fa 100644
--- a/s3_emitter.go
+++ b/s3_emitter.go
@@ -17,13 +17,18 @@ import (
 // dash. This struct requires the configuration of an S3 bucket and endpoint.
 type S3Emitter struct {
 	S3Bucket string
+	S3Prefix string
 }

 // S3FileName generates a file name based on the First and Last sequence numbers from the buffer. The current
 // UTC date (YYYY-MM-DD) is base of the path to logically group days of batches.
 func (e S3Emitter) S3FileName(firstSeq string, lastSeq string) string {
 	date := time.Now().UTC().Format("2006/01/02")
-	return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
+	if e.S3Prefix == "" {
+		return fmt.Sprintf("%v/%v-%v", date, firstSeq, lastSeq)
+	} else {
+		return fmt.Sprintf("%v/%v/%v-%v", e.S3Prefix, date, firstSeq, lastSeq)
+	}
 }

 // Emit is invoked when the buffer is full. This method emits the set of filtered records.
diff --git a/s3_emitter_test.go b/s3_emitter_test.go
index 1307dd0..8e9229e 100644
--- a/s3_emitter_test.go
+++ b/s3_emitter_test.go
@@ -8,12 +8,20 @@ import (
 func TestS3FileName(t *testing.T) {
 	d := time.Now().UTC().Format("2006/01/02")

-	e := S3Emitter{}
+	e := S3Emitter{S3Bucket: "bucket", S3Prefix: "prefix"}

-	expected := fmt.Sprintf("%v/a-b", d)
+	expected := fmt.Sprintf("prefix/%v/a-b", d)
 	result := e.S3FileName("a", "b")

 	if result != expected {
 		t.Errorf("S3FileName() = %v want %v", result, expected)
 	}
+
+	e.S3Prefix = ""
+	expected = fmt.Sprintf("%v/a-b", d)
+	result = e.S3FileName("a", "b")
+
+	if result != expected {
+		t.Errorf("S3FileName() = %v want %v", result, expected)
+	}
 }
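Wiring note: after this patch the emitter no longer opens and closes its own connection inside Emit. The caller owns the *sql.DB lifecycle and injects the handle through the new Db field, and the exponential backoff now runs only after a failed COPY instead of delaying the first attempt. Below is a minimal sketch of how a caller might construct the emitter; it is illustrative only, and the import path, the lib/pq driver choice, and the bucket, prefix, and table names are all assumptions rather than part of the patch.

package main

import (
	"database/sql"
	"log"
	"os"

	_ "github.com/lib/pq" // assumed driver registering "postgres"; the removed code called sql.Open("postgres", ...)

	connector "github.com/harlow/kinesis-connectors" // hypothetical import path for this package
)

func main() {
	// Open the Redshift connection once at startup; Emit previously called
	// sql.Open and db.Close on every buffer flush.
	db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
	if err != nil {
		log.Fatalf("sql.Open ERROR: %v", err)
	}
	defer db.Close()

	// Keys produced by S3Emitter.S3FileName take the shape
	//   prefix/YYYY/MM/DD/firstSeq-lastSeq
	// or, when the prefix is empty, the old date-only layout
	//   YYYY/MM/DD/firstSeq-lastSeq
	e := connector.RedshiftBasicEmitter{
		Delimiter: ",",         // used by copyStatement when Format is not "json"
		S3Bucket:  "my-bucket", // hypothetical bucket name
		S3Prefix:  "loads",     // new optional field; hypothetical value
		TableName: "events",    // hypothetical table name
		Db:        db,
	}
	_ = e // hand e to whatever consumer drives Emit
}

Injecting the handle also means a test database connection can stand in for Redshift when exercising the retry loop, which attempts the COPY up to 10 times with handleAwsWaitTimeExp between failed attempts.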