From 02a9d5d173c331972d25de314e27a87b5f5572d4 Mon Sep 17 00:00:00 2001 From: dan Date: Fri, 3 Apr 2015 23:05:49 -0700 Subject: [PATCH] use log4go output --- pipeline.go | 6 +++--- redshift_basic_emitter.go | 8 +++++--- redshift_manifest_emitter.go | 14 +++++++++----- s3_emitter.go | 6 +++--- s3_manifest_emitter.go | 7 +++---- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/pipeline.go b/pipeline.go index 791097b..1da2ad0 100644 --- a/pipeline.go +++ b/pipeline.go @@ -1,7 +1,6 @@ package connector import ( - "log" "math" "os" "time" @@ -63,7 +62,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { shardInfo, err := ksis.GetShardIterator(args) if err != nil { - log.Fatalf("GetShardIterator ERROR: %v\n", err) + l4g.Critical("GetShardIterator ERROR: %v", err) + os.Exit(1) } shardIterator := shardInfo.ShardIterator @@ -108,7 +108,7 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { } } } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { - log.Printf("NextShardIterator ERROR: %v\n", err) + l4g.Error("NextShardIterator ERROR: %v", err) break } else { time.Sleep(5 * time.Second) diff --git a/redshift_basic_emitter.go b/redshift_basic_emitter.go index d1d9b6e..8b17489 100644 --- a/redshift_basic_emitter.go +++ b/redshift_basic_emitter.go @@ -4,10 +4,10 @@ import ( "bytes" "database/sql" "fmt" - "log" "os" // Postgres package is used when sql.Open is called + l4g "github.com/ezoic/sol/log4go" _ "github.com/lib/pq" ) @@ -31,13 +31,15 @@ func (e RedshiftBasicEmtitter) Emit(b Buffer, t Transformer) { db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) if err != nil { - log.Fatal(err) + l4g.Critical(err) + os.Exit(1) } _, err = db.Exec(e.copyStatement(s3File)) if err != nil { - log.Fatal(err) + l4g.Critical(err) + os.Exit(1) } fmt.Printf("Redshift load completed.\n") diff --git a/redshift_manifest_emitter.go b/redshift_manifest_emitter.go index 36f0137..4780e0a 100644 --- a/redshift_manifest_emitter.go +++ b/redshift_manifest_emitter.go @@ -5,13 +5,13 @@ import ( "database/sql" "encoding/json" "fmt" - "log" "os" "strings" "time" "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" + l4g "github.com/ezoic/sol/log4go" _ "github.com/lib/pq" ) @@ -35,7 +35,9 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { db, err := sql.Open("postgres", os.Getenv("REDSHIFT_URL")) if err != nil { - log.Fatal(err) + l4g.Critical(err) + os.Exit(1) + } // Aggregate file paths as strings @@ -55,7 +57,8 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { _, err = db.Exec(c) if err != nil { - log.Fatal(err) + l4g.Critical(err) + os.Exit(1) } // Insert file paths into File Names table @@ -63,10 +66,11 @@ func (e RedshiftManifestEmitter) Emit(b Buffer, t Transformer) { _, err = db.Exec(i) if err != nil { - log.Fatal(err) + l4g.Critical(err) + os.Exit(1) } - log.Printf("[%v] copied to Redshift", manifestFileName) + l4g.Info("[%v] copied to Redshift", manifestFileName) db.Close() } diff --git a/s3_emitter.go b/s3_emitter.go index 69a29c6..4b8339f 100644 --- a/s3_emitter.go +++ b/s3_emitter.go @@ -3,11 +3,11 @@ package connector import ( "bytes" "fmt" - "log" "time" "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" + l4g "github.com/ezoic/sol/log4go" ) // S3Emitter is an implementation of Emitter used to store files from a Kinesis stream in S3. @@ -44,8 +44,8 @@ func (e S3Emitter) Emit(b Buffer, t Transformer) { err := bucket.Put(s3File, buffer.Bytes(), "text/plain", s3.Private, s3.Options{}) if err != nil { - log.Printf("S3Put ERROR: %v\n", err) + l4g.Error("S3Put ERROR: %v", err) } else { - log.Printf("[%v] records emitted to [%s]\n", b.NumRecordsInBuffer(), e.S3Bucket) + l4g.Info("[%v] records emitted to [%s]", b.NumRecordsInBuffer(), e.S3Bucket) } } diff --git a/s3_manifest_emitter.go b/s3_manifest_emitter.go index 8918da8..63e71d0 100644 --- a/s3_manifest_emitter.go +++ b/s3_manifest_emitter.go @@ -1,9 +1,8 @@ package connector import ( - "log" - "github.com/ezoic/go-kinesis" + l4g "github.com/ezoic/sol/log4go" ) // An implementation of Emitter that puts event data on S3 file, and then puts the @@ -30,8 +29,8 @@ func (e S3ManifestEmitter) Emit(b Buffer, t Transformer) { _, err := e.Ksis.PutRecord(args) if err != nil { - log.Printf("PutRecord ERROR: %v", err) + l4g.Error("PutRecord ERROR: %v", err) } else { - log.Printf("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream) + l4g.Info("[%s] emitted to [%s]", b.FirstSequenceNumber(), e.OutputStream) } }