diff --git a/README.md b/README.md index 6d5dbd6..0aa60fe 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,7 @@ func main() { Checkpoint: &c, Emitter: &e, Filter: &f, + Logger: log.New(os.Stdout, "KINESIS: ", log.Ldate|log.Ltime|log.Lshortfile), StreamName: cfg.Kinesis.InputStream, Transformer: &t, } @@ -144,6 +145,7 @@ func main() { Checkpoint: &c, Emitter: &e, Filter: &f, + Logger: log.New(os.Stdout, "KINESIS: ", log.Ldate|log.Ltime|log.Lshortfile), StreamName: cfg.Kinesis.OutputStream, Transformer: &t, } diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..916807f --- /dev/null +++ b/logger.go @@ -0,0 +1,8 @@ +package connector + +// Logger sends pipeline info and errors to logging endpoint. The logger could be +// used to send to STDOUT, Syslog, or any number of distributed log collecting platforms. +type Logger interface { + Fatalf(format string, v ...interface{}) + Printf(format string, v ...interface{}) +} diff --git a/pipeline.go b/pipeline.go index 14d4198..d0d7911 100644 --- a/pipeline.go +++ b/pipeline.go @@ -1,7 +1,6 @@ package connector import ( - "log" "time" "github.com/sendgridlabs/go-kinesis" @@ -17,6 +16,7 @@ type Pipeline struct { Checkpoint Checkpoint Emitter Emitter Filter Filter + Logger Logger StreamName string Transformer Transformer } @@ -38,7 +38,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { shardInfo, err := ksis.GetShardIterator(args) if err != nil { - log.Fatalf("GetShardIterator ERROR: %v\n", err) + p.Logger.Fatalf("GetShardIterator ERROR: %v\n", err) } shardIterator := shardInfo.ShardIterator @@ -49,7 +49,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { recordSet, err := ksis.GetRecords(args) if err != nil { - log.Fatalf("GetRecords ERROR: %v\n", err) + p.Logger.Fatalf("GetRecords ERROR: %v\n", err) } if len(recordSet.Records) > 0 { @@ -57,7 +57,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { data := v.GetData() if err != nil 
{ - log.Printf("GetData ERROR: %v\n", err) + p.Logger.Printf("GetData ERROR: %v\n", err) continue } @@ -68,7 +68,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { } } } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { - log.Printf("NextShardIterator ERROR: %v\n", err) + p.Logger.Printf("NextShardIterator ERROR: %v\n", err) break } else { time.Sleep(5 * time.Second) diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index 7c48d22..e61c3c9 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -4,7 +4,6 @@ import ( "bytes" "database/sql" "fmt" - "log" "os" // Postgres package is used when sql.Open is called @@ -14,38 +13,39 @@ import ( // RedshiftEmitter is an implementation of Emitter that buffered batches of records into Redshift one by one. // It first emits records into S3 and then perfors the Redshift JSON COPY command. S3 storage of buffered // data achieved using the S3Emitter. A link to jsonpaths must be provided when configuring the struct. -type RedshiftBasicEmtitter struct { +type RedshiftBasicEmitter struct { Delimiter string Format string Jsonpaths string + Logger Logger S3Bucket string TableName string } // Emit is invoked when the buffer is full. This method leverages the S3Emitter and // then issues a copy command to Redshift data store. 
-func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) { +func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) { s3Emitter := S3Emitter{S3Bucket: e.S3Bucket} s3Emitter.Emit(b, t) s3File := s3Emitter.S3FileName(b.FirstSequenceNumber(), b.LastSequenceNumber()) db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) if err != nil { - log.Fatal(err) + e.Logger.Fatalf("sql.Open ERROR: %v\n", err) } _, err = db.Exec(e.copyStatement(s3File)) if err != nil { - log.Fatal(err) + e.Logger.Fatalf("db.Exec ERROR: %v\n", err) } - fmt.Printf("Redshift load completed.\n") + e.Logger.Printf("Redshift load completed.\n") db.Close() } // Creates the SQL copy statement issued to Redshift cluster. -func (e RedshiftBasicEmtitter) copyStatement(s3File string) string { +func (e RedshiftBasicEmitter) copyStatement(s3File string) string { b := new(bytes.Buffer) b.WriteString(fmt.Sprintf("COPY %v ", e.TableName)) b.WriteString(fmt.Sprintf("FROM 's3://%v%v' ", e.S3Bucket, s3File)) diff --git a/redshift_basic_emitter_test.go b/redshift_basic_emitter_test.go index 7d993a1..f5f790d 100644 --- a/redshift_basic_emitter_test.go +++ b/redshift_basic_emitter_test.go @@ -5,7 +5,7 @@ import ( ) func TestCopyStatement(t *testing.T) { - e := RedshiftBasicEmtitter{ + e := RedshiftBasicEmitter{ Delimiter: ",", S3Bucket: "test_bucket", TableName: "test_table", diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go index 36f0137..ed585f2 100644 --- a/redshift_manifest_emitter.go +++ b/redshift_manifest_emitter.go @@ -5,7 +5,6 @@ import ( "database/sql" "encoding/json" "fmt" - "log" "os" "strings" "time" @@ -25,6 +24,7 @@ type RedshiftManifestEmitter struct { FileTable string Format string Jsonpaths string + Logger Logger S3Bucket string SecretKey string } @@ -35,7 +35,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) if err != nil { - log.Fatal(err) + e.Logger.Fatalf("sql.Open ERROR: %v\n", err) 
} // Aggregate file paths as strings @@ -55,7 +55,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { _, err = db.Exec(c) if err != nil { - log.Fatal(err) + e.Logger.Fatalf("db.Exec ERROR: %v\n", err) } // Insert file paths into File Names table @@ -63,10 +63,10 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { _, err = db.Exec(i) if err != nil { - log.Fatal(err) + e.Logger.Fatalf("db.Exec ERROR: %v\n", err) } - log.Printf("[%v] copied to Redshift", manifestFileName) + e.Logger.Printf("[%v] copied to Redshift", manifestFileName) db.Close() } diff --git a/s3_emitter.go b/s3_emitter.go index 69a29c6..e141c54 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -3,7 +3,6 @@ package connector import ( "bytes" "fmt" - "log" "time" "github.com/crowdmob/goamz/aws" @@ -17,6 +16,7 @@ import ( // from the first and last sequence numbers of the records contained in that file separated by a // dash. This struct requires the configuration of an S3 bucket and endpoint. 
type S3Emitter struct { + Logger Logger S3Bucket string } @@ -44,8 +44,8 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) { err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{}) if err != nil { - log.Printf("S3Put ERROR: %v\n", err) + e.Logger.Printf("S3Put ERROR: %v\n", err) } else { - log.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket) + e.Logger.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket) } } diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go index 756d760..59aa565 100644 --- a/s3_manifest_emitter.go +++ b/s3_manifest_emitter.go @@ -1,8 +1,6 @@ package connector import ( - "log" - "github.com/sendgridlabs/go-kinesis" ) @@ -12,6 +10,7 @@ type S3ManifestEmitter struct { OutputStream string S3Bucket string Ksis *kinesis.Kinesis + Logger Logger } func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) { @@ -30,8 +29,8 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) { _, err := e.Ksis.PutRecord(args) if err != nil { - log.Printf("PutRecord ERROR: %v", err) + e.Logger.Printf("PutRecord ERROR: %v", err) } else { - log.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream) + e.Logger.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream) } }