diff --git a/default_logger.go b/default_logger.go new file mode 100644 index 0000000..2679cc4 --- /dev/null +++ b/default_logger.go @@ -0,0 +1,26 @@ +package connector + +import "log" + +// the default implementation for a logger +// for this package. +type DefaultLogger struct { +} + +func (l *DefaultLogger) Fatalf(format string, v ...interface{}) { + log.Fatalf(format, v...) +} +func (l *DefaultLogger) Printf(format string, v ...interface{}) { + log.Printf(format, v...) +} + +// make sure that there is a default logger instance +// initialized, so that we don't end up with panics +var logger Logger = &DefaultLogger{} + +// expose the ability to change the logger so that +// external packages can control the logging for +// this package +func SetLogger(l Logger) { + logger = l +} diff --git a/pipeline.go b/pipeline.go index d0d7911..7156065 100644 --- a/pipeline.go +++ b/pipeline.go @@ -16,7 +16,6 @@ type Pipeline struct { Checkpoint Checkpoint Emitter Emitter Filter Filter - Logger Logger StreamName string Transformer Transformer } @@ -38,7 +37,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { shardInfo, err := ksis.GetShardIterator(args) if err != nil { - p.Logger.Fatalf("GetShardIterator ERROR: %v\n", err) + logger.Fatalf("GetShardIterator ERROR: %v\n", err) } shardIterator := shardInfo.ShardIterator @@ -49,7 +48,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { recordSet, err := ksis.GetRecords(args) if err != nil { - p.Logger.Fatalf("GetRecords ERROR: %v\n", err) + logger.Fatalf("GetRecords ERROR: %v\n", err) } if len(recordSet.Records) > 0 { @@ -57,7 +56,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { data := v.GetData() if err != nil { - p.Logger.Printf("GetData ERROR: %v\n", err) + logger.Printf("GetData ERROR: %v\n", err) continue } @@ -68,7 +67,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { } } } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { - p.Logger.Printf("NextShardIterator ERROR: %v\n", err) + logger.Printf("NextShardIterator ERROR: %v\n", err) break } else { time.Sleep(5 * time.Second) diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index e61c3c9..09caa70 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -17,7 +17,6 @@ type RedshiftBasicEmitter struct { Delimiter string Format string Jsonpaths string - Logger Logger S3Bucket string TableName string } @@ -31,16 +30,16 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) { db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) if err != nil { - e.Logger.Fatalf("sql.Open ERROR: %v\n", err) + logger.Fatalf("sql.Open ERROR: %v\n", err) } _, err = db.Exec(e.copyStatement(s3File)) if err != nil { - e.Logger.Fatalf("db.Exec ERROR: %v\n", err) + logger.Fatalf("db.Exec ERROR: %v\n", err) } - e.Logger.Printf("Redshift load completed.\n") + logger.Printf("Redshift load completed.\n") db.Close() } diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go index ed585f2..78cfaa4 100644 --- a/redshift_manifest_emitter.go +++ b/redshift_manifest_emitter.go @@ -24,7 +24,6 @@ type RedshiftManifestEmitter struct { FileTable string Format string Jsonpaths string - Logger Logger S3Bucket string SecretKey string } @@ -35,7 +34,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) if err != nil { - e.Logger.Fatalf("sql.Open ERROR: %v\n", err) + logger.Fatalf("sql.Open ERROR: %v\n", err) } // Aggregate file paths as strings @@ -55,7 +54,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { _, err = db.Exec(c) if err != nil { - e.Logger.Fatalf("db.Exec ERROR: %v\n", err) + logger.Fatalf("db.Exec ERROR: %v\n", err) } // Insert file paths into File Names table @@ -63,10 +62,10 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { _, err = db.Exec(i) if err != nil { - e.Logger.Fatalf("db.Exec ERROR: %v\n", err) + logger.Fatalf("db.Exec ERROR: %v\n", err) } - e.Logger.Printf("[%v] copied to Redshift", manifestFileName) + logger.Printf("[%v] copied to Redshift", manifestFileName) db.Close() } diff --git a/s3_emitter.go b/s3_emitter.go index e141c54..8666f0d 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -16,7 +16,6 @@ import ( // from the first and last sequence numbers of the records contained in that file separated by a // dash. This struct requires the configuration of an S3 bucket and endpoint. type S3Emitter struct { - Logger Logger S3Bucket string } @@ -44,8 +43,8 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) { err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{}) if err != nil { - e.Logger.Fatalf("S3Put ERROR: %v\n", err.Error()) + logger.Fatalf("S3Put ERROR: %v\n", err.Error()) } else { - e.Logger.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket) + logger.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket) } } diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go index 59aa565..88b3d06 100644 --- a/s3_manifest_emitter.go +++ b/s3_manifest_emitter.go @@ -10,7 +10,6 @@ type S3ManifestEmitter struct { OutputStream string S3Bucket string Ksis *kinesis.Kinesis - Logger Logger } func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) { @@ -29,8 +28,8 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) { _, err := e.Ksis.PutRecord(args) if err != nil { - e.Logger.Printf("PutRecord ERROR: %v", err) + logger.Printf("PutRecord ERROR: %v", err) } else { - e.Logger.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream) + logger.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream) } }