Add default logger
Requiring the Logger be passed in to each of the structs was causing Panics if the Logger was omitted. * Add function for overriding the default logger * Remove panics caused by initialization
This commit is contained in:
parent
82d10ab78d
commit
ff1cff0293
6 changed files with 41 additions and 20 deletions
26
default_logger.go
Normal file
26
default_logger.go
Normal file
|
|
@ -0,0 +1,26 @@
|
||||||
|
package connector
|
||||||
|
|
||||||
|
import "log"
|
||||||
|
|
||||||
|
// the default implementation for a logger
|
||||||
|
// for this package.
|
||||||
|
type DefaultLogger struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DefaultLogger) Fatalf(format string, v ...interface{}) {
|
||||||
|
log.Fatalf(format, v...)
|
||||||
|
}
|
||||||
|
func (l *DefaultLogger) Printf(format string, v ...interface{}) {
|
||||||
|
log.Printf(format, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure that there is a default logger instance
|
||||||
|
// initialized, so that we don't end up with panics
|
||||||
|
var logger Logger = &DefaultLogger{}
|
||||||
|
|
||||||
|
// expose the ability to change the logger so that
|
||||||
|
// external packages can control the logging for
|
||||||
|
// this package
|
||||||
|
func SetLogger(l Logger) {
|
||||||
|
logger = l
|
||||||
|
}
|
||||||
|
|
@ -16,7 +16,6 @@ type Pipeline struct {
|
||||||
Checkpoint Checkpoint
|
Checkpoint Checkpoint
|
||||||
Emitter Emitter
|
Emitter Emitter
|
||||||
Filter Filter
|
Filter Filter
|
||||||
Logger Logger
|
|
||||||
StreamName string
|
StreamName string
|
||||||
Transformer Transformer
|
Transformer Transformer
|
||||||
}
|
}
|
||||||
|
|
@ -38,7 +37,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
shardInfo, err := ksis.GetShardIterator(args)
|
shardInfo, err := ksis.GetShardIterator(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.Logger.Fatalf("GetShardIterator ERROR: %v\n", err)
|
logger.Fatalf("GetShardIterator ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
shardIterator := shardInfo.ShardIterator
|
shardIterator := shardInfo.ShardIterator
|
||||||
|
|
@ -49,7 +48,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
recordSet, err := ksis.GetRecords(args)
|
recordSet, err := ksis.GetRecords(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.Logger.Fatalf("GetRecords ERROR: %v\n", err)
|
logger.Fatalf("GetRecords ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(recordSet.Records) > 0 {
|
if len(recordSet.Records) > 0 {
|
||||||
|
|
@ -57,7 +56,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
data := v.GetData()
|
data := v.GetData()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.Logger.Printf("GetData ERROR: %v\n", err)
|
logger.Printf("GetData ERROR: %v\n", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -68,7 +67,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
||||||
p.Logger.Printf("NextShardIterator ERROR: %v\n", err)
|
logger.Printf("NextShardIterator ERROR: %v\n", err)
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,6 @@ type RedshiftBasicEmitter struct {
|
||||||
Delimiter string
|
Delimiter string
|
||||||
Format string
|
Format string
|
||||||
Jsonpaths string
|
Jsonpaths string
|
||||||
Logger Logger
|
|
||||||
S3Bucket string
|
S3Bucket string
|
||||||
TableName string
|
TableName string
|
||||||
}
|
}
|
||||||
|
|
@ -31,16 +30,16 @@ func (e RedshiftBasicEmitter) Emit(b Buffer, t Transformer) {
|
||||||
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.Logger.Fatalf("sql.Open ERROR: %v\n", err)
|
logger.Fatalf("sql.Open ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(e.copyStatement(s3File))
|
_, err = db.Exec(e.copyStatement(s3File))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.Logger.Fatalf("db.Exec ERROR: %v\n", err)
|
logger.Fatalf("db.Exec ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
e.Logger.Printf("Redshift load completed.\n")
|
logger.Printf("Redshift load completed.\n")
|
||||||
db.Close()
|
db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ type RedshiftManifestEmitter struct {
|
||||||
FileTable string
|
FileTable string
|
||||||
Format string
|
Format string
|
||||||
Jsonpaths string
|
Jsonpaths string
|
||||||
Logger Logger
|
|
||||||
S3Bucket string
|
S3Bucket string
|
||||||
SecretKey string
|
SecretKey string
|
||||||
}
|
}
|
||||||
|
|
@ -35,7 +34,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL"))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.Logger.Fatalf("sql.Open ERROR: %v\n", err)
|
logger.Fatalf("sql.Open ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Aggregate file paths as strings
|
// Aggregate file paths as strings
|
||||||
|
|
@ -55,7 +54,7 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err = db.Exec(c)
|
_, err = db.Exec(c)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.Logger.Fatalf("db.Exec ERROR: %v\n", err)
|
logger.Fatalf("db.Exec ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert file paths into File Names table
|
// Insert file paths into File Names table
|
||||||
|
|
@ -63,10 +62,10 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err = db.Exec(i)
|
_, err = db.Exec(i)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.Logger.Fatalf("db.Exec ERROR: %v\n", err)
|
logger.Fatalf("db.Exec ERROR: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
e.Logger.Printf("[%v] copied to Redshift", manifestFileName)
|
logger.Printf("[%v] copied to Redshift", manifestFileName)
|
||||||
db.Close()
|
db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,6 @@ import (
|
||||||
// from the first and last sequence numbers of the records contained in that file separated by a
|
// from the first and last sequence numbers of the records contained in that file separated by a
|
||||||
// dash. This struct requires the configuration of an S3 bucket and endpoint.
|
// dash. This struct requires the configuration of an S3 bucket and endpoint.
|
||||||
type S3Emitter struct {
|
type S3Emitter struct {
|
||||||
Logger Logger
|
|
||||||
S3Bucket string
|
S3Bucket string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -44,8 +43,8 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) {
|
||||||
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
|
err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.Logger.Fatalf("S3Put ERROR: %v\n", err.Error())
|
logger.Fatalf("S3Put ERROR: %v\n", err.Error())
|
||||||
} else {
|
} else {
|
||||||
e.Logger.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket)
|
logger.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ type S3ManifestEmitter struct {
|
||||||
OutputStream string
|
OutputStream string
|
||||||
S3Bucket string
|
S3Bucket string
|
||||||
Ksis *kinesis.Kinesis
|
Ksis *kinesis.Kinesis
|
||||||
Logger Logger
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
|
func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
|
|
@ -29,8 +28,8 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) {
|
||||||
_, err := e.Ksis.PutRecord(args)
|
_, err := e.Ksis.PutRecord(args)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.Logger.Printf("PutRecord ERROR: %v", err)
|
logger.Printf("PutRecord ERROR: %v", err)
|
||||||
} else {
|
} else {
|
||||||
e.Logger.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream)
|
logger.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue